/*
This file is part of GNUnet.
- Copyright (C) 2012-2014 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2012-2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
-#include "gnunet_client_lib.h"
#include "gnunet_set_service.h"
#include "set.h"
*/
struct GNUNET_SET_Handle
{
- /**
- * Client connected to the set service.
- */
- struct GNUNET_CLIENT_Connection *client;
-
/**
* Message queue for @e client.
*/
*/
struct GNUNET_SET_ListenHandle
{
- /**
- * Connection to the service.
- */
- struct GNUNET_CLIENT_Connection *client;
/**
* Message queue for the client.
/**
* Task for reconnecting when the listener fails.
*/
- struct GNUNET_SCHEDULER_Task * reconnect_task;
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
/**
* Operation we listen for.
static struct GNUNET_SET_Handle *
create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op,
- uint32_t *cookie);
+ const uint32_t *cookie);
/**
* iterator and sends an acknowledgement to the service.
*
* @param cls the `struct GNUNET_SET_Handle *`
- * @param mh the message
+ * @param msg the message
*/
static void
handle_copy_lazy (void *cls,
- const struct GNUNET_MessageHeader *mh)
+ const struct GNUNET_SET_CopyLazyResponseMessage *msg)
{
- struct GNUNET_SET_CopyLazyResponseMessage *msg;
struct GNUNET_SET_Handle *set = cls;
struct SetCopyRequest *req;
struct GNUNET_SET_Handle *new_set;
- msg = (struct GNUNET_SET_CopyLazyResponseMessage *) mh;
-
req = set->copy_req_head;
-
if (NULL == req)
{
/* Service sent us unsolicited lazy copy response */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Handling response to lazy copy\n");
-
GNUNET_CONTAINER_DLL_remove (set->copy_req_head,
set->copy_req_tail,
req);
-
-
// We pass none as operation here, since it doesn't matter when
// cloning.
- new_set = create_internal (set->cfg, GNUNET_SET_OPERATION_NONE, &msg->cookie);
-
+ new_set = create_internal (set->cfg,
+ GNUNET_SET_OPERATION_NONE,
+ &msg->cookie);
req->cb (req->cls, new_set);
-
GNUNET_free (req);
}
+/**
+ * Check that the given @a msg is well-formed.
+ *
+ * @param cls closure
+ * @param msg message to check
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_iter_element (void *cls,
+ const struct GNUNET_SET_IterResponseMessage *msg)
+{
+ /* minimum size was already checked, everything else is OK! */
+ return GNUNET_OK;
+}
+
+
/**
* Handle element for iteration over the set. Notifies the
* iterator and sends an acknowledgement to the service.
* @param cls the `struct GNUNET_SET_Handle *`
* @param mh the message
*/
-static void
-handle_iter_element (void *cls,
- const struct GNUNET_MessageHeader *mh)
+ static void
+ handle_iter_element (void *cls,
+ const struct GNUNET_SET_IterResponseMessage *msg)
{
struct GNUNET_SET_Handle *set = cls;
GNUNET_SET_ElementIterator iter = set->iterator;
struct GNUNET_SET_Element element;
- const struct GNUNET_SET_IterResponseMessage *msg;
struct GNUNET_SET_IterAckMessage *ack_msg;
struct GNUNET_MQ_Envelope *ev;
uint16_t msize;
- msize = ntohs (mh->size);
- if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
- {
- /* message malformed */
- GNUNET_break (0);
- set->iterator = NULL;
- set->iteration_id++;
- iter (set->iterator_cls,
- NULL);
- iter = NULL;
- }
- msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
+ msize = ntohs (msg->header.size);
if (set->iteration_id != ntohs (msg->iteration_id))
{
/* element from a previous iteration, skip! */
if (NULL != iter)
{
element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
- element.element_type = htons (msg->element_type);
+ element.element_type = ntohs (msg->element_type);
element.data = &msg[1];
iter (set->iterator_cls,
&element);
}
+/**
+ * Check that the given @a msg is well-formed.
+ *
+ * @param cls closure
+ * @param msg message to check
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_result (void *cls,
+ const struct GNUNET_SET_ResultMessage *msg)
+{
+ /* minimum size was already checked, everything else is OK! */
+ return GNUNET_OK;
+}
+
+
/**
* Handle result message for a set operation.
*
*/
static void
handle_result (void *cls,
- const struct GNUNET_MessageHeader *mh)
+ const struct GNUNET_SET_ResultMessage *msg)
{
struct GNUNET_SET_Handle *set = cls;
- const struct GNUNET_SET_ResultMessage *msg;
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_Element e;
enum GNUNET_SET_Status result_status;
- msg = (const struct GNUNET_SET_ResultMessage *) mh;
GNUNET_assert (NULL != set->mq);
result_status = ntohs (msg->result_status);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring result from canceled operation\n");
return;
}
- if (GNUNET_SET_STATUS_OK != result_status)
+
+ switch (result_status)
{
- /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
- * and this is the last result message we get */
- GNUNET_MQ_assoc_remove (set->mq,
- ntohl (msg->request_id));
- GNUNET_CONTAINER_DLL_remove (set->ops_head,
- set->ops_tail,
- oh);
- if ( (GNUNET_YES == set->destroy_requested) &&
- (NULL == set->ops_head) )
- GNUNET_SET_destroy (set);
- if (NULL != oh->result_cb)
- oh->result_cb (oh->result_cls,
- NULL,
- result_status);
- switch (result_status)
- {
case GNUNET_SET_STATUS_OK:
- break;
+ case GNUNET_SET_STATUS_ADD_LOCAL:
+ case GNUNET_SET_STATUS_ADD_REMOTE:
+ goto do_element;
case GNUNET_SET_STATUS_FAILURE:
- oh->result_cb = NULL;
- break;
- case GNUNET_SET_STATUS_HALF_DONE:
- break;
case GNUNET_SET_STATUS_DONE:
- oh->result_cb = NULL;
- break;
- }
- GNUNET_free (oh);
- return;
+ goto do_final;
+ case GNUNET_SET_STATUS_HALF_DONE:
+ /* not used anymore */
+ GNUNET_assert (0);
}
+
+do_final:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Treating result as final status\n");
+ GNUNET_MQ_assoc_remove (set->mq,
+ ntohl (msg->request_id));
+ GNUNET_CONTAINER_DLL_remove (set->ops_head,
+ set->ops_tail,
+ oh);
+ if (NULL != oh->result_cb)
+ {
+ oh->result_cb (oh->result_cls,
+ NULL,
+ result_status);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "No callback for final status\n");
+ }
+ if ( (GNUNET_YES == set->destroy_requested) &&
+ (NULL == set->ops_head) )
+ GNUNET_SET_destroy (set);
+ GNUNET_free (oh);
+ return;
+
+do_element:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Treating result as element\n");
e.data = &msg[1];
- e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
- e.element_type = msg->element_type;
+ e.size = ntohs (msg->header.size) - sizeof (struct GNUNET_SET_ResultMessage);
+ e.element_type = ntohs (msg->element_type);
if (NULL != oh->result_cb)
oh->result_cb (oh->result_cls,
&e,
enum GNUNET_MQ_Error error)
{
struct GNUNET_SET_Handle *set = cls;
+ GNUNET_SET_ElementIterator iter = set->iterator;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Handling client set error %d\n",
GNUNET_SET_STATUS_FAILURE);
set_operation_destroy (set->ops_head);
}
+ set->iterator = NULL;
+ set->iteration_id++;
+ if (NULL != iter)
+ iter (set->iterator_cls,
+ NULL);
set->invalid = GNUNET_YES;
if (GNUNET_YES == set->destroy_requested)
{
static struct GNUNET_SET_Handle *
create_internal (const struct GNUNET_CONFIGURATION_Handle *cfg,
enum GNUNET_SET_OperationType op,
- uint32_t *cookie)
+ const uint32_t *cookie)
{
- static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- { &handle_result,
- GNUNET_MESSAGE_TYPE_SET_RESULT,
- 0 },
- { &handle_iter_element,
- GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
- 0 },
- { &handle_iter_done,
- GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
- sizeof (struct GNUNET_MessageHeader) },
- { &handle_copy_lazy,
- GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
- sizeof (struct GNUNET_SET_CopyLazyResponseMessage) },
- GNUNET_MQ_HANDLERS_END
+ GNUNET_MQ_hd_var_size (result,
+ GNUNET_MESSAGE_TYPE_SET_RESULT,
+ struct GNUNET_SET_ResultMessage);
+ GNUNET_MQ_hd_var_size (iter_element,
+ GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT,
+ struct GNUNET_SET_IterResponseMessage);
+ GNUNET_MQ_hd_fixed_size (iter_done,
+ GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
+ struct GNUNET_MessageHeader);
+ GNUNET_MQ_hd_fixed_size (copy_lazy,
+ GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE,
+ struct GNUNET_SET_CopyLazyResponseMessage);
+ struct GNUNET_SET_Handle *set = GNUNET_new (struct GNUNET_SET_Handle);
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ make_result_handler (set),
+ make_iter_element_handler (set),
+ make_iter_done_handler (set),
+ make_copy_lazy_handler (set),
+ GNUNET_MQ_handler_end ()
};
- struct GNUNET_SET_Handle *set;
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_CreateMessage *create_msg;
struct GNUNET_SET_CopyLazyConnectMessage *copy_msg;
- set = GNUNET_new (struct GNUNET_SET_Handle);
- set->client = GNUNET_CLIENT_connect ("set", cfg);
set->cfg = cfg;
- if (NULL == set->client)
+ set->mq = GNUNET_CLIENT_connecT (cfg,
+ "set",
+ mq_handlers,
+ &handle_client_set_error,
+ set);
+ if (NULL == set->mq)
{
GNUNET_free (set);
return NULL;
}
- set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
- mq_handlers,
- &handle_client_set_error,
- set);
- GNUNET_assert (NULL != set->mq);
-
if (NULL == cookie)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
}
mqm = GNUNET_MQ_msg_extra (msg, element->size,
GNUNET_MESSAGE_TYPE_SET_ADD);
- msg->element_type = element->element_type;
+ msg->element_type = htons (element->element_type);
memcpy (&msg[1],
element->data,
element->size);
mqm = GNUNET_MQ_msg_extra (msg,
element->size,
GNUNET_MESSAGE_TYPE_SET_REMOVE);
- msg->element_type = element->element_type;
+ msg->element_type = htons (element->element_type);
memcpy (&msg[1],
element->data,
element->size);
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Really destroying set\n");
- if (NULL != set->client)
- {
- GNUNET_CLIENT_disconnect (set->client);
- set->client = NULL;
- }
if (NULL != set->mq)
{
GNUNET_MQ_destroy (set->mq);
* Connect to the set service in order to listen for requests.
*
* @param cls the `struct GNUNET_SET_ListenHandle *` to connect
- * @param tc task context if invoked as a task, NULL otherwise
*/
static void
-listen_connect (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
+listen_connect (void *cls);
+
+
+/**
+ * Check validity of request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_request (void *cls,
+ const struct GNUNET_SET_RequestMessage *msg)
+{
+ const struct GNUNET_MessageHeader *context_msg;
+
+ context_msg = GNUNET_MQ_extract_nested_mh (msg);
+ if (NULL == context_msg)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
/**
* Handle request message for a listen operation
*
* @param cls the listen handle
- * @param mh the message
+ * @param msg the message
*/
static void
handle_request (void *cls,
- const struct GNUNET_MessageHeader *mh)
+ const struct GNUNET_SET_RequestMessage *msg)
{
struct GNUNET_SET_ListenHandle *lh = cls;
- const struct GNUNET_SET_RequestMessage *msg;
struct GNUNET_SET_Request req;
const struct GNUNET_MessageHeader *context_msg;
- uint16_t msize;
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_RejectMessage *rmsg;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Processing incoming operation request\n");
- msize = ntohs (mh->size);
- if (msize < sizeof (struct GNUNET_SET_RequestMessage))
- {
- GNUNET_break (0);
- GNUNET_CLIENT_disconnect (lh->client);
- lh->client = NULL;
- GNUNET_MQ_destroy (lh->mq);
- lh->mq = NULL;
- lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
- &listen_connect, lh);
- lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
- return;
- }
/* we got another valid request => reset the backoff */
lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
- msg = (const struct GNUNET_SET_RequestMessage *) mh;
req.accept_id = ntohl (msg->accept_id);
req.accepted = GNUNET_NO;
context_msg = GNUNET_MQ_extract_nested_mh (msg);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Listener broke down (%d), re-connecting\n",
(int) error);
- GNUNET_CLIENT_disconnect (lh->client);
- lh->client = NULL;
GNUNET_MQ_destroy (lh->mq);
lh->mq = NULL;
lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
- &listen_connect, lh);
+ &listen_connect,
+ lh);
lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
}
* Connect to the set service in order to listen for requests.
*
* @param cls the `struct GNUNET_SET_ListenHandle *` to connect
- * @param tc task context if invoked as a task, NULL otherwise
*/
static void
-listen_connect (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+listen_connect (void *cls)
{
- static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
- GNUNET_MQ_HANDLERS_END
- };
+ GNUNET_MQ_hd_var_size (request,
+ GNUNET_MESSAGE_TYPE_SET_REQUEST,
+ struct GNUNET_SET_RequestMessage);
struct GNUNET_SET_ListenHandle *lh = cls;
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ make_request_handler (lh),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ListenMessage *msg;
- if ( (NULL != tc) &&
- (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Listener not reconnecting due to shutdown\n");
- return;
- }
lh->reconnect_task = NULL;
- GNUNET_assert (NULL == lh->client);
- lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
- if (NULL == lh->client)
- return;
GNUNET_assert (NULL == lh->mq);
- lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client,
- mq_handlers,
- &handle_client_listener_error, lh);
+ lh->mq = GNUNET_CLIENT_connecT (lh->cfg,
+ "set",
+ mq_handlers,
+ &handle_client_listener_error,
+ lh);
+ if (NULL == lh->mq)
+ return;
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
msg->operation = htonl (lh->operation);
msg->app_id = lh->app_id;
- GNUNET_MQ_send (lh->mq, mqm);
+ GNUNET_MQ_send (lh->mq,
+ mqm);
}
lh->operation = operation;
lh->app_id = *app_id;
lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
- listen_connect (lh, NULL);
- if (NULL == lh->client)
+ listen_connect (lh);
+ if (NULL == lh->mq)
{
GNUNET_free (lh);
return NULL;
GNUNET_MQ_destroy (lh->mq);
lh->mq = NULL;
}
- if (NULL != lh->client)
- {
- GNUNET_CLIENT_disconnect (lh->client);
- lh->client = NULL;
- }
if (NULL != lh->reconnect_task)
{
GNUNET_SCHEDULER_cancel (lh->reconnect_task);
GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
struct GNUNET_SET_Handle *set)
{
- GNUNET_assert (NULL == oh->set);
+ if (NULL != oh->set)
+ {
+ /* Some other set was already commited for this
+ * operation, there is a logic bug in the client of this API */
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
if (GNUNET_YES == set->invalid)
return GNUNET_SYSERR;
GNUNET_assert (NULL != oh->conclude_mqm);
}
+/**
+ * Create a copy of an element. The copy
+ * must be GNUNET_free-d by the caller.
+ *
+ * @param element the element to copy
+ * @return the copied element
+ */
+struct GNUNET_SET_Element *
+GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
+{
+ struct GNUNET_SET_Element *copy;
+
+ copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
+ copy->size = element->size;
+ copy->element_type = element->element_type;
+ copy->data = ©[1];
+ memcpy ((void *) copy->data, element->data, copy->size);
+
+ return copy;
+}
+
+
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param[out] ret_hash a pointer to where the hash of @a element
+ * should be stored
+ */
+void
+GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element,
+ struct GNUNET_HashCode *ret_hash)
+{
+ struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
+
+ /* It's not guaranteed that the element data is always after the element header,
+ so we need to hash the chunks separately. */
+ GNUNET_CRYPTO_hash_context_read (ctx, &element->size, sizeof (uint16_t));
+ GNUNET_CRYPTO_hash_context_read (ctx, &element->element_type, sizeof (uint16_t));
+ GNUNET_CRYPTO_hash_context_read (ctx, element->data, element->size);
+ GNUNET_CRYPTO_hash_context_finish (ctx, ret_hash);
+}
+
/* end of set_api.c */