/*
* This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 GNUnet e.V.
*
* GNUnet is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published
*
* You should have received a copy of the GNU General Public License
* along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
uint16_t size;
/**
- * @see enum MessageState
+ * Type of first message part.
*/
- uint8_t state;
+ uint16_t first_ptype;
/**
- * Whether a message ACK has already been sent to the client.
- * #GNUNET_YES or #GNUNET_NO
+ * Type of last message part.
*/
- uint8_t ack_sent;
+ uint16_t last_ptype;
/* Followed by message */
};
*/
uint8_t state;
+ /**
+ * Whether the state is already modified in PSYCstore.
+ */
+ uint8_t state_is_modified;
+
/**
* Is the message queued for delivery to the client?
* i.e. added to the recv_msgs queue
/**
* List of connected clients.
*/
-struct ClientListItem
+struct Client
{
- struct ClientListItem *prev;
- struct ClientListItem *next;
+ struct Client *prev;
+ struct Client *next;
+
struct GNUNET_SERVER_Client *client;
};
+struct Operation
+{
+ struct Operation *prev;
+ struct Operation *next;
+
+ struct GNUNET_SERVER_Client *client;
+ struct Channel *chn;
+ uint64_t op_id;
+ uint32_t flags;
+};
+
+
/**
* Common part of the client context for both a channel master and slave.
*/
struct Channel
{
- struct ClientListItem *clients_head;
- struct ClientListItem *clients_tail;
+ struct Client *clients_head;
+ struct Client *clients_tail;
+
+ struct Operation *op_head;
+ struct Operation *op_tail;
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
*/
uint32_t tmit_mod_value_size;
- /**
- * @see enum MessageState
- */
- uint8_t tmit_state;
-
- /**
- * FIXME: needed?
- */
- uint8_t in_transmit;
-
/**
* Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
*/
/**
* Incoming join requests from multicast.
- * member_key -> struct GNUNET_MULTICAST_JoinHandle *
+ * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
*/
struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
* Maximum request ID for this channel.
*/
uint64_t max_request_id;
+
+ /**
+ * Join flags.
+ */
+ enum GNUNET_PSYC_SlaveJoinFlags join_flags;
};
static void
transmit_message (struct Channel *chn);
+static uint64_t
+message_queue_run (struct Channel *chn);
static uint64_t
message_queue_drop (struct Channel *chn);
+static void
+schedule_transmit_message (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Channel *chn = cls;
+ transmit_message (chn);
+}
+
+
/**
* Task run during shutdown.
*
}
+static struct Operation *
+op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
+ uint64_t op_id, uint32_t flags)
+{
+ struct Operation *op = GNUNET_malloc (sizeof (*op));
+ op->client = client;
+ op->chn = chn;
+ op->op_id = op_id;
+ op->flags = flags;
+ GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
+ return op;
+}
+
+
+static void
+op_remove (struct Operation *op)
+{
+ GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
+ GNUNET_free (op);
+}
+
+
/**
* Clean up master data structures after a client disconnected.
*/
if (NULL != mst->origin)
GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
- GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
+ GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
}
GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
slv->member = NULL;
}
- GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
}
static void
cleanup_channel (struct Channel *chn)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Cleaning up channel %s. master? %u\n",
+ chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
message_queue_drop (chn);
- GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
+ GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
+ chn->recv_frags = NULL;
if (NULL != chn->store_op)
{
chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = chn->clients_head;
+ struct Client *cli = chn->clients_head;
while (NULL != cli)
{
if (cli->client == client)
cli = cli->next;
}
+ struct Operation *op = chn->op_head;
+ while (NULL != op)
+ {
+ if (op->client == client)
+ {
+ op->client = NULL;
+ break;
+ }
+ op = op->next;
+ }
+
if (NULL == chn->clients_head)
{ /* Last client disconnected. */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Last client (%s) disconnected from channel %s\n",
+ chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+ GNUNET_h2s (&chn->pub_key_hash));
+ chn->is_disconnected = GNUNET_YES;
if (NULL != chn->tmit_head)
{ /* Send pending messages to multicast before cleanup. */
transmit_message (chn);
client_send_msg (const struct Channel *chn,
const struct GNUNET_MessageHeader *msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending message to clients.\n", chn);
- struct ClientListItem *cli = chn->clients_head;
+ struct Client *cli = chn->clients_head;
while (NULL != cli)
{
GNUNET_SERVER_notification_context_add (nc, cli->client);
}
+/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param data
+ * Data payload or NULL.
+ * @param data_size
+ * Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const void *data, uint16_t data_size)
+{
+ struct GNUNET_OperationResultMessage *res;
+
+ res = GNUNET_malloc (sizeof (*res) + data_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+ res->header.size = htons (sizeof (*res) + data_size);
+ res->result_code = GNUNET_htonll (result_code);
+ res->op_id = op_id;
+ if (0 < data_size)
+ memcpy (&res[1], data, data_size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending result to client for operation #%" PRIu64 ": "
+ "%" PRId64 " (size: %u)\n",
+ client, GNUNET_ntohll (op_id), result_code, data_size);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
/**
* Closure for join_mem_test_cb()
*/
struct JoinMemTestClosure
{
- struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
+ struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
struct Channel *chn;
struct GNUNET_MULTICAST_JoinHandle *jh;
struct GNUNET_PSYC_JoinRequestMessage *join_msg;
* Membership test result callback used for join requests.
*/
static void
-join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
+join_mem_test_cb (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct JoinMemTestClosure *jcls = cls;
if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
{ /* Pass on join request to client if this is a master channel */
struct Master *mst = (struct Master *) jcls->chn;
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
- &slave_key_hash);
- GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
+ struct GNUNET_HashCode slave_pub_hash;
+ GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
+ &slave_pub_hash);
+ GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
client_send_msg (jcls->chn, &jcls->join_msg->header);
}
else
{
+ if (GNUNET_SYSERR == result)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Could not perform membership test (%.*s)\n",
+ err_msg_size, err_msg);
+ }
// FIXME: add relays
GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
}
*/
static void
mcast_recv_join_request (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
const struct GNUNET_MessageHeader *join_msg,
struct GNUNET_MULTICAST_JoinHandle *jh)
{
req = GNUNET_malloc (sizeof (*req) + join_msg_size);
req->header.size = htons (sizeof (*req) + join_msg_size);
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
- req->slave_key = *slave_key;
+ req->slave_pub_key = *slave_pub_key;
if (0 < join_msg_size)
memcpy (&req[1], join_msg, join_msg_size);
struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
- jcls->slave_key = *slave_key;
+ jcls->slave_pub_key = *slave_pub_key;
jcls->chn = chn;
jcls->jh = jh;
jcls->join_msg = req;
- GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
+ GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
chn->max_message_id, 0,
&join_mem_test_cb, jcls);
}
*/
static void
mcast_recv_join_decision (void *cls, int is_admitted,
- const struct GNUNET_PeerIdentity *peer,
- uint16_t relay_count,
- const struct GNUNET_PeerIdentity *relays,
- const struct GNUNET_MessageHeader *join_resp)
+ const struct GNUNET_PeerIdentity *peer,
+ uint16_t relay_count,
+ const struct GNUNET_PeerIdentity *relays,
+ const struct GNUNET_MessageHeader *join_resp)
{
struct Slave *slv = cls;
struct Channel *chn = &slv->chn;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision: %d\n", slv, is_admitted);
+ if (GNUNET_YES == chn->is_ready)
+ {
+ /* Already admitted */
+ return;
+ }
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
struct GNUNET_PSYC_JoinDecisionMessage *
client_send_msg (chn, &dcsn->header);
- if (GNUNET_YES == is_admitted)
+ if (GNUNET_YES == is_admitted
+ && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
{
chn->is_ready = GNUNET_YES;
}
- else
- {
- slv->member = NULL;
- }
}
+static int
+store_recv_fragment_replay (void *cls,
+ struct GNUNET_MULTICAST_MessageHeader *msg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
+
+ GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
+ */
static void
-mcast_recv_membership_test (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t message_id, uint64_t group_generation,
- struct GNUNET_MULTICAST_MembershipTestHandle *mth)
+store_recv_fragment_replay_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ rh, result, err_msg_size, err_msg);
+
+ switch (result)
+ {
+ case GNUNET_YES:
+ break;
+ case GNUNET_NO:
+ GNUNET_MULTICAST_replay_response (rh, NULL,
+ GNUNET_MULTICAST_REC_NOT_FOUND);
+ break;
+
+ case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
+ GNUNET_MULTICAST_replay_response (rh, NULL,
+ GNUNET_MULTICAST_REC_ACCESS_DENIED);
+ break;
+
+ case GNUNET_SYSERR:
+ GNUNET_MULTICAST_replay_response (rh, NULL,
+ GNUNET_MULTICAST_REC_INTERNAL_ERROR);
+ break;
+ }
+ GNUNET_MULTICAST_replay_response_end (rh);
}
+/**
+ * Incoming fragment replay request from multicast.
+ */
static void
mcast_recv_replay_fragment (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
uint64_t fragment_id, uint64_t flags,
struct GNUNET_MULTICAST_ReplayHandle *rh)
{
-
+ struct Channel *chn = cls;
+ GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
+ fragment_id, fragment_id,
+ &store_recv_fragment_replay,
+ &store_recv_fragment_replay_result, rh);
}
+/**
+ * Incoming message replay request from multicast.
+ */
static void
mcast_recv_replay_message (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
uint64_t message_id,
uint64_t fragment_offset,
uint64_t flags,
struct GNUNET_MULTICAST_ReplayHandle *rh)
{
-
+ struct Channel *chn = cls;
+ GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
+ message_id, message_id, 1, NULL,
+ &store_recv_fragment_replay,
+ &store_recv_fragment_replay_result, rh);
}
/**
- * Send multicast message to all clients connected to the channel.
+ * Initialize PSYC message header.
*/
-static void
-client_send_mcast_msg (struct Channel *chn,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+static inline void
+psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+ pmsg->fragment_offset = mmsg->fragment_offset;
+ pmsg->flags = htonl (flags);
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+}
+
+
+/**
+ * Create a new PSYC message from a multicast message for sending it to clients.
+ */
+static inline struct GNUNET_PSYC_MessageHeader *
+psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
{
struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t size = ntohs (mmsg->header.size);
uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+ pmsg = GNUNET_malloc (psize);
+ psyc_msg_init (pmsg, mmsg, flags);
+ return pmsg;
+}
+
+
+/**
+ * Send multicast message to all clients connected to the channel.
+ */
+static void
+client_send_mcast_msg (struct Channel *chn,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint32_t flags)
+{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending multicast message to client. "
"fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
chn, GNUNET_ntohll (mmsg->fragment_id),
GNUNET_ntohll (mmsg->message_id));
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = mmsg->message_id;
- pmsg->fragment_offset = mmsg->fragment_offset;
-
- memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+ struct GNUNET_PSYC_MessageHeader *
+ pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
client_send_msg (chn, &pmsg->header);
GNUNET_free (pmsg);
}
pmsg->message_id = req->request_id;
pmsg->fragment_offset = req->fragment_offset;
pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+ pmsg->slave_pub_key = req->member_pub_key;
memcpy (&pmsg[1], &req[1], size - sizeof (*req));
client_send_msg (chn, &pmsg->header);
/**
* Insert a multicast message fragment into the queue belonging to the message.
*
- * @param chn Channel.
+ * @param chn Channel.
* @param mmsg Multicast message fragment.
* @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
* @param first_ptype First PSYC message part type in @a mmsg.
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Header of message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->header_size);
+ chn, GNUNET_ntohll (mmsg->message_id),
+ frag_offset, fragq->header_size);
}
}
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->size);
+ chn, GNUNET_ntohll (mmsg->message_id),
+ frag_offset, fragq->size);
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
* Send fragments of a message in order to client, after all modifiers arrived
* from multicast.
*
- * @param chn Channel.
- * @param msg_id ID of the message @a fragq belongs to.
- * @param fragq Fragment queue of the message.
- * @param drop Drop message without delivering to client?
- * #GNUNET_YES or #GNUNET_NO.
+ * @param chn
+ * Channel.
+ * @param msg_id
+ * ID of the message @a fragq belongs to.
+ * @param fragq
+ * Fragment queue of the message.
+ * @param drop
+ * Drop message without delivering to client?
+ * #GNUNET_YES or #GNUNET_NO.
*/
static void
fragment_queue_run (struct Channel *chn, uint64_t msg_id,
{
if (GNUNET_NO == drop)
{
- client_send_mcast_msg (chn, cache_entry->mmsg);
+ client_send_mcast_msg (chn, cache_entry->mmsg, 0);
}
if (cache_entry->ref_count <= 1)
{
}
+struct StateModifyClosure
+{
+ struct Channel *chn;
+ uint64_t msg_id;
+ struct GNUNET_HashCode msg_id_hash;
+};
+
+
+void
+store_recv_state_modify_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
+{
+ struct StateModifyClosure *mcls = cls;
+ struct Channel *chn = mcls->chn;
+ uint64_t msg_id = mcls->msg_id;
+
+ struct FragmentQueue *
+ fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
+ chn, result, err_msg_size, err_msg);
+
+ switch (result)
+ {
+ case GNUNET_OK:
+ case GNUNET_NO:
+ if (NULL != fragq)
+ fragq->state_is_modified = GNUNET_YES;
+ if (chn->max_state_message_id < msg_id)
+ chn->max_state_message_id = msg_id;
+ if (chn->max_message_id < msg_id)
+ chn->max_message_id = msg_id;
+
+ if (NULL != fragq)
+ fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
+ message_queue_run (chn);
+ break;
+
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
+ chn, result, err_msg_size, err_msg);
+ /** @todo FIXME: handle state_modify error */
+ }
+}
+
+
/**
* Run message queue.
*
"%p Running message queue.\n", chn);
uint64_t n = 0;
uint64_t msg_id;
+
while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
&msg_id))
{
break;
}
- if (MSG_FRAG_STATE_HEADER == fragq->state)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Fragment queue entry: state: %u, state delta: "
+ "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
+ chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
+
+ if (MSG_FRAG_STATE_DATA <= fragq->state)
{
/* Check if there's a missing message before the current one */
if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
+
if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
- && msg_id - 1 != chn->max_message_id)
+ && (chn->max_message_id != msg_id - 1
+ && chn->max_message_id != msg_id))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Out of order message. "
- "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
- chn, msg_id, chn->max_message_id);
+ "(%" PRIu64 " != %" PRIu64 " - 1)\n",
+ chn, chn->max_message_id, msg_id);
break;
+ // FIXME: keep track of messages processed in this queue run,
+ // and only stop after reaching the end
}
}
else
{
- if (msg_id - fragq->state_delta != chn->max_state_message_id)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
+ if (GNUNET_YES != fragq->state_is_modified)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Out of order stateful message. "
- "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
- chn, msg_id, fragq->state_delta, chn->max_state_message_id);
- break;
+ if (msg_id - fragq->state_delta != chn->max_state_message_id)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Out of order stateful message. "
+ "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
+ chn, msg_id, fragq->state_delta, chn->max_state_message_id);
+ break;
+ // FIXME: keep track of messages processed in this queue run,
+ // and only stop after reaching the end
+ }
+
+ struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+ mcls->chn = chn;
+ mcls->msg_id = msg_id;
+ mcls->msg_id_hash = msg_id_hash;
+
+ /* Apply modifiers to state in PSYCstore */
+ GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
+ fragq->state_delta,
+ store_recv_state_modify_result, mcls);
+ break; // continue after asynchronous state modify result
}
-#if TODO
- /* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
- store_recv_state_modify_result, cls);
-#endif
- chn->max_state_message_id = msg_id;
}
chn->max_message_id = msg_id;
}
GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
}
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Removed %" PRIu64 " messages from queue.\n", chn, n);
return n;
struct FragmentQueue *
fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
-
+ GNUNET_assert (NULL != fragq);
fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
/**
- * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
+ * Received result of GNUNET_PSYCSTORE_fragment_store().
*/
static void
-store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
+store_recv_fragment_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct Channel *chn = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
- chn, result, err_msg);
+ "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
+ chn, result, err_msg_size, err_msg);
}
uint16_t size = ntohs (mmsg->header.size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received multicast message of size %u.\n",
- chn, size);
+ "%p Received multicast message of size %u. "
+ "fragment_id=%" PRIu64 ", message_id=%" PRIu64
+ ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
+ chn, size,
+ GNUNET_ntohll (mmsg->fragment_id),
+ GNUNET_ntohll (mmsg->message_id),
+ GNUNET_ntohll (mmsg->fragment_offset),
+ GNUNET_ntohll (mmsg->flags));
GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
&store_recv_fragment_store_result, chn);
uint16_t first_ptype = 0, last_ptype = 0;
- if (GNUNET_SYSERR
- == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
- (const char *) &mmsg[1],
- &first_ptype, &last_ptype))
+ int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
+ (const char *) &mmsg[1],
+ &first_ptype, &last_ptype);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Message check result %d, first part type %u, last part type %u\n",
+ chn, check, first_ptype, last_ptype);
+ if (GNUNET_SYSERR == check)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Dropping incoming multicast message with invalid parts.\n",
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message parts: first: type %u, last: type %u\n",
- first_ptype, last_ptype);
-
fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
message_queue_run (chn);
}
struct Master *mst = cls;
uint16_t size = ntohs (req->header.size);
+ char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received multicast request of size %u.\n",
- mst, size);
+ "%p Received multicast request of size %u from %s.\n",
+ mst, size, str);
+ GNUNET_free (str);
uint16_t first_ptype = 0, last_ptype = 0;
if (GNUNET_SYSERR
struct Channel *chn = &mst->chn;
chn->store_op = NULL;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (result);
mst->max_group_generation = max_group_generation;
mst->origin
= GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
- &mcast_recv_join_request,
- &mcast_recv_membership_test,
- &mcast_recv_replay_fragment,
- &mcast_recv_replay_message,
- &mcast_recv_request,
- &mcast_recv_message, chn);
+ mcast_recv_join_request,
+ mcast_recv_replay_fragment,
+ mcast_recv_replay_message,
+ mcast_recv_request,
+ mcast_recv_message, chn);
chn->is_ready = GNUNET_YES;
}
else
struct Channel *chn = &slv->chn;
chn->store_op = NULL;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (result);
&slv->origin,
slv->relay_count, slv->relays,
&slv->join_msg->header,
- &mcast_recv_join_request,
- &mcast_recv_join_decision,
- &mcast_recv_membership_test,
- &mcast_recv_replay_fragment,
- &mcast_recv_replay_message,
- &mcast_recv_message, chn);
+ mcast_recv_join_request,
+ mcast_recv_join_decision,
+ mcast_recv_replay_fragment,
+ mcast_recv_replay_message,
+ mcast_recv_message, chn);
if (NULL != slv->join_msg)
{
GNUNET_free (slv->join_msg);
{
chn = &mst->chn;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
"%p Client connected as master to channel %s.\n",
mst, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+ struct Client *cli = GNUNET_new (struct Client);
cli->client = client;
GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
uint16_t req_size = ntohs (req->header.size);
struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
- struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
+ struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
- GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
- GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
+ GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
+ GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
struct GNUNET_CONTAINER_MultiHashMap *
chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
if (NULL != chn_slv)
{
- slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
+ slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
}
if (NULL == slv)
{
slv = GNUNET_new (struct Slave);
slv->priv_key = req->slave_key;
slv->pub_key = slv_pub_key;
- slv->pub_key_hash = slv_pub_key_hash;
+ slv->pub_key_hash = slv_pub_hash;
slv->origin = req->origin;
slv->relay_count = ntohl (req->relay_count);
+ slv->join_flags = ntohl (req->flags);
const struct GNUNET_PeerIdentity *
relays = (const struct GNUNET_PeerIdentity *) &req[1];
if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
<= req_size)
{
- join_msg_size = ntohs (slv->join_msg->header.size);
+ struct GNUNET_PSYC_Message *
+ join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
+ join_msg_size = ntohs (join_msg->header.size);
slv->join_msg = GNUNET_malloc (join_msg_size);
- memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
+ memcpy (slv->join_msg, join_msg, join_msg_size);
}
if (sizeof (*req) + relay_size + join_msg_size != req_size)
{
sizeof (*req), relay_size, join_msg_size, req_size);
GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
+ GNUNET_free (slv);
return;
}
if (0 < slv->relay_count)
chn = &slv->chn;
chn->is_master = GNUNET_NO;
- chn->pub_key = req->channel_key;
+ chn->pub_key = req->channel_pub_key;
chn->pub_key_hash = pub_key_hash;
channel_init (chn);
{
chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
{
chn = &slv->chn;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
GNUNET_NO);
- if (NULL == slv->member)
+ if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
+ {
+ mcast_recv_join_decision (slv, GNUNET_YES,
+ NULL, 0, NULL, NULL);
+ }
+ else if (NULL == slv->member)
{
slv->member
= GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
&slv->join_msg->header,
&mcast_recv_join_request,
&mcast_recv_join_decision,
- &mcast_recv_membership_test,
&mcast_recv_replay_fragment,
&mcast_recv_replay_message,
&mcast_recv_message, chn);
"%p Client connected as slave to channel %s.\n",
slv, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+ struct Client *cli = GNUNET_new (struct Client);
cli->client = client;
GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (GNUNET_YES == chn->is_master);
- struct Master *mst = (struct Master *) chn;
-
- struct GNUNET_PSYC_JoinDecisionMessage *
- dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
+ = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
+ struct Channel *chn;
+ struct Master *mst;
struct JoinDecisionClosure jcls;
+
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_assert (GNUNET_YES == chn->is_master);
+ mst = (struct Master *) chn;
jcls.is_admitted = ntohl (dcsn->is_admitted);
jcls.msg
= (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
? (struct GNUNET_MessageHeader *) &dcsn[1]
: NULL;
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
- &slave_key_hash);
+ struct GNUNET_HashCode slave_pub_hash;
+ GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
+ &slave_pub_hash);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision (%d) from client for channel %s..\n",
mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p ..and slave %s.\n",
- mst, GNUNET_h2s (&slave_key_hash));
+ mst, GNUNET_h2s (&slave_pub_hash));
- GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
+ GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
&mcast_send_join_decision, &jcls);
- GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
+ GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p transmit_notify: nothing to send.\n", chn);
+ if (NULL != tmit_msg && *data_size < tmit_msg->size)
+ GNUNET_break (0);
*data_size = 0;
return GNUNET_NO;
}
*data_size = tmit_msg->size;
memcpy (data, &tmit_msg[1], *data_size);
- int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ int ret
+ = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+ ? GNUNET_NO
+ : GNUNET_YES;
- if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
+ /* FIXME: handle disconnecting clients */
+ if (NULL != tmit_msg->client)
send_message_ack (chn, tmit_msg->client);
GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
if (NULL != chn->tmit_head)
{
- transmit_message (chn);
+ GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
}
- else if (GNUNET_YES == chn->is_disconnected)
+ else if (GNUNET_YES == chn->is_disconnected
+ && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
{
/* FIXME: handle partial message (when still in_transmit) */
- cleanup_channel (chn);
+ return GNUNET_SYSERR;
}
return ret;
}
static void
master_transmit_message (struct Master *mst)
{
+ if (NULL == mst->chn.tmit_head)
+ return;
if (NULL == mst->tmit_handle)
{
mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+ = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
mst->max_group_generation,
master_transmit_notify, mst);
}
static void
slave_transmit_message (struct Slave *slv)
{
+ if (NULL == slv->chn.tmit_head)
+ return;
if (NULL == slv->tmit_handle)
{
slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+ = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
slave_transmit_notify, slv);
}
else
* Queue a message from a channel master for sending to the multicast group.
*/
static void
-master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
+master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
tmit_msg->id = ++mst->max_message_id;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: message_id=%" PRIu64 "\n",
+ mst, tmit_msg->id);
struct GNUNET_PSYC_MessageMethod *pmeth
= (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
}
else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: state_delta=%" PRIu64 "\n",
+ mst, tmit_msg->id - mst->max_state_message_id);
pmeth->state_delta = GNUNET_htonll (tmit_msg->id
- mst->max_state_message_id);
+ mst->max_state_message_id = tmit_msg->id;
}
else
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: state not modified\n", mst);
pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
}
+
+ if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
+ {
+ /// @todo add state_hash to PSYC header
+ }
}
}
* Queue a message from a channel slave for sending to the multicast group.
*/
static void
-slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
+slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
{
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
struct GNUNET_PSYC_MessageMethod *pmeth
= (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
memcpy (&tmit_msg[1], data, data_size);
tmit_msg->client = client;
tmit_msg->size = data_size;
- tmit_msg->state = chn->tmit_state;
+ tmit_msg->first_ptype = first_ptype;
+ tmit_msg->last_ptype = last_ptype;
/* FIXME: separate queue per message ID */
GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
chn->is_master
- ? master_queue_message ((struct Master *) chn, tmit_msg,
- first_ptype, last_ptype)
- : slave_queue_message ((struct Slave *) chn, tmit_msg,
- first_ptype, last_ptype);
+ ? master_queue_message ((struct Master *) chn, tmit_msg)
+ : slave_queue_message ((struct Slave *) chn, tmit_msg);
return tmit_msg;
}
uint16_t size = ntohs (msg->size);
if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p Message payload too large: %u < %u.\n",
+ chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
GNUNET_break (0);
transmit_cancel (chn, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/**
- * Client requests to add a slave to the membership database.
+ * Received result of GNUNET_PSYCSTORE_membership_store()
*/
static void
-client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+store_recv_membership_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct Operation *op = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
+ op->chn, result, err_msg_size, err_msg);
+ if (NULL != op->client)
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ op_remove (op);
}
/**
- * Client requests to remove a slave from the membership database.
+ * Client requests to add/remove a slave in the membership database.
*/
static void
-client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct ChannelMembershipStoreRequest *
+ req = (const struct ChannelMembershipStoreRequest *) msg;
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
+
+ uint64_t announced_at = GNUNET_ntohll (req->announced_at);
+ uint64_t effective_since = GNUNET_ntohll (req->effective_since);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received membership store request from client.\n", chn);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
+ chn, req->did_join, announced_at, effective_since);
+
+ GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
+ req->did_join, announced_at, effective_since,
+ 0, /* FIXME: group_generation */
+ &store_recv_membership_store_result, op);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Client requests channel history from PSYCstore.
+ * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
+ */
+static int
+store_recv_fragment_history (void *cls,
+ struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct Operation *op = cls;
+ if (NULL == op->client)
+ { /* Requesting client already disconnected. */
+ return GNUNET_NO;
+ }
+ struct Channel *chn = op->chn;
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t msize = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
+
+ struct GNUNET_OperationResultMessage *
+ res = GNUNET_malloc (sizeof (*res) + psize);
+ res->header.size = htons (sizeof (*res) + psize);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+ res->op_id = op->op_id;
+ res->result_code = GNUNET_htonll (GNUNET_OK);
+
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
+ GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
+ memcpy (&res[1], pmsg, psize);
+
+ /** @todo FIXME: send only to requesting client */
+ client_send_msg (chn, &res->header);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received the result of GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
*/
static void
-client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+store_recv_fragment_history_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct Operation *op = cls;
+ if (NULL == op->client)
+ { /* Requesting client already disconnected. */
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+
+ if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
+ {
+ /** @todo Multicast replay request for messages not found locally. */
+ }
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ op_remove (op);
+}
+
+
+/**
+ * Client requests channel history.
+ */
+static void
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct GNUNET_PSYC_HistoryRequestMessage *
+ req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
+
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p History replay #%" PRIu64 ": "
+ "invalid method prefix. size: %u < %u?\n",
+ chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
+
+ if (0 == req->message_limit)
+ GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
+ GNUNET_ntohll (req->start_message_id),
+ GNUNET_ntohll (req->end_message_id),
+ 0, method_prefix,
+ &store_recv_fragment_history,
+ &store_recv_fragment_history_result, op);
+ else
+ GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
+ GNUNET_ntohll (req->message_limit),
+ method_prefix,
+ &store_recv_fragment_history,
+ &store_recv_fragment_history_result,
+ op);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Received state var from PSYCstore, send it to client.
+ */
+static int
+store_recv_state_var (void *cls, const char *name,
+ const void *value, uint32_t value_size)
+{
+ struct Operation *op = cls;
+ struct GNUNET_OperationResultMessage *res;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
+ op->chn, GNUNET_ntohll (op->op_id), name);
+
+ if (NULL != name) /* First part */
+ {
+ uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
+ struct GNUNET_PSYC_MessageModifier *mod;
+ res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
+ res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ res->op_id = op->op_id;
+
+ mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
+ mod->header.size = htons (sizeof (*mod) + name_size + value_size);
+ mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+ mod->name_size = htons (name_size);
+ mod->value_size = htonl (value_size);
+ mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
+ memcpy (&mod[1], name, name_size);
+ memcpy (((char *) &mod[1]) + name_size, value, value_size);
+ }
+ else /* Continuation */
+ {
+ struct GNUNET_MessageHeader *mod;
+ res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
+ res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ res->op_id = op->op_id;
+
+ mod = (struct GNUNET_MessageHeader *) &res[1];
+ mod->size = htons (sizeof (*mod) + value_size);
+ mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ memcpy (&mod[1], value, value_size);
+ }
+
+ // FIXME: client might have been disconnected
+ GNUNET_SERVER_notification_context_add (nc, op->client);
+ GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
+ GNUNET_NO);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_state_get()
+ * or GNUNET_PSYCSTORE_state_get_prefix()
+ */
+static void
+store_recv_state_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
+{
+ struct Operation *op = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p state_get #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+
+ // FIXME: client might have been disconnected
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ op_remove (op);
}
client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct StateRequest *
+ req = (const struct StateRequest *) msg;
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+ const char *name = (const char *) &req[1];
+ if (0 == name_size || '\0' != name[name_size - 1])
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
+ GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
+ &store_recv_state_var,
+ &store_recv_state_result, op);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct StateRequest *
+ req = (const struct StateRequest *) msg;
+
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+ const char *name = (const char *) &req[1];
+ if (0 == name_size || '\0' != name[name_size - 1])
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
+ GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
+ &store_recv_state_var,
+ &store_recv_state_result, op);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+ { &client_recv_master_start, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
+
+ { &client_recv_slave_join, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
+
+ { &client_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+
+ { &client_recv_psyc_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+ { &client_recv_membership_store, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
+
+ { &client_recv_history_replay, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+
+ { &client_recv_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+ { &client_recv_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+
+ { NULL, NULL, 0, 0 }
+};
+
+
/**
* Initialize the PSYC service.
*
run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &client_recv_master_start, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
-
- { &client_recv_slave_join, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
-
- { &client_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
- { &client_recv_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
-
- { &client_recv_slave_add, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
-
- { &client_recv_slave_remove, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
-
- { &client_recv_story_request, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
-
- { &client_recv_state_get, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
-
- { &client_recv_state_get_prefix, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
-
- { NULL, NULL, 0, 0 }
- };
-
cfg = c;
store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
nc = GNUNET_SERVER_notification_context_create (server, 1);
- GNUNET_SERVER_add_handlers (server, handlers);
+ GNUNET_SERVER_add_handlers (server, server_handlers);
GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task, NULL);