#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
-/**
- * Actions that can be queued.
- */
-struct QueuedMessage
-{
- /**
- * Queued messages are stored in a doubly linked list.
- */
- struct QueuedMessage *next;
-
- /**
- * Queued messages are stored in a doubly linked list.
- */
- struct QueuedMessage *prev;
-
- /**
- * The actual queued message.
- */
- struct GNUNET_MessageHeader *msg;
-
- /**
- * Will be called after transmit, if not NULL
- */
- GNUNET_CONSENSUS_InsertDoneCallback idc;
-
- /**
- * Closure for idc
- */
- void *idc_cls;
-};
-
/**
* Handle for the service.
*/
struct GNUNET_PeerIdentity **peers;
- /**
- * Currently active transmit request.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
/**
* GNUNES_YES iff the join message has been sent to the service.
*/
int joined;
- /**
- * Closure for the insert done callback.
- */
- void *idc_cls;
-
/**
* Called when the conclude operation finishes or fails.
*/
*/
struct GNUNET_TIME_Absolute conclude_deadline;
- unsigned int conclude_min_size;
-
- struct QueuedMessage *messages_head;
-
- struct QueuedMessage *messages_tail;
-
+ /**
+ * Message queue for the client.
+ */
+ struct GNUNET_MQ_Handle *mq;
};
-
-
/**
- * Schedule transmitting the next message.
- *
- * @param consensus consensus handle
+ * FIXME: this should not bee necessary when the API
+ * issue has been fixed
*/
-static void
-send_next (struct GNUNET_CONSENSUS_Handle *consensus);
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_queued (void *cls, size_t size,
- void *buf)
+struct InsertDoneInfo
{
- struct GNUNET_CONSENSUS_Handle *consensus;
- struct QueuedMessage *qmsg;
- size_t msg_size;
-
- consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
- consensus->th = NULL;
-
- qmsg = consensus->messages_head;
- GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
-
- if (NULL == buf)
- {
- if (NULL != qmsg->idc)
- {
- qmsg->idc (qmsg->idc_cls, GNUNET_YES);
- }
- return 0;
- }
-
- msg_size = ntohs (qmsg->msg->size);
-
- GNUNET_assert (size >= msg_size);
-
- memcpy (buf, qmsg->msg, msg_size);
- if (NULL != qmsg->idc)
- {
- qmsg->idc (qmsg->idc_cls, GNUNET_YES);
- }
- GNUNET_free (qmsg->msg);
- GNUNET_free (qmsg);
- /* FIXME: free the messages */
-
- send_next (consensus);
-
- return msg_size;
-}
-
-
-/**
- * Schedule transmitting the next message.
- *
- * @param consensus consensus handle
- */
-static void
-send_next (struct GNUNET_CONSENSUS_Handle *consensus)
-{
- if (NULL != consensus->th)
- return;
-
- if (NULL != consensus->messages_head)
- {
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_queued, consensus);
- }
-}
+ GNUNET_CONSENSUS_InsertDoneCallback idc;
+ void *cls;
+};
/**
* Called when the server has sent is a new element
*
- * @param consensus consensus handle
- * @param msg element message
+ * @param cls consensus handle
+ * @param mh element message
*/
static void
-handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_CONSENSUS_ElementMessage *msg)
+handle_new_element (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
+ struct GNUNET_CONSENSUS_Handle *consensus = cls;
+ const struct GNUNET_CONSENSUS_ElementMessage *msg
+ = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
struct GNUNET_SET_Element element;
LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
element.data = &msg[1];
consensus->new_element_cb (consensus->new_element_cls, &element);
-
- send_next (consensus);
}
* Called when the server has announced
* that the conclusion is over.
*
- * @param consensus consensus handle
- * @param msg conclude done message
+ * @param cls consensus handle
+ * @param mh conclude done message
*/
static void
-handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
+handle_conclude_done (void *cls,
const struct GNUNET_MessageHeader *msg)
{
+ struct GNUNET_CONSENSUS_Handle *consensus = cls;
+
GNUNET_CONSENSUS_ConcludeCallback cc;
GNUNET_assert (NULL != (cc = consensus->conclude_cb));
}
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
-{
- struct GNUNET_CONSENSUS_Handle *consensus = cls;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
-
- if (NULL == msg)
- {
- /* Error, timeout, death */
- LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
- GNUNET_CLIENT_disconnect (consensus->client);
- consensus->client = NULL;
- consensus->new_element_cb (consensus->new_element_cls, NULL);
- return;
- }
- GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
- GNUNET_TIME_UNIT_FOREVER_REL);
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
- handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
- handle_conclude_done (consensus, msg);
- break;
- default:
- GNUNET_break (0);
- }
-}
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_join (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_JoinMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
-
- consensus = cls;
- consensus->th = NULL;
- consensus->joined = 1;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
- consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
- msg->header.size = htons (msize);
- msg->session_id = consensus->session_id;
- msg->num_peers = htonl (consensus->num_peers);
- memcpy(&msg[1],
- consensus->peers,
- consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
- send_next (consensus);
- GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
- GNUNET_TIME_UNIT_FOREVER_REL);
-
- return msize;
-}
-
/**
* Create a consensus session.
*
void *new_element_cls)
{
struct GNUNET_CONSENSUS_Handle *consensus;
- size_t join_message_size;
+ struct GNUNET_CONSENSUS_JoinMessage *join_msg;
+ struct GNUNET_MQ_Envelope *ev;
+ const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ {handle_new_element,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
+ {handle_conclude_done,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
+ GNUNET_MQ_HANDLERS_END
+ };
consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
consensus->cfg = cfg;
GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
+ consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
+ mq_handlers, consensus);
GNUNET_assert (consensus->client != NULL);
- join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
- (num_peers * sizeof (struct GNUNET_PeerIdentity));
-
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- join_message_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_join, consensus);
+ ev = GNUNET_MQ_msg_extra (join_msg,
+ (num_peers * sizeof (struct GNUNET_PeerIdentity)),
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
+ join_msg->session_id = consensus->session_id;
+ join_msg->num_peers = htonl (consensus->num_peers);
+ memcpy(&join_msg[1],
+ consensus->peers,
+ consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
- GNUNET_assert (consensus->th != NULL);
+ GNUNET_MQ_send (consensus->mq, ev);
return consensus;
}
+static void
+idc_adapter (void *cls)
+{
+ struct InsertDoneInfo *i = cls;
+ i->idc (i->cls, GNUNET_OK);
+ GNUNET_free (i);
+}
/**
* Insert an element in the set being reconsiled. Must not be called after
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls)
{
- struct QueuedMessage *qmsg;
struct GNUNET_CONSENSUS_ElementMessage *element_msg;
- size_t element_msg_size;
+ struct GNUNET_MQ_Envelope *ev;
+ struct InsertDoneInfo *i;
LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
- element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
- element->size);
+ ev = GNUNET_MQ_msg_extra (element_msg, element->size,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- element_msg = GNUNET_malloc (element_msg_size);
- element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- element_msg->header.size = htons (element_msg_size);
memcpy (&element_msg[1], element->data, element->size);
-
- qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
- qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
- qmsg->idc = idc;
- qmsg->idc_cls = idc_cls;
-
- GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
-
- send_next (consensus);
+
+ if (NULL != idc)
+ {
+ i = GNUNET_new (struct InsertDoneInfo);
+ i->idc = idc;
+ i->cls = idc_cls;
+ GNUNET_MQ_notify_sent (ev, idc_adapter, i);
+ }
}
GNUNET_CONSENSUS_ConcludeCallback conclude,
void *conclude_cls)
{
- struct QueuedMessage *qmsg;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
GNUNET_assert (NULL != conclude);
consensus->conclude_cls = conclude_cls;
consensus->conclude_cb = conclude;
- conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
- conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
- conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
+ ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
- qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
- qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
-
- GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
-
- send_next (consensus);
+ GNUNET_MQ_send (consensus->mq, ev);
}
/**
* Queued messages to the client.
*/
- struct GNUNET_MQ_MessageQueue *client_mq;
+ struct GNUNET_MQ_Handle *client_mq;
/**
* Timeout for all rounds together, single rounds will schedule a timeout task
struct GNUNET_SET_OperationHandle *set_op;
/**
- * Has conclude been called on the set_op?
+ * Has commit been called on the set_op?
*/
- int set_op_concluded;
+ int set_op_commited;
};
GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
}
session->partner_outgoing->set_op =
- GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
- &session->global_id,
- (struct GNUNET_MessageHeader *) msg,
- 0, /* FIXME */
- GNUNET_SET_RESULT_ADDED,
- set_result_cb, session->partner_outgoing);
- GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set);
- session->partner_outgoing->set_op_concluded = GNUNET_YES;
+ GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
+ &session->global_id,
+ (struct GNUNET_MessageHeader *) msg,
+ 0, /* FIXME */
+ GNUNET_SET_RESULT_ADDED,
+ set_result_cb, session->partner_outgoing);
+ GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
+ session->partner_outgoing->set_op_commited = GNUNET_YES;
}
#ifdef GNUNET_EXTRA_LOGGING
set_result_cb, &session->info[index]);
if (ntohl (msg->exp_subround) == session->exp_subround)
{
- cpi->set_op_concluded = GNUNET_YES;
- GNUNET_SET_conclude (cpi->set_op, session->element_set);
+ cpi->set_op_commited = GNUNET_YES;
+ GNUNET_SET_commit (cpi->set_op, session->element_set);
}
else
{
- cpi->set_op_concluded = GNUNET_NO;
+ cpi->set_op_commited = GNUNET_NO;
}
break;
default:
if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance)
{
/* we have added all elements to the set, run the operation */
- GNUNET_SET_conclude (neighbor->set_op,
+ GNUNET_SET_commit (neighbor->set_op,
neighbor->my_set);
GNUNET_SET_destroy (neighbor->my_set);
neighbor->my_set = NULL;
neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
neighbor->my_set = GNUNET_SET_create (cfg,
GNUNET_SET_OPERATION_UNION);
- neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer,
- &neighbor->real_session_id,
- NULL,
- 0 /* FIXME: salt */,
- GNUNET_SET_RESULT_ADDED,
- &handle_set_union_result,
- neighbor);
+ neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer,
+ &neighbor->real_session_id,
+ NULL,
+ 0 /* FIXME: salt */,
+ GNUNET_SET_RESULT_ADDED,
+ &handle_set_union_result,
+ neighbor);
build_set (neighbor);
}
* GNUNET_NO if not.
*/
typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls,
- const struct GNUNET_HashCode * key,
+ const struct GNUNET_HashCode *key,
void *value);
* the tunnel.
* @param handlers Callbacks for messages we care about, NULL-terminated. Each
* one must call GNUNET_MESH_receive_done on the tunnel to
- * receive the next message.
+ * receive the next message. Messages of a type that is not
+ * in the handlers array are ignored if received.
* @param ports NULL or 0-terminated array of port numbers for incoming tunnels.
*
* @return handle to the mesh service NULL on error
/**
* Request information about the running mesh peer.
* The callback will be called for every tunnel known to the service,
- * listing all active peers that blong to the tunnel.
+ * listing all active peers that belong to the tunnel.
*
* If called again on the same handle, it will overwrite the previous
* callback and cls. To retrieve the cls, monitor_cancel must be
GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h);
+/**
+ * Create a message queue for a mesh tunnel.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param tunnel the tunnel to create the message qeue for
+ * @return a message queue to messages over the tunnel
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel);
+
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
/**
* @author Florian Dold
* @file set/mq.h
- * @brief general purpose request queue
+ * @brief general purpose message queue
*/
#ifndef GNUNET_MQ_H
#define GNUNET_MQ_H
/**
- * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed
+ * Allocate an envelope, with extra space allocated after the space needed
* by the message struct.
* The allocated message will already have the type and size field set.
*
#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type))
/**
- * Allocate a GNUNET_MQ_Message.
- * The allocated message will already have the type and size field set.
+ * Allocate a GNUNET_MQ_Envelope.
+ * The contained message will already have the type and size field set.
*
* @param mvar variable to store the allocated message in;
* must have a header field
* @param type type of the message
- * @return the MQ message
+ * @return the allocated envelope
*/
#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
/**
- * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
+ * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
* The allocated message will already have the type and size field set.
*
* @param type type of the message
/**
- * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space.
+ * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
* The allocated message will already have the type and size field set.
*
* @param mh pointer that will changed to point at to the allocated message header
/**
- * Allocate a GNUNET_MQ_Message, and append a payload message after the given
+ * Allocate a GNUNET_MQ_Envelope, and append a payload message after the given
* message struct.
*
* @param mvar pointer to a message struct, will be changed to point at the newly allocated message,
* whose size is 'sizeof(*mvar) + ntohs (mh->size)'
* @param type message type of the allocated message, has no effect on the nested message
* @param mh message to nest
- * @return a newly allocated 'struct GNUNET_MQ_Message *'
+ * @return a newly allocated 'struct GNUNET_MQ_Envelope *'
*/
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh)
#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var)))
+/**
+ * Implementation of the GNUNET_MQ_extract_nexted_mh macro.
+ *
+ * @param mh message header to extract nested message header from
+ * @param base_size size of the message before the nested message's header appears
+ * @return pointer to the nested message, does not copy the message
+ */
struct GNUNET_MessageHeader *
GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size);
-struct GNUNET_MQ_Message *
+/**
+ * Implementation of the GNUNET_MQ_msg_nested_mh macro.
+ *
+ * @param mhp pointer to the message header pointer that will be changed to allocate at
+ * the newly allocated space for the message.
+ */
+struct GNUNET_MQ_Envelope *
GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
const struct GNUNET_MessageHeader *nested_mh);
#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
-struct GNUNET_MQ_MessageQueue;
+/**
+ * Opaque handle to a message queue.
+ */
+struct GNUNET_MQ_Handle;
-struct GNUNET_MQ_Message;
+/**
+ * Opaque handle to an envelope.
+ */
+struct GNUNET_MQ_Envelope;
enum GNUNET_MQ_Error
{
* @param msg the received message
*/
typedef void
-(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg);
+(*GNUNET_MQ_MessageCallback) (void *cls,
+ const struct GNUNET_MessageHeader *msg);
/**
* Signature of functions implementing the
- * sending part of a message queue
+ * sending functionality of a message queue.
*
- * @param q the message queue
- * @param m the message
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+typedef void
+(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state);
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free 'mq', but should
+ * take care of 'impl_state'.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
*/
typedef void
-(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
+(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state);
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
typedef void
-(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
+(*GNUNET_MQ_CancelImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state);
/**
(*GNUNET_MQ_NotifyCallback) (void *cls);
-typedef void
-(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
-
-
-struct GNUNET_MQ_Message
-{
- /**
- * Messages are stored in a linked list
- */
- struct GNUNET_MQ_Message *next;
-
- /**
- * Messages are stored in a linked list
- */
- struct GNUNET_MQ_Message *prev;
-
- /**
- * Actual allocated message header,
- * usually points to the end of the containing GNUNET_MQ_Message
- */
- struct GNUNET_MessageHeader *mh;
-
- /**
- * Queue the message is queued in, NULL if message is not queued.
- */
- struct GNUNET_MQ_MessageQueue *parent_queue;
-
- /**
- * Called after the message was sent irrevokably
- */
- GNUNET_MQ_NotifyCallback sent_cb;
-
- /**
- * Closure for send_cb
- */
- void *sent_cls;
-};
-
-
/**
- * Handle to a message queue.
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure, same closure as for the message handlers
+ * @param error error code
*/
-struct GNUNET_MQ_MessageQueue
-{
- /**
- * Handlers array, or NULL if the queue should not receive messages
- */
- const struct GNUNET_MQ_Handler *handlers;
-
- /**
- * Closure for the handler callbacks,
- * as well as for the error handler.
- */
- void *handlers_cls;
-
- /**
- * Actual implementation of message sending,
- * called when a message is added
- */
- GNUNET_MQ_SendImpl send_impl;
-
- /**
- * Implementation-dependent queue destruction function
- */
- GNUNET_MQ_DestroyImpl destroy_impl;
-
- /**
- * Implementation-specific state
- */
- void *impl_state;
-
- /**
- * Callback will be called when an error occurs.
- */
- GNUNET_MQ_ErrorHandler error_handler;
-
- /**
- * Linked list of messages pending to be sent
- */
- struct GNUNET_MQ_Message *msg_head;
-
- /**
- * Linked list of messages pending to be sent
- */
- struct GNUNET_MQ_Message *msg_tail;
-
- /**
- * Message that is currently scheduled to be
- * sent. Not the head of the message queue, as the implementation
- * needs to know if sending has been already scheduled or not.
- */
- struct GNUNET_MQ_Message *current_msg;
-
- /**
- * Map of associations, lazily allocated
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
-
- /**
- * Next id that should be used for the assoc_map,
- * initialized lazily to a random value together with
- * assoc_map
- */
- uint32_t assoc_id;
-};
+typedef void
+(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
/**
* Message handler for a specific message type.
*/
-struct GNUNET_MQ_Handler
+struct GNUNET_MQ_MessageHandler
{
/**
* Callback, called every time a new message of
/**
- * Create a new message for MQ.
+ * Create a new envelope.
*
* @param mhp message header to store the allocated message header in, can be NULL
* @param size size of the message to allocate
* @param type type of the message, will be set in the allocated message
* @return the allocated MQ message
*/
-struct GNUNET_MQ_Message *
+struct GNUNET_MQ_Envelope *
GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type);
* @param mqm the message to discard
*/
void
-GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
+GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm);
/**
* @param mqm the message to send.
*/
void
-GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm);
+GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev);
/**
* @param mqm queued message to cancel
*/
void
-GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm);
+GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev);
/**
* @param assoc_data to associate
*/
uint32_t
-GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
- struct GNUNET_MQ_Message *mqm,
- void *assoc_data);
+GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data);
/**
* Get the data associated with a request id in a queue
* @return the associated data
*/
void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
/**
* @return the associated data
*/
void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
* @param cls closure for the handlers
* @return the message queue
*/
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
- const struct GNUNET_MQ_Handler *handlers,
+ const struct GNUNET_MQ_MessageHandler *handlers,
void *cls);
* @param client the client
* @return the message queue
*/
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
*
* @param send function the implements sending messages
* @param destroy function that implements destroying the queue
+ * @param destroy function that implements canceling a message
* @param state for the queue, passed to 'send' and 'destroy'
* @param handlers array of message handlers
* @param error_handler handler for read and write errors
+ * @param cls closure for handlers
* @return a new message queue
*/
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
GNUNET_MQ_DestroyImpl destroy,
+ GNUNET_MQ_CancelImpl cancel,
void *impl_state,
- struct GNUNET_MQ_Handler *handlers,
+ const struct GNUNET_MQ_MessageHandler *handlers,
GNUNET_MQ_ErrorHandler error_handler,
void *cls);
* Takes effect immediately, even for messages that already have been received, but for
* with the handler has not been called.
*
+ * If the message queue does not support receiving messages,
+ * this function has no effect.
+ *
* @param mq message queue
* @param new_handlers new handlers
* @param cls new closure for the handlers
*/
void
-GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
- const struct GNUNET_MQ_Handler *new_handlers,
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MQ_MessageHandler *new_handlers,
void *cls);
/**
- * Call a callback once the message has been sent, that is, the message
- * can not be canceled anymore.
- * There can be only one notify sent callback per message.
+ * Call a callback once the envelope has been sent, that is,
+ * sending it can not be canceled anymore.
+ * There can be only one notify sent callback per envelope.
*
- * @param mqm message to call the notify callback for
+ * @param ev message to call the notify callback for
* @param cb the notify callback
* @param cls closure for the callback
*/
void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
GNUNET_MQ_NotifyCallback cb,
void *cls);
* @param mq message queue to destroy
*/
void
-GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
+GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq);
/**
* @param mh message to dispatch
*/
void
-GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq,
- const struct GNUNET_MessageHeader *mh);
+GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *mh);
+
+
+/**
+ * Call the right callback for an error condition.
+ *
+ * @param mq message queue
+ */
+void
+GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
+ enum GNUNET_MQ_Error error);
+
+
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
+ */
+void
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq);
+
+
+/**
+ * Get the message that should currently be sent.
+ * Fails if there is no current message.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq);
+
+
+/**
+ * Get the implementation state associated with the
+ * message queue.
+ *
+ * While the GNUNET_MQ_Impl* callbacks receive the
+ * implementation state, continuations that are scheduled
+ * by the implementation function often only have one closure
+ * argument, with this function it is possible to get at the
+ * implementation state when only passing the GNUNET_MQ_Handle
+ * as closure.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+void *
+GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq);
+
+/**
+ * Mark the current message as irrevocably sent, but do not
+ * proceed with sending the next message.
+ * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
+ *
+ * @param mq message queue
+ */
+void
+GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq);
#endif
/**
- * Create a set operation for evaluation with another peer.
+ * Prepare a set operation to be evaluated with another peer.
* The evaluation will not start until the client provides
- * a local set with GNUNET_SET_conclude.
+ * a local set with GNUNET_SET_commit.
*
* @param other_peer peer with the other set
* @param app_id hash for the application using the set
* @param result_cls closure for result_cb
* @return a handle to cancel the operation
*/
-struct GNUNET_SET_OperationHandle * // FIXME: rename to _connect?
-GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
- const struct GNUNET_HashCode *app_id,
- const struct GNUNET_MessageHeader *context_msg,
- uint16_t salt,
- enum GNUNET_SET_ResultMode result_mode,
- GNUNET_SET_ResultIterator result_cb,
- void *result_cls);
+struct GNUNET_SET_OperationHandle *
+GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_HashCode *app_id,
+ const struct GNUNET_MessageHeader *context_msg,
+ uint16_t salt,
+ enum GNUNET_SET_ResultMode result_mode,
+ GNUNET_SET_ResultIterator result_cb,
+ void *result_cls);
/**
* Accept a request we got via GNUNET_SET_listen. Must be called during
* GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
* afterwards.
- * Call GNUNET_SET_conclude to provide the local set to use for the operation,
+ * Call GNUNET_SET_commit to provide the local set to use for the operation,
* and to begin the exchange with the remote peer.
*
* @param request request to accept
/**
- * Conclude the given set operation using the given set.
+ * Commit a set to be used with a set operation.
* This function is called once we have fully constructed
* the set that we want to use for the operation. At this
* time, the P2P protocol can then begin to exchange the
* @param oh handle to the set operation
* @param set the set to use for the operation
*/
-void // FIXME: rename to _commit
-GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
- struct GNUNET_SET_Handle *set);
+void
+GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
+ struct GNUNET_SET_Handle *set);
/**
* @param error_handler callback for errors
* @return the message queue for the socket
*/
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
- const struct GNUNET_MQ_Handler *msg_handlers,
+ const struct GNUNET_MQ_MessageHandler *msg_handlers,
GNUNET_MQ_ErrorHandler error_handler,
void *cls);
};
+/**
+ * Implementation state for mesh's message queue.
+ */
+struct MeshMQState
+{
+ /**
+ * The current transmit handle, or NULL
+ * if no transmit is active.
+ */
+ struct GNUNET_MESH_TransmitHandle *th;
+
+ /**
+ * Tunnel to send the data over.
+ */
+ struct GNUNET_MESH_Tunnel *tunnel;
+};
+
+
/******************************************************************************/
/*********************** DECLARATIONS *************************/
/******************************************************************************/
h->tunnel_cls = callback_cls;
return;
-}
\ No newline at end of file
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+mesh_mq_ntr (void *cls, size_t size,
+ void *buf)
+{
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
+ const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
+ uint16_t msize;
+
+ state->th = NULL;
+ if (NULL == buf)
+ {
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
+ return 0;
+ }
+ msize = ntohs (msg->size);
+ GNUNET_assert (msize <= size);
+ memcpy (buf, msg, msize);
+ GNUNET_MQ_impl_send_continue (mq);
+ return msize;
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg, void *impl_state)
+{
+ struct MeshMQState *state = impl_state;
+
+ GNUNET_assert (NULL == state->th);
+ GNUNET_MQ_impl_send_commit (mq);
+ state->th =
+ GNUNET_MESH_notify_transmit_ready (state->tunnel,
+ /* FIXME: add option for corking */
+ GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ ntohs (msg->size),
+ mesh_mq_ntr, mq);
+
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free 'mq', but should
+ * take care of 'impl_state'.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct MeshMQState *state = impl_state;
+
+ if (NULL != state->th)
+ GNUNET_MESH_notify_transmit_ready_cancel (state->th);
+
+ GNUNET_free (state);
+}
+
+
+/**
+ * Create a message queue for a mesh tunnel.
+ * The message queue can only be used to transmit messages,
+ * not to receive them.
+ *
+ * @param tunnel the tunnel to create the message qeue for
+ * @return a message queue to messages over the tunnel
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel)
+{
+ struct GNUNET_MQ_Handle *mq;
+ struct MeshMQState *state;
+
+ state = GNUNET_new (struct MeshMQState);
+ state->tunnel = tunnel;
+
+ mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
+ mesh_mq_destroy_impl,
+ NULL, /* FIXME: cancel impl. */
+ state,
+ NULL, /* no msg handlers */
+ NULL, /* no err handlers */
+ NULL); /* no handler cls */
+ return mq;
+}
+
endif
bin_PROGRAMS = \
- gnunet-set
+ gnunet-set-profiler gnunet-set-ibf-profiler
libexec_PROGRAMS = \
gnunet-service-set
lib_LTLIBRARIES = \
libgnunetset.la
-gnunet_set_SOURCES = \
- gnunet-set.c
-gnunet_set_LDADD = \
+gnunet_set_profiler_SOURCES = \
+ gnunet-set-profiler.c
+gnunet_set_profiler_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/set/libgnunetset.la \
- $(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/testbed/libgnunettestbed.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
$(GN_LIBINTL)
-gnunet_set_DEPENDENCIES = \
+gnunet_set_profiler_DEPENDENCIES = \
libgnunetset.la
+
+gnunet_set_ibf_profiler_SOURCES = \
+ gnunet-set-ibf-profiler.c \
+ ibf.c
+gnunet_set_ibf_profiler_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(GN_LIBINTL)
+
gnunet_service_set_SOURCES = \
gnunet-service-set.c \
gnunet-service-set_union.c \
gnunet_service_set_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/core/libgnunetcore.la \
- $(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/mesh/libgnunetmesh2.la \
$(GN_LIBINTL)
libgnunetset_la_SOURCES = \
/**
* Configuration of our local peer.
+ * (Not declared 'static' as also needed in gnunet-service-set_union.c)
*/
const struct GNUNET_CONFIGURATION_Handle *configuration;
/**
- * Socket listening for other peers via stream.
+ * Handle to the mesh service, used
+ * to listen for and connect to remote peers.
+ * (Not declared 'static' as also needed in gnunet-service-set_union.c)
*/
-static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
+struct GNUNET_MESH_Handle *mesh;
/**
* Sets are held in a doubly linked list.
/**
- * Get set that is owned by the client, if any.
+ * Get set that is owned by the given client, if any.
*
* @param client client to look for
* @return set that the client owns, NULL if the client
* does not own a set
*/
static struct Set *
-get_set (struct GNUNET_SERVER_Client *client)
+set_get (struct GNUNET_SERVER_Client *client)
{
struct Set *set;
for (set = sets_head; NULL != set; set = set->next)
* @param listener listener to destroy
*/
static void
-destroy_listener (struct Listener *listener)
+listener_destroy (struct Listener *listener)
{
if (NULL != listener->client_mq)
{
* @param set the set to destroy
*/
static void
-destroy_set (struct Set *set)
+set_destroy (struct Set *set)
{
switch (set->operation)
{
struct Set *set;
struct Listener *listener;
- set = get_set (client);
+ set = set_get (client);
if (NULL != set)
- destroy_set (set);
+ set_destroy (set);
listener = get_listener (client);
if (NULL != listener)
- destroy_listener (listener);
+ listener_destroy (listener);
}
* @param incoming remote request to destroy
*/
static void
-destroy_incoming (struct Incoming *incoming)
+incoming_destroy (struct Incoming *incoming)
{
- if (NULL != incoming->mq)
+ if (NULL != incoming->tc)
{
- GNUNET_MQ_destroy (incoming->mq);
- incoming->mq = NULL;
- }
- if (NULL != incoming->socket)
- {
- GNUNET_STREAM_close (incoming->socket);
- incoming->socket = NULL;
+ GNUNET_free (incoming->tc);
+ GNUNET_assert (NULL != incoming->tc->tunnel);
+ GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel);
+ incoming->tc = NULL;
}
GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
GNUNET_free (incoming);
}
+
+static void
+tunnel_context_destroy (struct TunnelContext *tc)
+{
+ GNUNET_free (tc);
+ /* FIXME destroy the rest */
+}
+
+
/**
* Handle a request for a set operation from
* another peer.
* @param cls the incoming socket
* @param mh the message
*/
-static void
-handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
+static int
+handle_p2p_operation_request (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *mh)
{
- struct Incoming *incoming = cls;
+ struct TunnelContext *tc = *tunnel_ctx;
+ struct Incoming *incoming;
const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_RequestMessage *cmsg;
struct Listener *listener;
const struct GNUNET_MessageHeader *context_msg;
+ if (CONTEXT_INCOMING != tc->type)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected operation request\n");
+ tunnel_context_destroy (tc);
+ /* don't kill the whole mesh connection */
+ return GNUNET_OK;
+ }
+
+ incoming = tc->data;
+
context_msg = GNUNET_MQ_extract_nested_mh (msg);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"set operation request from peer failed: "
"no set with matching application ID and operation type\n");
- return;
+ tunnel_context_destroy (tc);
+ /* don't kill the whole mesh connection */
+ return GNUNET_OK;
}
mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg);
if (NULL == mqm)
{
/* FIXME: disconnect the peer */
GNUNET_break_op (0);
- return;
+ tunnel_context_destroy (tc);
+ /* don't kill the whole mesh connection */
+ return GNUNET_OK;
}
incoming->accept_id = accept_id++;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id);
cmsg->accept_id = htonl (incoming->accept_id);
- cmsg->peer_id = incoming->peer;
+ cmsg->peer_id = incoming->tc->peer;
GNUNET_MQ_send (listener->client_mq, mqm);
+
+ return GNUNET_OK;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
ntohs (msg->operation));
- if (NULL != get_set (client))
+ if (NULL != set_get (client))
{
GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
{
struct Set *set;
- set = get_set (client);
+ set = set_get (client);
if (NULL == set)
{
GNUNET_break (0);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
- destroy_incoming (incoming);
+ incoming_destroy (incoming);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
struct Set *set;
- set = get_set (client);
+ set = set_get (client);
if (NULL == set)
{
GNUNET_break (0);
{
struct Set *set;
- set = get_set (client);
+ set = set_get (client);
if (NULL == set)
{
GNUNET_break (0);
return;
}
-
- set = get_set (client);
+ set = set_get (client);
if (NULL == set)
{
/* note: _GSS_*_accept has to make sure the socket and mq are set to NULL,
* otherwise they will be destroyed and disconnected */
- destroy_incoming (incoming);
+ incoming_destroy (incoming);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
-/**
- * Functions of this type are called upon new stream connection from other peers
- * or upon binding error which happen when the app_port given in
- * GNUNET_STREAM_listen() is already taken.
- *
- * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
- * @param initiator the identity of the peer who wants to establish a stream
- * with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- * stream (the socket will be invalid after the call)
- */
-static int
-stream_listen_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket,
- const struct GNUNET_PeerIdentity *initiator)
-{
- struct Incoming *incoming;
- static const struct GNUNET_MQ_Handler handlers[] = {
- {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
- GNUNET_MQ_HANDLERS_END
- };
-
- if (NULL == socket)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
-
- incoming = GNUNET_new (struct Incoming);
- incoming->peer = *initiator;
- incoming->socket = socket;
- incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming);
- /* FIXME: timeout for peers that only connect but don't send anything */
- GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
- return GNUNET_OK;
-}
-
-
/**
* Called to clean up, after a shutdown has been requested.
*
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- if (NULL != stream_listen_socket)
+ if (NULL != mesh)
{
- GNUNET_STREAM_listen_close (stream_listen_socket);
- stream_listen_socket = NULL;
+ GNUNET_MESH_disconnect (mesh);
+ mesh = NULL;
}
while (NULL != incoming_head)
{
- destroy_incoming (incoming_head);
+ incoming_destroy (incoming_head);
}
while (NULL != listeners_head)
{
- destroy_listener (listeners_head);
+ listener_destroy (listeners_head);
}
while (NULL != sets_head)
{
- destroy_set (sets_head);
+ set_destroy (sets_head);
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
}
+
+/**
+ * Signature of the main function of a task.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Incoming *incoming = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out");
+ incoming_destroy (incoming);
+}
+
+
+/**
+ * Method called whenever another peer has added us to a tunnel
+ * the other peer initiated.
+ * Only called (once) upon reception of data with a message type which was
+ * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
+ * causes te tunnel to be ignored and no further notifications are sent about
+ * the same tunnel.
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param port Port this tunnel is for.
+ * @return initial tunnel context for the tunnel
+ * (can be NULL -- that's not an error)
+ */
+static void *
+tunnel_new_cb (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *initiator,
+ uint32_t port)
+{
+ struct Incoming *incoming;
+ struct TunnelContext *tc;
+
+ GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
+ tc = GNUNET_new (struct TunnelContext);
+ incoming = GNUNET_new (struct Incoming);
+ incoming->tc = tc;
+ tc->peer = *initiator;
+ tc->tunnel = tunnel;
+ tc->mq = GNUNET_MESH_mq_create (tunnel);
+ tc->data = incoming;
+ tc->type = CONTEXT_INCOMING;
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
+ GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
+
+ return tc;
+}
+
+
+/**
+ * Function called whenever a tunnel is destroyed. Should clean up
+ * any associated state. This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ * with the tunnel is stored
+ */
+static void
+tunnel_end_cb (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
+{
+ struct TunnelContext *ctx = tunnel_ctx;
+
+ switch (ctx->type)
+ {
+ case CONTEXT_INCOMING:
+ incoming_destroy ((struct Incoming *) ctx->data);
+ break;
+ case CONTEXT_OPERATION_UNION:
+ _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data);
+ break;
+ case CONTEXT_OPERATION_INTERSECTION:
+ GNUNET_assert (0);
+ /* FIXME: cfuchs */
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+
+}
+
+
/**
* Function called by the service's run
* method to run service-specific setup code.
{handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
{NULL, NULL, 0, 0}
};
+ static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
+ {handle_p2p_operation_request,
+ GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
+ /* messages for the union operation */
+ {_GSS_union_handle_p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
+ {_GSS_union_handle_p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
+ {_GSS_union_handle_p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
+ {_GSS_union_handle_p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
+ {_GSS_union_handle_p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
+ /* FIXME: messages for intersection operation */
+ {NULL, 0, 0}
+ };
+ static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
configuration = cfg;
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task, NULL);
GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
GNUNET_SERVER_add_handlers (server, server_handlers);
- stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
- &stream_listen_cb, NULL,
- GNUNET_STREAM_OPTION_END);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n");
+ mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb,
+ mesh_handlers, mesh_ports);
+ if (NULL == mesh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n");
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n");
}
main (int argc, char *const *argv)
{
int ret;
- ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
+ ret = GNUNET_SERVICE_run (argc, argv, "set",
+ GNUNET_SERVICE_OPTION_NONE, &run, NULL);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
return (GNUNET_OK == ret) ? 0 : 1;
}
#include "gnunet_applications.h"
#include "gnunet_util_lib.h"
#include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
+#include "gnunet_mesh2_service.h"
#include "gnunet_set_service.h"
#include "set.h"
*/
struct UnionState;
+struct UnionEvaluateOperation;
+
/**
* A set that supports a specific operation
/**
* Message queue for the client
*/
- struct GNUNET_MQ_MessageQueue *client_mq;
+ struct GNUNET_MQ_Handle *client_mq;
/**
* Type of operation supported for this set
/**
* Message queue for the client
*/
- struct GNUNET_MQ_MessageQueue *client_mq;
+ struct GNUNET_MQ_Handle *client_mq;
/**
* Type of operation supported for this set
struct Incoming *prev;
/**
- * Identity of the peer that connected to us
+ * Tunnel context, stores information about
+ * the tunnel and its peer.
*/
- struct GNUNET_PeerIdentity peer;
+ struct TunnelContext *tc;
/**
- * Socket connected to the peer
+ * GNUNET_YES if the incoming peer has sent
+ * an operation request (and we are waiting
+ * for the client to ack/nack), GNUNET_NO otherwise.
*/
- struct GNUNET_STREAM_Socket *socket;
-
- /**
- * Message queue for the peer
- */
- struct GNUNET_MQ_MessageQueue *mq;
+ int received_request;
/**
* App code, set once the peer has
/**
* Unique request id for the request from
- * a remote peer, sent to the client with will
+ * a remote peer, sent to the client, which will
* accept or reject the request.
*/
uint32_t accept_id;
};
+enum TunnelContextType {
+ CONTEXT_INCOMING,
+ CONTEXT_OPERATION_UNION,
+ CONTEXT_OPERATION_INTERSECTION,
+};
+
+struct TunnelContext
+{
+ struct GNUNET_MESH_Tunnel *tunnel;
+ struct GNUNET_PeerIdentity peer;
+ struct GNUNET_MQ_Handle *mq;
+ enum TunnelContextType type;
+ void *data;
+};
+
+
+
/**
* Configuration of the local peer
*/
extern const struct GNUNET_CONFIGURATION_Handle *configuration;
+extern struct GNUNET_MESH_Handle *mesh;
+
/**
* Create a new set supporting the union operation
struct Incoming *incoming);
+/**
+ * Destroy a union operation, and free all resources
+ * associated with it.
+ *
+ * @param eo the union operation to destroy
+ */
+void
+_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo);
+
+
+/**
+ * Dispatch messages for a union operation.
+ *
+ * @param cls closure
+ * @param tunnel mesh tunnel
+ * @param tunnel_ctx tunnel context
+ * @param sender ???
+ * @param mh message to process
+ * @return ???
+ */
+int
+_GSS_union_handle_p2p_message (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *mh);
+
+
#endif
struct GNUNET_MessageHeader *context_msg;
/**
- * Stream socket connected to the other peer
+ * Tunnel context for the peer we
+ * evaluate the union operation with.
*/
- struct GNUNET_STREAM_Socket *socket;
-
- /**
- * Message queue for the peer on the other
- * end
- */
- struct GNUNET_MQ_MessageQueue *mq;
+ struct TunnelContext *tc;
/**
* Request ID to multiplex set operations to
*
* @param eo the union operation to destroy
*/
-static void
-destroy_union_operation (struct UnionEvaluateOperation *eo)
+void
+_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
- if (NULL != eo->mq)
+ if (NULL != eo->tc)
{
- GNUNET_MQ_destroy (eo->mq);
- eo->mq = NULL;
+ GNUNET_MQ_destroy (eo->tc->mq);
+ GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
+ GNUNET_free (eo->tc);
+ eo->tc = NULL;
}
- if (NULL != eo->socket)
- {
- GNUNET_STREAM_close (eo->socket);
- eo->socket = NULL;
- }
if (NULL != eo->remote_ibf)
{
ibf_destroy (eo->remote_ibf);
static void
fail_union_operation (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ResultMessage *msg;
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
msg->request_id = htonl (eo->request_id);
GNUNET_MQ_send (eo->set->client_mq, mqm);
- destroy_union_operation (eo);
+ _GSS_union_operation_destroy (eo);
}
static void
send_operation_request (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct OperationRequestMessage *msg;
mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
}
msg->operation = htons (GNUNET_SET_OPERATION_UNION);
msg->app_id = eo->app_id;
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
if (NULL != eo->context_msg)
{
* Insert an element into the union operation's
* key-to-element mapping
*
- * @param the union operation
+ * @param eo the union operation
* @param ee the element entry
*/
static void
while (buckets_sent < (1 << ibf_order))
{
unsigned int buckets_in_message;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct IBFMessage *msg;
buckets_in_message = (1 << ibf_order) - buckets_sent;
ibf_write_slice (ibf, buckets_sent,
buckets_in_message, &msg[1]);
buckets_sent += buckets_in_message;
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
}
eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
static void
send_strata_estimator (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_MessageHeader *strata_msg;
mqm = GNUNET_MQ_msg_header_extra (strata_msg,
SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
GNUNET_MESSAGE_TYPE_SET_P2P_SE);
strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
eo->phase = PHASE_EXPECT_IBF;
}
/**
* Handle a strata estimator from a remote peer
*
- * @param the union operation
+ * @param cls the union operation
* @param mh the message
*/
static void
while (NULL != ke)
{
const struct GNUNET_SET_Element *const element = &ke->element->element;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_MessageHeader *mh;
GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
}
memcpy (&mh[1], element->data, element->size);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
ke = ke->next_colliding;
}
return GNUNET_NO;
}
if (GNUNET_NO == res)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
break;
}
if (1 == side)
}
else
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_MessageHeader *msg;
/* FIXME: before sending the request, check if we may just have the element */
mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
*(struct IBF_Key *) &msg[1] = key;
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
}
}
ibf_destroy (diff_ibf);
send_client_element (struct UnionEvaluateOperation *eo,
struct GNUNET_SET_Element *element)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ResultMessage *rm;
GNUNET_assert (0 != eo->request_id);
}
-/**
- * Completion callback for shutdown
- *
- * @param cls the closure from GNUNET_STREAM_shutdown call
- * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
- * SHUT_RDWR)
- */
-/*
-static void
-stream_shutdown_cb (void *cls,
- int operation)
-{
- //struct UnionEvaluateOperation *eo = cls;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
-
- // destroy_union_operation (eo);
-}
-*/
-
-
/**
* Send a result message to the client indicating
* that the operation is over.
* destroy the evaluate operation.
*
* @param eo union operation
- * @param element element to send
*/
static void
send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ResultMessage *rm;
GNUNET_assert (0 != eo->request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
GNUNET_MQ_send (eo->set->client_mq, mqm);
- // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
}
if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
{
/* we got all requests, but still have to send our elements as response */
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
eo->phase = PHASE_FINISHED;
mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->tc->mq, mqm);
return;
}
if (eo->phase == PHASE_EXPECT_ELEMENTS)
}
-/**
- * The handlers array, used for both evaluate and accept
- */
-static const struct GNUNET_MQ_Handler union_handlers[] = {
- {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
- {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
- {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
- {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
- {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
- GNUNET_MQ_HANDLERS_END
-};
-
-
-/**
- * Functions of this type will be called when a stream is established
- *
- * @param cls the closure from GNUNET_STREAM_open
- * @param socket socket to use to communicate with the
- * other side (read/write)
- */
-static void
-stream_open_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket)
-{
- struct UnionEvaluateOperation *eo = cls;
-
- GNUNET_assert (NULL == eo->mq);
- GNUNET_assert (socket == eo->socket);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "open cb successful\n");
- eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo);
- /* we started the operation, thus we have to send the operation request */
- send_operation_request (eo);
- eo->phase = PHASE_EXPECT_SE;
-}
-
-
/**
* Evaluate a union operation with
* a remote peer.
*
* @param m the evaluate request message from the client
- * @parem set the set to evaluate the operation with
+ * @param set the set to evaluate the operation with
*/
void
_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
"evaluating union operation, (app %s)\n",
GNUNET_h2s (&eo->app_id));
- eo->socket =
- GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
- &stream_open_cb, eo,
- GNUNET_STREAM_OPTION_END);
+ eo->tc = GNUNET_new (struct TunnelContext);
+ eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
+ GNUNET_APPLICATION_TYPE_SET);
+ GNUNET_assert (NULL != eo->tc->tunnel);
+ eo->tc->peer = eo->peer;
+ eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
+ /* we started the operation, thus we have to send the operation request */
+ eo->phase = PHASE_EXPECT_SE;
+
GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
eo->set->state.u->ops_tail,
eo);
- /* the stream open callback will kick off the operation */
+
+ send_operation_request (eo);
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
eo = GNUNET_new (struct UnionEvaluateOperation);
+ eo->tc = incoming->tc;
eo->generation_created = set->state.u->current_generation++;
eo->set = set;
- eo->peer = incoming->peer;
eo->salt = ntohs (incoming->salt);
GNUNET_assert (0 != ntohl (m->request_id));
eo->request_id = ntohl (m->request_id);
eo->se = strata_estimator_dup (set->state.u->se);
- eo->mq = incoming->mq;
/* transfer ownership of mq and socket from incoming to eo */
- incoming->mq = NULL;
- eo->socket = incoming->socket;
- incoming->socket = NULL;
- /* the peer's socket is now ours, we'll receive all messages */
- GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
-
GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
eo->set->state.u->ops_tail,
eo);
-
/* kick off the operation */
send_strata_estimator (eo);
}
while (NULL != set->state.u->ops_head)
{
- destroy_union_operation (set->state.u->ops_head);
+ _GSS_union_operation_destroy (set->state.u->ops_head);
}
}
ee->generation_removed = set->state.u->current_generation;
}
+
+/**
+ * Dispatch messages for a union operation.
+ *
+ * @param cls closure
+ * @param tunnel mesh tunnel
+ * @param tunnel_ctx tunnel context
+ * @param sender ???
+ * @param mh message to process
+ * @return ???
+ */
+int
+_GSS_union_handle_p2p_message (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct TunnelContext *tc = *tunnel_ctx;
+ struct UnionEvaluateOperation *eo;
+
+ if (CONTEXT_OPERATION_UNION != tc->type)
+ {
+ /* FIXME: kill the tunnel */
+ /* never kill mesh */
+ return GNUNET_OK;
+ }
+
+ eo = tc->data;
+
+ switch (ntohs (mh->type))
+ {
+ case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
+ handle_p2p_ibf (eo, mh);
+ break;
+ case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
+ handle_p2p_strata_estimator (eo, mh);
+ break;
+ case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
+ handle_p2p_elements (eo, mh);
+ break;
+ case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
+ handle_p2p_element_requests (eo, mh);
+ break;
+ case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
+ handle_p2p_done (eo, mh);
+ break;
+ default:
+ /* something wrong with mesh's message handlers? */
+ GNUNET_assert (0);
+ }
+ /* never kill mesh! */
+ return GNUNET_OK;
+}
--- /dev/null
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file set/gnunet-set-ibf-profiler.c
+ * @brief tool for profiling the invertible bloom filter implementation
+ * @author Florian Dold
+ */
+
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_container_lib.h"
+#include "gnunet_util_lib.h"
+
+#include "ibf.h"
+
+static unsigned int asize = 10;
+static unsigned int bsize = 10;
+static unsigned int csize = 10;
+static unsigned int hash_num = 4;
+static unsigned int ibf_size = 80;
+
+/* FIXME: add parameter for this */
+static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
+
+static struct GNUNET_CONTAINER_MultiHashMap *set_a;
+static struct GNUNET_CONTAINER_MultiHashMap *set_b;
+/* common elements in a and b */
+static struct GNUNET_CONTAINER_MultiHashMap *set_c;
+
+static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
+
+static struct InvertibleBloomFilter *ibf_a;
+static struct InvertibleBloomFilter *ibf_b;
+
+
+static void
+register_hashcode (struct GNUNET_HashCode *hash)
+{
+ struct GNUNET_HashCode replicated;
+ struct IBF_Key key;
+ key = ibf_key_from_hashcode (hash);
+ ibf_hashcode_from_key (key, &replicated);
+ GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash),
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+static void
+iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, void *cls)
+{
+ struct GNUNET_HashCode replicated;
+ ibf_hashcode_from_key (key, &replicated);
+ GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, iter, cls);
+}
+
+
+static int
+insert_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls;
+ ibf_insert (ibf, ibf_key_from_hashcode (key));
+ return GNUNET_YES;
+}
+
+
+static int
+remove_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
+ /* if remove fails, there just was a collision with another key */
+ (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
+ return GNUNET_YES;
+}
+
+
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_HashCode id;
+ struct IBF_Key ibf_key;
+ int i;
+ int side;
+ int res;
+ struct GNUNET_TIME_Absolute start_time;
+ struct GNUNET_TIME_Relative delta_time;
+
+ set_a = GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
+ GNUNET_NO);
+ set_b = GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
+ GNUNET_NO);
+ set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
+ GNUNET_NO);
+
+ key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize == 0) ? 1 : (asize+bsize+csize)),
+ GNUNET_NO);
+
+ printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
+ hash_num, ibf_size, asize, bsize, csize);
+
+ i = 0;
+ while (i < asize)
+ {
+ GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+ continue;
+ GNUNET_CONTAINER_multihashmap_put (
+ set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ register_hashcode (&id);
+ i++;
+ }
+ i = 0;
+ while (i < bsize)
+ {
+ GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+ continue;
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+ continue;
+ GNUNET_CONTAINER_multihashmap_put (
+ set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ register_hashcode (&id);
+ i++;
+ }
+ i = 0;
+ while (i < csize)
+ {
+ GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+ continue;
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+ continue;
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
+ continue;
+ GNUNET_CONTAINER_multihashmap_put (
+ set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ register_hashcode (&id);
+ i++;
+ }
+
+ ibf_a = ibf_create (ibf_size, hash_num);
+ ibf_b = ibf_create (ibf_size, hash_num);
+
+ printf ("generated sets\n");
+
+ start_time = GNUNET_TIME_absolute_get ();
+
+ GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
+ GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
+ GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
+ GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
+
+ delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+
+ printf ("encoded in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
+
+ ibf_subtract (ibf_a, ibf_b);
+
+
+ start_time = GNUNET_TIME_absolute_get ();
+
+ for (i = 0; i <= asize + bsize; i++)
+ {
+ res = ibf_decode (ibf_a, &side, &ibf_key);
+ if (GNUNET_SYSERR == res)
+ {
+ printf ("decode failed, %u/%u elements left\n",
+ GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b),
+ asize + bsize);
+ return;
+ }
+ if (GNUNET_NO == res)
+ {
+ if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
+ (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
+ {
+ delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+ printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
+ }
+ else
+ {
+ printf ("decode missed elements (should never happen)\n");
+ }
+ return;
+ }
+
+ if (side == 1)
+ iter_hashcodes (ibf_key, remove_iterator, set_a);
+ if (side == -1)
+ iter_hashcodes (ibf_key, remove_iterator, set_b);
+ }
+ printf("cyclic IBF, %u/%u elements left\n",
+ GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b),
+ asize + bsize);
+}
+
+int
+main (int argc, char **argv)
+{
+ static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+ {'A', "asize", NULL,
+ gettext_noop ("number of element in set A-B"), 1,
+ &GNUNET_GETOPT_set_uint, &asize},
+ {'B', "bsize", NULL,
+ gettext_noop ("number of element in set B-A"), 1,
+ &GNUNET_GETOPT_set_uint, &bsize},
+ {'C', "csize", NULL,
+ gettext_noop ("number of common elements in A and B"), 1,
+ &GNUNET_GETOPT_set_uint, &csize},
+ {'k', "hash-num", NULL,
+ gettext_noop ("hash num"), 1,
+ &GNUNET_GETOPT_set_uint, &hash_num},
+ {'s', "ibf-size", NULL,
+ gettext_noop ("ibf size"), 1,
+ &GNUNET_GETOPT_set_uint, &ibf_size},
+ GNUNET_GETOPT_OPTION_END
+ };
+ GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-ibf",
+ "help",
+ options, &run, NULL, GNUNET_YES);
+ return 0;
+}
+
+++ /dev/null
-/*
- This file is part of GNUnet.
- (C) 2012 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file consensus/gnunet-consensus-ibf.c
- * @brief tool for reconciling data with invertible bloom filters
- * @author Florian Dold
- */
-
-
-#include "platform.h"
-#include "gnunet_common.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_util_lib.h"
-
-#include "ibf.h"
-
-static unsigned int asize = 10;
-static unsigned int bsize = 10;
-static unsigned int csize = 10;
-static unsigned int hash_num = 3;
-static unsigned int ibf_size = 80;
-
-/* FIXME: add parameter for this */
-static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
-
-static struct GNUNET_CONTAINER_MultiHashMap *set_a;
-static struct GNUNET_CONTAINER_MultiHashMap *set_b;
-/* common elements in a and b */
-static struct GNUNET_CONTAINER_MultiHashMap *set_c;
-
-static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
-
-static struct InvertibleBloomFilter *ibf_a;
-static struct InvertibleBloomFilter *ibf_b;
-
-
-static void
-register_hashcode (struct GNUNET_HashCode *hash)
-{
- struct GNUNET_HashCode replicated;
- struct IBF_Key key;
- key = ibf_key_from_hashcode (hash);
- ibf_hashcode_from_key (key, &replicated);
- GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash),
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-}
-
-static void
-iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, void *cls)
-{
- struct GNUNET_HashCode replicated;
- ibf_hashcode_from_key (key, &replicated);
- GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, iter, cls);
-}
-
-
-static int
-insert_iterator (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls;
- ibf_insert (ibf, ibf_key_from_hashcode (key));
- return GNUNET_YES;
-}
-
-
-static int
-remove_iterator (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
- /* if remove fails, there just was a collision with another key */
- (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
- return GNUNET_YES;
-}
-
-
-static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
- struct GNUNET_HashCode id;
- struct IBF_Key ibf_key;
- int i;
- int side;
- int res;
- struct GNUNET_TIME_Absolute start_time;
- struct GNUNET_TIME_Relative delta_time;
-
- set_a = GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
- GNUNET_NO);
- set_b = GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
- GNUNET_NO);
- set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
- GNUNET_NO);
-
- key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize == 0) ? 1 : (asize+bsize+csize)),
- GNUNET_NO);
-
- printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
- hash_num, ibf_size, asize, bsize, csize);
-
- i = 0;
- while (i < asize)
- {
- GNUNET_CRYPTO_hash_create_random (random_quality, &id);
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
- continue;
- GNUNET_CONTAINER_multihashmap_put (
- set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
- register_hashcode (&id);
- i++;
- }
- i = 0;
- while (i < bsize)
- {
- GNUNET_CRYPTO_hash_create_random (random_quality, &id);
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
- continue;
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
- continue;
- GNUNET_CONTAINER_multihashmap_put (
- set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
- register_hashcode (&id);
- i++;
- }
- i = 0;
- while (i < csize)
- {
- GNUNET_CRYPTO_hash_create_random (random_quality, &id);
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
- continue;
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
- continue;
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
- continue;
- GNUNET_CONTAINER_multihashmap_put (
- set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
- register_hashcode (&id);
- i++;
- }
-
- ibf_a = ibf_create (ibf_size, hash_num);
- ibf_b = ibf_create (ibf_size, hash_num);
-
- printf ("generated sets\n");
-
- start_time = GNUNET_TIME_absolute_get ();
-
- GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
- GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
- GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
- GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
-
- delta_time = GNUNET_TIME_absolute_get_duration (start_time);
-
- printf ("encoded in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
-
- ibf_subtract (ibf_a, ibf_b);
-
-
- start_time = GNUNET_TIME_absolute_get ();
-
- for (;;)
- {
- res = ibf_decode (ibf_a, &side, &ibf_key);
- if (GNUNET_SYSERR == res)
- {
- printf ("decode failed\n");
- return;
- }
- if (GNUNET_NO == res)
- {
- if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
- (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
- {
- delta_time = GNUNET_TIME_absolute_get_duration (start_time);
- printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
- }
- else
- printf ("decode missed elements\n");
- return;
- }
-
- if (side == 1)
- iter_hashcodes (ibf_key, remove_iterator, set_a);
- if (side == -1)
- iter_hashcodes (ibf_key, remove_iterator, set_b);
- }
-}
-
-int
-main (int argc, char **argv)
-{
- static const struct GNUNET_GETOPT_CommandLineOption options[] = {
- {'A', "asize", NULL,
- gettext_noop ("number of element in set A-B"), 1,
- &GNUNET_GETOPT_set_uint, &asize},
- {'B', "bsize", NULL,
- gettext_noop ("number of element in set B-A"), 1,
- &GNUNET_GETOPT_set_uint, &bsize},
- {'C', "csize", NULL,
- gettext_noop ("number of common elements in A and B"), 1,
- &GNUNET_GETOPT_set_uint, &csize},
- {'k', "hash-num", NULL,
- gettext_noop ("hash num"), 1,
- &GNUNET_GETOPT_set_uint, &hash_num},
- {'s', "ibf-size", NULL,
- gettext_noop ("ibf size"), 1,
- &GNUNET_GETOPT_set_uint, &ibf_size},
- GNUNET_GETOPT_OPTION_END
- };
- GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-ibf",
- "help",
- options, &run, NULL, GNUNET_YES);
- return 0;
-}
-
--- /dev/null
+/*
+ This file is part of GNUnet
+ (C) 2013 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file set/gnunet-set-profiler.c
+ * @brief profiling tool for set
+ * @author Florian Dold
+ */
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_set_service.h"
+#include "gnunet_testbed_service.h"
+
+
+static int ret;
+
+static unsigned int num_a = 5;
+static unsigned int num_b = 5;
+static unsigned int num_c = 20;
+
+static unsigned int salt = 42;
+
+static char* op_str = "union";
+
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+struct GNUNET_CONTAINER_MultiHashMap *map_a;
+struct GNUNET_CONTAINER_MultiHashMap *map_b;
+struct GNUNET_CONTAINER_MultiHashMap *map_c;
+
+
+/**
+ * Elements that set a received, should match map_c
+ * in the end.
+ */
+struct GNUNET_CONTAINER_MultiHashMap *map_a_received;
+
+/**
+ * Elements that set b received, should match map_c
+ * in the end.
+ */
+struct GNUNET_CONTAINER_MultiHashMap *map_b_received;
+
+struct GNUNET_SET_Handle *set_a;
+struct GNUNET_SET_Handle *set_b;
+
+struct GNUNET_HashCode app_id;
+
+struct GNUNET_PeerIdentity local_peer;
+
+struct GNUNET_SET_ListenHandle *set_listener;
+
+struct GNUNET_SET_OperationHandle *set_oh1;
+struct GNUNET_SET_OperationHandle *set_oh2;
+
+
+int a_done;
+int b_done;
+
+
+
+static int
+map_remove_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *m = cls;
+ int ret;
+
+ ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL);
+ GNUNET_assert (GNUNET_OK == ret);
+ return GNUNET_YES;
+
+}
+
+
+static void
+set_result_cb_1 (void *cls,
+ const struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ GNUNET_assert (GNUNET_NO == a_done);
+ GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_DONE:
+ case GNUNET_SET_STATUS_HALF_DONE:
+ a_done = GNUNET_YES;
+ GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_a_received);
+ GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received));
+ return;
+ case GNUNET_SET_STATUS_FAILURE:
+ GNUNET_assert (0);
+ return;
+ case GNUNET_SET_STATUS_OK:
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ GNUNET_CONTAINER_multihashmap_put (map_a_received,
+ element->data, NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+}
+
+
+static void
+set_result_cb_2 (void *cls,
+ const struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ GNUNET_assert (GNUNET_NO == b_done);
+ GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_DONE:
+ case GNUNET_SET_STATUS_HALF_DONE:
+ b_done = GNUNET_YES;
+ GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_b_received);
+ GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received));
+ return;
+ case GNUNET_SET_STATUS_FAILURE:
+ GNUNET_assert (0);
+ return;
+ case GNUNET_SET_STATUS_OK:
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ GNUNET_CONTAINER_multihashmap_put (map_b_received,
+ element->data, NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+}
+
+
+static void
+set_listen_cb (void *cls,
+ const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_MessageHeader *context_msg,
+ struct GNUNET_SET_Request *request)
+{
+ GNUNET_assert (NULL == set_oh2);
+ set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+ set_result_cb_2, NULL);
+ GNUNET_SET_commit (set_oh2, set_b);
+}
+
+
+
+static int
+set_insert_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_SET_Handle *set = cls;
+ struct GNUNET_SET_Element *el;
+
+ el = GNUNET_malloc (sizeof *el + sizeof *key);
+ el->type = 0;
+ memcpy (&el[1], key, sizeof *key);
+ el->data = &el[1];
+ el->size = sizeof *key;
+ GNUNET_SET_add_element (set, el, NULL, NULL);
+ GNUNET_free (el);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Signature of the 'main' function for a (single-peer) testcase that
+ * is run using 'GNUNET_TESTING_peer_run'.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+test_main (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ unsigned int i;
+ struct GNUNET_HashCode hash;
+
+ config = cfg;
+
+ if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer))
+ {
+ GNUNET_assert (0);
+ return;
+ }
+
+ map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
+ map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
+ map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
+
+ for (i = 0; i < num_a; i++)
+ {
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
+ {
+ i--;
+ continue;
+ }
+ GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ }
+
+ for (i = 0; i < num_b; i++)
+ {
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
+ {
+ i--;
+ continue;
+ }
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
+ {
+ i--;
+ continue;
+ }
+ GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ }
+
+ for (i = 0; i < num_c; i++)
+ {
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
+ {
+ i--;
+ continue;
+ }
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
+ {
+ i--;
+ continue;
+ }
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash))
+ {
+ i--;
+ continue;
+ }
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+ GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ }
+
+ /* use last hash for app id */
+ app_id = hash;
+
+ /* FIXME: also implement intersection etc. */
+ set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+ set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+
+ GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a);
+ GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b);
+ GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a);
+ GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b);
+
+ set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
+ &app_id, set_listen_cb, NULL);
+
+ set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED,
+ set_result_cb_1, NULL);
+ GNUNET_SET_commit (set_oh1, set_a);
+}
+
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+
+ ret = GNUNET_TESTING_peer_run ("test_set_api",
+ "test_set.conf",
+ &test_main, NULL);
+}
+
+
+int
+main (int argc, char **argv)
+{
+ static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+ { 'A', "num-first", NULL,
+ gettext_noop ("number of values"),
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_a },
+ { 'B', "num-second", NULL,
+ gettext_noop ("number of values"),
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_b },
+ { 'B', "num-common", NULL,
+ gettext_noop ("number of values"),
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c },
+ { 'x', "operation", NULL,
+ gettext_noop ("oeration to execute"),
+ GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str },
+ GNUNET_GETOPT_OPTION_END
+ };
+ GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus",
+ "help",
+ options, &run, NULL, GNUNET_YES);
+ return ret;
+}
+
+++ /dev/null
-/*
- This file is part of GNUnet
- (C) 2012 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
-
-/**
- * @file set/gnunet-set.c
- * @brief profiling tool for the set service
- * @author Florian Dold
- */
-#include "platform.h"
-#include "gnunet_common.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testbed_service.h"
-#include "gnunet_set_service.h"
-
-
-static struct GNUNET_PeerIdentity local_id;
-
-static struct GNUNET_HashCode app_id;
-static struct GNUNET_SET_Handle *set1;
-static struct GNUNET_SET_Handle *set2;
-static struct GNUNET_SET_ListenHandle *listen_handle;
-const static struct GNUNET_CONFIGURATION_Handle *config;
-
-int num_done;
-
-
-static void
-result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status)
-{
- switch (status)
- {
- case GNUNET_SET_STATUS_OK:
- printf ("set 1: got element\n");
- break;
- case GNUNET_SET_STATUS_FAILURE:
- printf ("set 1: failure\n");
- break;
- case GNUNET_SET_STATUS_DONE:
- printf ("set 1: done\n");
- GNUNET_SET_destroy (set1);
- break;
- default:
- GNUNET_assert (0);
- }
-}
-
-
-static void
-result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status)
-{
- switch (status)
- {
- case GNUNET_SET_STATUS_OK:
- printf ("set 2: got element\n");
- break;
- case GNUNET_SET_STATUS_FAILURE:
- printf ("set 2: failure\n");
- break;
- case GNUNET_SET_STATUS_DONE:
- printf ("set 2: done\n");
- GNUNET_SET_destroy (set2);
- break;
- default:
- GNUNET_assert (0);
- }
-}
-
-
-static void
-listen_cb (void *cls,
- const struct GNUNET_PeerIdentity *other_peer,
- const struct GNUNET_MessageHeader *context_msg,
- struct GNUNET_SET_Request *request)
-{
- struct GNUNET_SET_OperationHandle *oh;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
- GNUNET_SET_listen_cancel (listen_handle);
-
- oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
- GNUNET_SET_conclude (oh, set2);
-}
-
-
-/**
- * Start the set operation.
- *
- * @param cls closure, unused
- */
-static void
-start (void *cls)
-{
- struct GNUNET_SET_OperationHandle *oh;
-
- listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
- &app_id, listen_cb, NULL);
- oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
- GNUNET_SET_RESULT_ADDED,
- result_cb_set1, NULL);
- GNUNET_SET_conclude (oh, set1);
-}
-
-
-/**
- * Initialize the second set, continue
- *
- * @param cls closure, unused
- */
-static void
-init_set2 (void *cls)
-{
- struct GNUNET_SET_Element element;
-
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
-
- element.data = "hello";
- element.size = strlen(element.data);
- GNUNET_SET_add_element (set2, &element, NULL, NULL);
- element.data = "quux";
- element.size = strlen(element.data);
- GNUNET_SET_add_element (set2, &element, start, NULL);
-}
-
-
-/**
- * Initialize the first set, continue.
- */
-static void
-init_set1 (void)
-{
- struct GNUNET_SET_Element element;
-
- element.data = "hello";
- element.size = strlen(element.data);
- GNUNET_SET_add_element (set1, &element, NULL, NULL);
- element.data = "bar";
- element.size = strlen(element.data);
- GNUNET_SET_add_element (set1, &element, init_set2, NULL);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
-}
-
-
-/**
- * Main function that will be run.
- *
- * @param cls closure
- * @param args remaining command-line arguments
- * @param cfgfile name of the configuration file used (for saving, can be NULL!)
- * @param cfg configuration
- */
-static void
-run (void *cls, char *const *args,
- const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
- static const char* app_str = "gnunet-set";
-
- config = cfg;
-
- GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
-
- GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
-
- set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
- set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
-
- init_set1 ();
-}
-
-
-
-int
-main (int argc, char **argv)
-{
- static const struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_OPTION_END
- };
- GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set",
- "help",
- options, &run, NULL, GNUNET_NO);
- return 0;
-}
-
*/
/**
- * @file consensus/ibf.c
+ * @file set/ibf.c
* @brief implementation of the invertible bloom filter
* @author Florian Dold
*/
#include "ibf.h"
+/**
+ * Compute the key's hash from the key.
+ * Redefine to use a different hash function.
+ */
+#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof (struct IBF_KeyHash)))
+
/**
* Create a key from a hashcode.
*
ibf_get_indices (const struct InvertibleBloomFilter *ibf,
struct IBF_Key key, int *dst)
{
- struct GNUNET_HashCode bucket_indices;
- unsigned int filled;
- int i;
- GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices);
- filled = 0;
- for (i = 0; filled < ibf->hash_num; i++)
+ uint32_t filled;
+ uint32_t i;
+ uint32_t bucket = key.key_val & 0xFFFFFFFF;
+
+ for (i = 0, filled=0; filled < ibf->hash_num; i++)
{
- unsigned int bucket;
unsigned int j;
- if ( (0 != i) && (0 == (i % 16)) )
- GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices);
- bucket = bucket_indices.bits[i % 16] % ibf->size;
+ uint64_t x;
for (j = 0; j < filled; j++)
if (dst[j] == bucket)
goto try_next;
- dst[filled++] = bucket;
+ dst[filled++] = bucket % ibf->size;
try_next: ;
+ x = ((uint64_t) bucket << 32) | i;
+ bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
}
}
const int *buckets, int side)
{
int i;
- struct GNUNET_HashCode key_hash_sha;
- struct IBF_KeyHash key_hash;
- GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha);
- key_hash.key_hash_val = key_hash_sha.bits[0];
+
for (i = 0; i < ibf->hash_num; i++)
{
const int bucket = buckets[i];
ibf->count[bucket].count_val += side;
ibf->key_sum[bucket].key_val ^= key.key_val;
- ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val;
+ ibf->key_hash_sum[bucket].key_hash_val
+ ^= IBF_KEY_HASH_VAL (key);
}
}
{
struct IBF_KeyHash hash;
int i;
- struct GNUNET_HashCode key_hash_sha;
int buckets[ibf->hash_num];
GNUNET_assert (NULL != ibf);
if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
continue;
- GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha);
- hash.key_hash_val = key_hash_sha.bits[0];
+ hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
/* test if the hash matches the key */
if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
*/
/**
- * @file consensus/ibf.h
+ * @file set/ibf.h
* @brief invertible bloom filter
* @author Florian Dold
*/
struct GNUNET_SET_Handle
{
struct GNUNET_CLIENT_Connection *client;
- struct GNUNET_MQ_MessageQueue *mq;
+ struct GNUNET_MQ_Handle *mq;
unsigned int messages_since_ack;
};
* Message sent to the server on calling conclude,
* NULL if conclude has been called.
*/
- struct GNUNET_MQ_Message *conclude_mqm;
+ struct GNUNET_MQ_Envelope *conclude_mqm;
/**
* Address of the request if in the conclude message,
struct GNUNET_SET_ListenHandle
{
struct GNUNET_CLIENT_Connection *client;
- struct GNUNET_MQ_MessageQueue* mq;
+ struct GNUNET_MQ_Handle* mq;
GNUNET_SET_ListenCallback listen_cb;
void *listen_cls;
};
if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
GNUNET_MQ_send (set->mq, mqm);
}
if (GNUNET_NO == req->accepted)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_AcceptRejectMessage *amsg;
mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
enum GNUNET_SET_OperationType op)
{
struct GNUNET_SET_Handle *set;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_CreateMessage *msg;
- static const struct GNUNET_MQ_Handler mq_handlers[] = {
+ static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
{handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
GNUNET_MQ_HANDLERS_END
};
GNUNET_SET_Continuation cont,
void *cont_cls)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ElementMessage *msg;
mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
GNUNET_SET_Continuation cont,
void *cont_cls)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ElementMessage *msg;
mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE);
/**
- * Create a set operation for evaluation with another peer.
+ * Prepare a set operation to be evaluated with another peer.
* The evaluation will not start until the client provides
- * a local set with GNUNET_SET_conclude.
+ * a local set with GNUNET_SET_commit.
*
* @param other_peer peer with the other set
* @param app_id hash for the application using the set
* @return a handle to cancel the operation
*/
struct GNUNET_SET_OperationHandle *
-GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
- const struct GNUNET_HashCode *app_id,
- const struct GNUNET_MessageHeader *context_msg,
- uint16_t salt,
- enum GNUNET_SET_ResultMode result_mode,
- GNUNET_SET_ResultIterator result_cb,
- void *result_cls)
+GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_HashCode *app_id,
+ const struct GNUNET_MessageHeader *context_msg,
+ uint16_t salt,
+ enum GNUNET_SET_ResultMode result_mode,
+ GNUNET_SET_ResultIterator result_cb,
+ void *result_cls)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_EvaluateMessage *msg;
mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg);
- if (NULL != context_msg)
- LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
-
msg->app_id = *app_id;
msg->target_peer = *other_peer;
msg->salt = salt;
void *listen_cls)
{
struct GNUNET_SET_ListenHandle *lh;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_ListenMessage *msg;
- static const struct GNUNET_MQ_Handler mq_handlers[] = {
+ static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
{handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
GNUNET_MQ_HANDLERS_END
};
* @param result_mode specified how results will be returned,
* see 'GNUNET_SET_ResultMode'.
* @param result_cb callback for the results
- * @param result_cls closure for result_cb
+ * @param cls closure for result_cb
* @return a handle to cancel the operation
*/
struct GNUNET_SET_OperationHandle *
GNUNET_SET_ResultIterator result_cb,
void *cls)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_AcceptRejectMessage *msg;
void
GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_OperationHandle *h_assoc;
if (NULL != oh->set)
/**
- * Conclude the given set operation using the given set.
+ * Commit a set to be used with a set operation.
* This function is called once we have fully constructed
* the set that we want to use for the operation. At this
* time, the P2P protocol can then begin to exchange the
* @param set the set to use for the operation
*/
void
-GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
+GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
struct GNUNET_SET_Handle *set)
{
GNUNET_assert (NULL == oh->set);
GNUNET_assert (NULL != oh->conclude_mqm);
oh->set = set;
- oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh);
+ oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh);
*oh->request_id_addr = htonl (oh->request_id);
GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
oh->conclude_mqm = NULL;
*/
/**
- * @file consensus/ibf.h
+ * @file set/ibf.h
* @brief invertible bloom filter
* @author Florian Dold
*/
*/
/**
- * @file consensus/strata_estimator.h
+ * @file set/strata_estimator.h
* @brief estimator of set difference
* @author Florian Dold
*/
GNUNET_SET_listen_cancel (listen_handle);
oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
- GNUNET_SET_conclude (oh, set2);
+ GNUNET_SET_commit (oh, set2);
}
listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
&app_id, listen_cb, NULL);
- oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
- GNUNET_SET_RESULT_ADDED,
- result_cb_set1, NULL);
- GNUNET_SET_conclude (oh, set1);
+ oh = GNUNET_SET_prepare (&local_id, &app_id, NULL, 42,
+ GNUNET_SET_RESULT_ADDED,
+ result_cb_set1, NULL);
+ GNUNET_SET_commit (oh, set1);
}
* @param size the number of bytes written
*/
static void
-mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status,
+ size_t size)
{
- struct GNUNET_MQ_MessageQueue *mq = cls;
- struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
switch (status)
{
/* FIXME: call shutdown handler */
return;
case GNUNET_STREAM_TIMEOUT:
- if (NULL == mq->error_handler)
- LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
- else
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
return;
case GNUNET_STREAM_SYSERR:
- if (NULL == mq->error_handler)
- LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
- else
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
return;
default:
GNUNET_assert (0);
return;
}
-
- /* call cb for message we finished sending */
- mqm = mq->current_msg;
- GNUNET_assert (NULL != mq->current_msg);
- if (NULL != mqm->sent_cb)
- mqm->sent_cb (mqm->sent_cls);
- GNUNET_free (mqm);
mss->wh = NULL;
- mqm = mq->msg_head;
- mq->current_msg = mqm;
- if (NULL == mqm)
- return;
- GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
- mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- mq_stream_write_queued, mq);
- GNUNET_assert (NULL != mss->wh);
+ GNUNET_MQ_impl_send_continue (mq);
}
static void
-mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
- struct GNUNET_MQ_Message *mqm)
+mq_stream_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg, void *impl_state)
{
- struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+ struct MQStreamState *mss = impl_state;
- if (NULL != mq->current_msg)
- {
- GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
- return;
- }
- mq->current_msg = mqm;
- mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+ /* no way to cancel sending now */
+ GNUNET_MQ_impl_send_commit (mq);
+
+ mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
mq_stream_write_queued, mq);
}
*/
static int
mq_stream_mst_callback (void *cls, void *client,
- const struct GNUNET_MessageHeader *message)
+ const struct GNUNET_MessageHeader *message)
{
- struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct GNUNET_MQ_Handle *mq = cls;
GNUNET_assert (NULL != message);
- GNUNET_MQ_dispatch (mq, message);
+ GNUNET_MQ_inject_message (mq, message);
return GNUNET_OK;
}
const void *data,
size_t size)
{
- struct GNUNET_MQ_MessageQueue *mq = cls;
- struct MQStreamState *mss;
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
int ret;
switch (status)
/* FIXME: call shutdown handler */
return 0;
case GNUNET_STREAM_TIMEOUT:
- if (NULL == mq->error_handler)
- LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
- else
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
return 0;
case GNUNET_STREAM_SYSERR:
- if (NULL == mq->error_handler)
- LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
- else
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
return 0;
default:
GNUNET_assert (0);
return 0;
}
- mss = (struct MQStreamState *) mq->impl_state;
- GNUNET_assert (GNUNET_STREAM_OK == status);
ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
if (GNUNET_OK != ret)
{
- if (NULL == mq->error_handler)
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "read error (message stream malformed), but no error handler installed for message queue\n");
- else
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
return 0;
}
- /* we always read all data */
mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
mq_stream_data_processor, mq);
+ /* we always read all data */
return size;
}
static void
-mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
- struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+ struct MQStreamState *mss = impl_state;
if (NULL != mss->rh)
{
* @param error_handler callback for errors
* @return the message queue for the socket
*/
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
- const struct GNUNET_MQ_Handler *msg_handlers,
+ const struct GNUNET_MQ_MessageHandler *msg_handlers,
GNUNET_MQ_ErrorHandler error_handler,
void *cls)
{
- struct GNUNET_MQ_MessageQueue *mq;
+ struct GNUNET_MQ_Handle *mq;
struct MQStreamState *mss;
- mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
mss = GNUNET_new (struct MQStreamState);
mss->socket = socket;
- mq->impl_state = mss;
- mq->send_impl = mq_stream_send_impl;
- mq->destroy_impl = mq_stream_destroy_impl;
- mq->handlers = msg_handlers;
- mq->handlers_cls = cls;
- mq->error_handler = error_handler;
+ mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl,
+ mq_stream_destroy_impl,
+ NULL,
+ mss, msg_handlers, error_handler, cls);
if (NULL != msg_handlers)
{
mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
+struct GNUNET_MQ_Envelope
+{
+ /**
+ * Messages are stored in a linked list.
+ * Each queue has its own list of envelopes.
+ */
+ struct GNUNET_MQ_Envelope *next;
+
+ /**
+ * Messages are stored in a linked list
+ * Each queue has its own list of envelopes.
+ */
+ struct GNUNET_MQ_Envelope *prev;
+
+ /**
+ * Actual allocated message header,
+ * usually points to the end of the containing GNUNET_MQ_Envelope
+ */
+ struct GNUNET_MessageHeader *mh;
+
+ /**
+ * Queue the message is queued in, NULL if message is not queued.
+ */
+ struct GNUNET_MQ_Handle *parent_queue;
+
+ /**
+ * Called after the message was sent irrevocably.
+ */
+ GNUNET_MQ_NotifyCallback sent_cb;
+
+ /**
+ * Closure for send_cb
+ */
+ void *sent_cls;
+};
+
+
+/**
+ * Handle to a message queue.
+ */
+struct GNUNET_MQ_Handle
+{
+ /**
+ * Handlers array, or NULL if the queue should not receive messages
+ */
+ const struct GNUNET_MQ_MessageHandler *handlers;
+
+ /**
+ * Closure for the handler callbacks,
+ * as well as for the error handler.
+ */
+ void *handlers_cls;
+
+ /**
+ * Actual implementation of message sending,
+ * called when a message is added
+ */
+ GNUNET_MQ_SendImpl send_impl;
+
+ /**
+ * Implementation-dependent queue destruction function
+ */
+ GNUNET_MQ_DestroyImpl destroy_impl;
+
+ /**
+ * Implementation-specific state
+ */
+ void *impl_state;
+
+ /**
+ * Callback will be called when an error occurs.
+ */
+ GNUNET_MQ_ErrorHandler error_handler;
+
+ /**
+ * Linked list of messages pending to be sent
+ */
+ struct GNUNET_MQ_Envelope *envelope_head;
+
+ /**
+ * Linked list of messages pending to be sent
+ */
+ struct GNUNET_MQ_Envelope *envelope_tail;
+
+ /**
+ * Message that is currently scheduled to be
+ * sent. Not the head of the message queue, as the implementation
+ * needs to know if sending has been already scheduled or not.
+ */
+ struct GNUNET_MQ_Envelope *current_envelope;
+
+ /**
+ * Has the current envelope been commited?
+ * Either GNUNET_YES or GNUNET_NO.
+ */
+ int commited;
+
+ /**
+ * Map of associations, lazily allocated
+ */
+ struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
+
+ /**
+ * Next id that should be used for the assoc_map,
+ * initialized lazily to a random value together with
+ * assoc_map
+ */
+ uint32_t assoc_id;
+};
+
+
+
struct ServerClientSocketState
{
struct ClientConnectionState
{
/**
- * Did we call receive?
+ * Did we call receive alread alreadyy?
*/
int receive_active;
+
+ /**
+ * Do we also want to receive?
+ */
+ int receive_requested;
struct GNUNET_CLIENT_Connection *connection;
struct GNUNET_CLIENT_TransmitHandle *th;
};
* @param mh message to dispatch
*/
void
-GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
+GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
{
- const struct GNUNET_MQ_Handler *handler;
+ const struct GNUNET_MQ_MessageHandler *handler;
int handled = GNUNET_NO;
handler = mq->handlers;
}
+/**
+ * Call the right callback for an error condition.
+ *
+ * @param mq message queue
+ */
void
-GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
+GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
+ enum GNUNET_MQ_Error error)
+{
+ if (NULL == mq->error_handler)
+ {
+ /* FIXME: log what kind of error occured */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
+ return;
+ }
+ mq->error_handler (mq->handlers_cls, error);
+}
+
+
+void
+GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
{
GNUNET_assert (NULL == mqm->parent_queue);
GNUNET_free (mqm);
* May only be called once per message.
*
* @param mq message queue
- * @param mqm the message to send.
+ * @param ev the message to send.
+ */
+void
+GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
+{
+ GNUNET_assert (NULL != mq);
+ GNUNET_assert (NULL == ev->parent_queue);
+
+ /* is the implementation busy? queue it! */
+ if (NULL != mq->current_envelope)
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
+ return;
+ }
+ mq->current_envelope = ev;
+ mq->send_impl (mq, ev->mh, mq->impl_state);
+}
+
+
+/**
+ * Call the send implementation for the next queued message,
+ * if any.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue to send the next message with
*/
void
-GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
+GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
{
+ /* call is only valid if we're actually currently sending
+ * a message */
GNUNET_assert (NULL != mq);
- mq->send_impl (mq, mqm);
+ GNUNET_assert (NULL != mq->current_envelope);
+ GNUNET_assert (GNUNET_YES == mq->commited);
+ mq->commited = GNUNET_NO;
+ GNUNET_free (mq->current_envelope);
+ if (NULL == mq->envelope_head)
+ {
+ mq->current_envelope = NULL;
+ return;
+ }
+
+
+ GNUNET_assert (NULL != mq->envelope_tail);
+ GNUNET_assert (NULL != mq->envelope_head);
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
}
-struct GNUNET_MQ_Message *
+/**
+ * Create a message queue for the specified handlers.
+ *
+ * @param send function the implements sending messages
+ * @param destroy function that implements destroying the queue
+ * @param destroy function that implements canceling a message
+ * @param state for the queue, passed to 'send' and 'destroy'
+ * @param handlers array of message handlers
+ * @param error_handler handler for read and write errors
+ * @return a new message queue
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
+ GNUNET_MQ_DestroyImpl destroy,
+ GNUNET_MQ_CancelImpl cancel,
+ void *impl_state,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
+ void *cls)
+{
+ struct GNUNET_MQ_Handle *mq;
+
+ mq = GNUNET_new (struct GNUNET_MQ_Handle);
+ mq->send_impl = send;
+ mq->destroy_impl = destroy;
+ mq->handlers = handlers;
+ mq->handlers_cls = cls;
+ mq->impl_state = impl_state;
+
+ return mq;
+}
+
+
+/**
+ * Get the message that should currently be sent.
+ * Fails if there is no current message.
+ * Only useful for implementing message queues,
+ * results in undefined behavior if not used carefully.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+const struct GNUNET_MessageHeader *
+GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
+{
+ if (NULL == mq->current_envelope)
+ GNUNET_abort ();
+ if (NULL == mq->current_envelope->mh)
+ GNUNET_abort ();
+ return mq->current_envelope->mh;
+}
+
+
+/**
+ * Get the implementation state associated with the
+ * message queue.
+ *
+ * While the GNUNET_MQ_Impl* callbacks receive the
+ * implementation state, continuations that are scheduled
+ * by the implementation function often only have one closure
+ * argument, with this function it is possible to get at the
+ * implementation state when only passing the GNUNET_MQ_Handle
+ * as closure.
+ *
+ * @param mq message queue with the current message
+ * @return message to send, never NULL
+ */
+void *
+GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
+{
+ return mq->impl_state;
+}
+
+
+
+/**
+ * Mark the current message as irrevocably sent, but do not
+ * proceed with sending the next message.
+ * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
+ *
+ * @param mq message queue
+ */
+void
+GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
+{
+ GNUNET_assert (NULL != mq->current_envelope);
+ GNUNET_assert (GNUNET_NO == mq->commited);
+ mq->commited = GNUNET_YES;
+ if (NULL != mq->current_envelope->sent_cb)
+ mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
+}
+
+
+struct GNUNET_MQ_Envelope *
GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
mqm = GNUNET_malloc (sizeof *mqm + size);
mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
}
-struct GNUNET_MQ_Message *
+struct GNUNET_MQ_Envelope *
GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
const struct GNUNET_MessageHeader *nested_mh)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
uint16_t size;
if (NULL == nested_mh)
transmit_queued (void *cls, size_t size,
void *buf)
{
- struct GNUNET_MQ_MessageQueue *mq = cls;
- struct GNUNET_MQ_Message *mqm = mq->current_msg;
- struct ServerClientSocketState *state = mq->impl_state;
+ struct GNUNET_MQ_Handle *mq = cls;
+ struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
+ const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
size_t msg_size;
GNUNET_assert (NULL != buf);
- if (NULL != mqm->sent_cb)
- {
- mqm->sent_cb (mqm->sent_cls);
- }
-
- mq->current_msg = NULL;
- GNUNET_assert (NULL != mqm);
- msg_size = ntohs (mqm->mh->size);
+ msg_size = ntohs (msg->size);
GNUNET_assert (size >= msg_size);
- memcpy (buf, mqm->mh, msg_size);
- GNUNET_free (mqm);
+ memcpy (buf, msg, msg_size);
state->th = NULL;
- if (NULL != mq->msg_head)
- {
- mq->current_msg = mq->msg_head;
- GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
- state->th =
- GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_queued, mq);
- }
+ GNUNET_MQ_impl_send_continue (mq);
+
return msg_size;
}
static void
-server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
{
- struct ServerClientSocketState *state;
+ struct ServerClientSocketState *state = impl_state;
GNUNET_assert (NULL != mq);
- state = mq->impl_state;
GNUNET_assert (NULL != state);
GNUNET_SERVER_client_drop (state->client);
GNUNET_free (state);
}
static void
-server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
+server_client_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg, void *impl_state)
{
- struct ServerClientSocketState *state;
- int msize;
+ struct ServerClientSocketState *state = impl_state;
GNUNET_assert (NULL != mq);
- state = mq->impl_state;
GNUNET_assert (NULL != state);
- if (NULL != state->th)
- {
- GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
- return;
- }
- GNUNET_assert (NULL == mq->msg_head);
- GNUNET_assert (NULL == mq->current_msg);
- msize = ntohs (mqm->mh->size);
- mq->current_msg = mqm;
+ GNUNET_MQ_impl_send_commit (mq);
+
state->th =
- GNUNET_SERVER_notify_transmit_ready (state->client, msize,
+ GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
&transmit_queued, mq);
}
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
{
- struct GNUNET_MQ_MessageQueue *mq;
+ struct GNUNET_MQ_Handle *mq;
struct ServerClientSocketState *scss;
- mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+ mq = GNUNET_new (struct GNUNET_MQ_Handle);
scss = GNUNET_new (struct ServerClientSocketState);
mq->impl_state = scss;
scss->client = client;
handle_client_message (void *cls,
const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct GNUNET_MQ_Handle *mq = cls;
struct ClientConnectionState *state;
state = mq->impl_state;
if (NULL == msg)
{
- if (NULL == mq->error_handler)
- LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
- else
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
return;
}
GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
GNUNET_TIME_UNIT_FOREVER_REL);
- GNUNET_MQ_dispatch (mq, msg);
+ GNUNET_MQ_inject_message (mq, msg);
}
connection_client_transmit_queued (void *cls, size_t size,
void *buf)
{
- struct GNUNET_MQ_MessageQueue *mq = cls;
- struct GNUNET_MQ_Message *mqm = mq->current_msg;
+ struct GNUNET_MQ_Handle *mq = cls;
+ const struct GNUNET_MessageHeader *msg;
struct ClientConnectionState *state = mq->impl_state;
size_t msg_size;
+ GNUNET_assert (NULL != mq);
+ msg = GNUNET_MQ_impl_current (mq);
+
if (NULL == buf)
{
- if (NULL == mq->error_handler)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
- return 0;
- }
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
return 0;
}
- if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
+ if ( (GNUNET_YES == state->receive_requested) &&
+ (GNUNET_NO == state->receive_active) )
{
state->receive_active = GNUNET_YES;
GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
}
- GNUNET_assert (NULL != mqm);
-
- if (NULL != mqm->sent_cb)
- {
- mqm->sent_cb (mqm->sent_cls);
- }
-
- mq->current_msg = NULL;
- GNUNET_assert (NULL != buf);
- msg_size = ntohs (mqm->mh->size);
+ msg_size = ntohs (msg->size);
GNUNET_assert (size >= msg_size);
- memcpy (buf, mqm->mh, msg_size);
- GNUNET_free (mqm);
+ memcpy (buf, msg, msg_size);
state->th = NULL;
- if (NULL != mq->msg_head)
- {
- mq->current_msg = mq->msg_head;
- GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
- state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
- GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
- &connection_client_transmit_queued, mq);
- }
+
+ GNUNET_MQ_impl_send_continue (mq);
+
return msg_size;
}
static void
-connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
- GNUNET_free (mq->impl_state);
+ GNUNET_free (impl_state);
}
static void
-connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
- struct GNUNET_MQ_Message *mqm)
+connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg, void *impl_state)
{
- struct ClientConnectionState *state = mq->impl_state;
- int msize;
+ struct ClientConnectionState *state = impl_state;
GNUNET_assert (NULL != state);
+ GNUNET_assert (NULL == state->th);
+
+ GNUNET_MQ_impl_send_commit (mq);
- if (NULL != state->th)
- {
- GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
- return;
- }
- GNUNET_assert (NULL == mq->current_msg);
- mq->current_msg = mqm;
- msize = ntohs (mqm->mh->size);
state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
+ GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
&connection_client_transmit_queued, mq);
}
-
-
-
-struct GNUNET_MQ_MessageQueue *
+struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
- const struct GNUNET_MQ_Handler *handlers,
+ const struct GNUNET_MQ_MessageHandler *handlers,
void *cls)
{
- struct GNUNET_MQ_MessageQueue *mq;
+ struct GNUNET_MQ_Handle *mq;
struct ClientConnectionState *state;
GNUNET_assert (NULL != connection);
- mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+ mq = GNUNET_new (struct GNUNET_MQ_Handle);
mq->handlers = handlers;
mq->handlers_cls = cls;
state = GNUNET_new (struct ClientConnectionState);
mq->impl_state = state;
mq->send_impl = connection_client_send_impl;
mq->destroy_impl = connection_client_destroy_impl;
+ if (NULL != handlers)
+ state->receive_requested = GNUNET_YES;
return mq;
}
void
-GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
- const struct GNUNET_MQ_Handler *new_handlers,
+GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MQ_MessageHandler *new_handlers,
void *cls)
{
+ /* FIXME: notify implementation? */
+ /* FIXME: what about NULL handlers? abort receive? */
mq->handlers = new_handlers;
mq->handlers_cls = cls;
}
* @param assoc_data to associate
*/
uint32_t
-GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
- struct GNUNET_MQ_Message *mqm,
+GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
void *assoc_data)
{
uint32_t id;
void *
-GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
{
if (NULL == mq->assoc_map)
return NULL;
void *
-GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
+GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
{
void *val;
void
-GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
+GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
GNUNET_MQ_NotifyCallback cb,
void *cls)
{
void
-GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
+GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
{
/* FIXME: destroy all pending messages in the queue */
if (NULL != mq->destroy_impl)
{
- mq->destroy_impl (mq);
+ mq->destroy_impl (mq, mq->impl_state);
}
GNUNET_free (mq);
-
struct GNUNET_MessageHeader *
GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
{
void
test1 (void)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct MyMessage *mm;
mm = NULL;
void
test2 (void)
{
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_MessageHeader *mh;
mqm = GNUNET_MQ_msg_header (42);
return;
}
+ /* can happen if notify does not work */
+ GNUNET_assert (received < 2);
+
GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
}
void send_cb (void *cls)
{
+ /* the notify should only be called once */
+ GNUNET_assert (GNUNET_NO == notify);
printf ("notify sent\n");
notify = GNUNET_YES;
}
void test_mq (struct GNUNET_CLIENT_Connection *client)
{
- struct GNUNET_MQ_MessageQueue *mq;
- struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MQ_Handle *mq;
+ struct GNUNET_MQ_Envelope *mqm;
/* FIXME: test handling responses */
mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);