/**
* Should the set be destroyed once all operations are gone?
+ * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
+ * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
*/
int destroy_requested;
* @param cls the `struct GNUNET_SET_Handle *`
* @param mh the message
*/
- static void
- handle_iter_element (void *cls,
- const struct GNUNET_SET_IterResponseMessage *msg)
+static void
+handle_iter_element (void *cls,
+ const struct GNUNET_SET_IterResponseMessage *msg)
{
struct GNUNET_SET_Handle *set = cls;
GNUNET_SET_ElementIterator iter = set->iterator;
struct GNUNET_MQ_Envelope *ev;
uint16_t msize;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received element in set iteration\n");
msize = ntohs (msg->header.size);
if (set->iteration_id != ntohs (msg->iteration_id))
{
GNUNET_SET_ElementIterator iter = set->iterator;
if (NULL == iter)
+ {
+ /* FIXME: if this is true, could cancel+start a fresh one
+ cause elements to go to the wrong iteration? */
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Service completed set iteration that was already cancelled\n");
return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Set iteration completed\n");
+ set->destroy_requested = GNUNET_SYSERR;
set->iterator = NULL;
set->iteration_id++;
iter (set->iterator_cls,
NULL);
+ if (GNUNET_SYSERR == set->destroy_requested)
+ set->destroy_requested = GNUNET_NO;
+ if (GNUNET_YES == set->destroy_requested)
+ GNUNET_SET_destroy (set);
}
int destroy_set;
GNUNET_assert (NULL != set->mq);
- result_status = ntohs (msg->result_status);
+ result_status = (enum GNUNET_SET_Status) ntohs (msg->result_status);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got result message with status %d\n",
result_status);
{
oh->result_cb (oh->result_cls,
NULL,
+ GNUNET_ntohll (msg->current_size),
result_status);
}
else
if (NULL != oh->result_cb)
oh->result_cb (oh->result_cls,
&e,
+ GNUNET_ntohll (msg->current_size),
result_status);
}
struct GNUNET_SET_CancelMessage *m;
struct GNUNET_MQ_Envelope *mqm;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelling SET operation\n");
if (NULL != set)
{
mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
struct GNUNET_SET_Handle *set = cls;
GNUNET_SET_ElementIterator iter = set->iterator;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
+ LOG (GNUNET_ERROR_TYPE_ERROR,
"Handling client set error %d\n",
error);
while (NULL != set->ops_head)
if (NULL != set->ops_head->result_cb)
set->ops_head->result_cb (set->ops_head->result_cls,
NULL,
+ 0,
GNUNET_SET_STATUS_FAILURE);
set_operation_destroy (set->ops_head);
}
set->iterator = NULL;
set->iteration_id++;
+ set->invalid = GNUNET_YES;
if (NULL != iter)
iter (set->iterator_cls,
NULL);
- set->invalid = GNUNET_YES;
- if (GNUNET_YES == set->destroy_requested)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Destroying set after operation failure\n");
- GNUNET_SET_destroy (set);
- }
}
+/**
+ * FIXME.
+ */
static struct GNUNET_SET_Handle *
create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op,
const uint32_t *cookie)
{
- GNUNET_MQ_hd_var_size (result,
- GNUNET_MESSAGE_TYPE_SET_RESULT,
- struct GNUNET_SET_ResultMessage);
- GNUNET_MQ_hd_var_size (iter_element,
- GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
- struct GNUNET_SET_IterResponseMessage);
- GNUNET_MQ_hd_fixed_size (iter_done,
- GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
- struct GNUNET_MessageHeader);
- GNUNET_MQ_hd_fixed_size (copy_lazy,
- GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
- struct GNUNET_SET_CopyLazyResponseMessage);
struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- make_result_handler (set),
- make_iter_element_handler (set),
- make_iter_done_handler (set),
- make_copy_lazy_handler (set),
+ GNUNET_MQ_hd_var_size (result,
+ GNUNET_MESSAGE_TYPE_SET_RESULT,
+ struct GNUNET_SET_ResultMessage,
+ set),
+ GNUNET_MQ_hd_var_size (iter_element,
+ GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
+ struct GNUNET_SET_IterResponseMessage,
+ set),
+ GNUNET_MQ_hd_fixed_size (iter_done,
+ GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
+ struct GNUNET_MessageHeader,
+ set),
+ GNUNET_MQ_hd_fixed_size (copy_lazy,
+ GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
+ struct GNUNET_SET_CopyLazyResponseMessage,
+ set),
GNUNET_MQ_handler_end ()
};
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
set->cfg = cfg;
- set->mq = GNUNET_CLIENT_connecT (cfg,
+ set->mq = GNUNET_CLIENT_connect (cfg,
"set",
mq_handlers,
&handle_client_set_error,
GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT);
copy_msg->cookie = *cookie;
}
- GNUNET_MQ_send (set->mq, mqm);
+ GNUNET_MQ_send (set->mq,
+ mqm);
return set;
}
GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op)
{
- return create_internal (cfg, op, NULL);
+ struct GNUNET_SET_Handle *set;
+
+ set = create_internal (cfg,
+ op,
+ NULL);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating set %p for operation %d\n",
+ set,
+ op);
+ return set;
}
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ElementMessage *msg;
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "adding element of type %u to set %p\n",
+ (unsigned int) element->element_type,
+ set);
if (GNUNET_YES == set->invalid)
{
if (NULL != cont)
cont (cont_cls);
return GNUNET_SYSERR;
}
- mqm = GNUNET_MQ_msg_extra (msg, element->size,
+ mqm = GNUNET_MQ_msg_extra (msg,
+ element->size,
GNUNET_MESSAGE_TYPE_SET_ADD);
msg->element_type = htons (element->element_type);
GNUNET_memcpy (&msg[1],
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ElementMessage *msg;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing element from set %p\n",
+ set);
if (GNUNET_YES == set->invalid)
{
if (NULL != cont)
/* destroying set while iterator is active is currently
not supported; we should expand the API to allow
clients to explicitly cancel the iteration! */
- GNUNET_assert (NULL == set->iterator);
- if (NULL != set->ops_head)
+ if ( (NULL != set->ops_head) ||
+ (NULL != set->iterator) ||
+ (GNUNET_SYSERR == set->destroy_requested) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Set operations are pending, delaying set destruction\n");
const struct GNUNET_HashCode *app_id,
const struct GNUNET_MessageHeader *context_msg,
enum GNUNET_SET_ResultMode result_mode,
+ struct GNUNET_SET_Option options[],
GNUNET_SET_ResultIterator result_cb,
void *result_cls)
{
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_EvaluateMessage *msg;
+ struct GNUNET_SET_Option *opt;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client prepares set operation (%d)\n",
+ result_mode);
oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
oh->result_cb = result_cb;
oh->result_cls = result_cls;
msg->app_id = *app_id;
msg->result_mode = htonl (result_mode);
msg->target_peer = *other_peer;
+ for (opt = options; opt->type != 0; opt++)
+ {
+ switch (opt->type)
+ {
+ case GNUNET_SET_OPTION_BYZANTINE:
+ msg->byzantine = GNUNET_YES;
+ msg->byzantine_lower_bound = opt->v.num;
+ break;
+ case GNUNET_SET_OPTION_FORCE_FULL:
+ msg->force_full = GNUNET_YES;
+ break;
+ case GNUNET_SET_OPTION_FORCE_DELTA:
+ msg->force_delta = GNUNET_YES;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Option with type %d not recognized\n", (int) opt->type);
+ }
+ }
oh->conclude_mqm = mqm;
oh->request_id_addr = &msg->request_id;
struct GNUNET_SET_RejectMessage *rmsg;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Processing incoming operation request\n");
+ "Processing incoming operation request with id %u\n",
+ ntohl (msg->accept_id));
/* we got another valid request => reset the backoff */
lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
req.accept_id = ntohl (msg->accept_id);
if (GNUNET_YES == req.accepted)
return; /* the accept-case is handled in #GNUNET_SET_accept() */
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Rejecting request\n");
+ "Rejected request %u\n",
+ ntohl (msg->accept_id));
mqm = GNUNET_MQ_msg (rmsg,
GNUNET_MESSAGE_TYPE_SET_REJECT);
rmsg->accept_reject_id = msg->accept_id;
static void
listen_connect (void *cls)
{
- GNUNET_MQ_hd_var_size (request,
- GNUNET_MESSAGE_TYPE_SET_REQUEST,
- struct GNUNET_SET_RequestMessage);
struct GNUNET_SET_ListenHandle *lh = cls;
struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- make_request_handler (lh),
+ GNUNET_MQ_hd_var_size (request,
+ GNUNET_MESSAGE_TYPE_SET_REQUEST,
+ struct GNUNET_SET_RequestMessage,
+ lh),
GNUNET_MQ_handler_end ()
};
struct GNUNET_MQ_Envelope *mqm;
lh->reconnect_task = NULL;
GNUNET_assert (NULL == lh->mq);
- lh->mq = GNUNET_CLIENT_connecT (lh->cfg,
+ lh->mq = GNUNET_CLIENT_connect (lh->cfg,
"set",
mq_handlers,
&handle_client_listener_error,
{
struct GNUNET_SET_ListenHandle *lh;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting listener for app %s\n",
+ GNUNET_h2s (app_id));
lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
lh->listen_cb = listen_cb;
lh->listen_cls = listen_cls;
GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Canceling listener\n");
+ "Canceling listener %s\n",
+ GNUNET_h2s (&lh->app_id));
if (NULL != lh->mq)
{
GNUNET_MQ_destroy (lh->mq);
struct GNUNET_SET_OperationHandle *
GNUNET_SET_accept (struct GNUNET_SET_Request *request,
enum GNUNET_SET_ResultMode result_mode,
+ struct GNUNET_SET_Option options[],
GNUNET_SET_ResultIterator result_cb,
void *result_cls)
{
struct GNUNET_SET_AcceptMessage *msg;
GNUNET_assert (GNUNET_NO == request->accepted);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client accepts set operation (%d) with id %u\n",
+ result_mode,
+ request->accept_id);
request->accepted = GNUNET_YES;
- mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+ mqm = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_SET_ACCEPT);
msg->accept_reject_id = htonl (request->accept_id);
msg->result_mode = htonl (result_mode);
oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
}
if (GNUNET_YES == set->invalid)
return GNUNET_SYSERR;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client commits to SET\n");
GNUNET_assert (NULL != oh->conclude_mqm);
oh->set = set;
GNUNET_CONTAINER_DLL_insert (set->ops_head,
set->ops_tail,
oh);
- oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
+ oh->request_id = GNUNET_MQ_assoc_add (set->mq,
+ oh);
*oh->request_id_addr = htonl (oh->request_id);
- GNUNET_MQ_send (set->mq, oh->conclude_mqm);
+ GNUNET_MQ_send (set->mq,
+ oh->conclude_mqm);
oh->conclude_mqm = NULL;
oh->request_id_addr = NULL;
return GNUNET_OK;
struct GNUNET_MQ_Envelope *ev;
struct SetCopyRequest *req;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating lazy copy of set\n");
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE);
GNUNET_MQ_send (set->mq, ev);