/*
* This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 GNUnet e.V.
*
* GNUnet is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published
*
* You should have received a copy of the GNU General Public License
* along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
#include "gnunet_multicast_service.h"
#include "gnunet_psycstore_service.h"
#include "gnunet_psyc_service.h"
+#include "gnunet_psyc_util_lib.h"
#include "psyc.h"
uint64_t id;
/**
- * Size of @a buf
+ * Size of message.
*/
uint16_t size;
/**
- * @see enum MessageState
+ * Type of first message part.
*/
- uint8_t state;
+ uint16_t first_ptype;
+
+ /**
+ * Type of last message part.
+ */
+ uint16_t last_ptype;
/* Followed by message */
};
*/
uint8_t state;
+ /**
+ * Whether the state is already modified in PSYCstore.
+ */
+ uint8_t state_is_modified;
+
/**
* Is the message queued for delivery to the client?
* i.e. added to the recv_msgs queue
*/
- uint8_t queued;
+ uint8_t is_queued;
};
/**
* List of connected clients.
*/
-struct ClientList
+struct Client
+{
+ struct Client *prev;
+ struct Client *next;
+
+ struct GNUNET_SERVER_Client *client;
+};
+
+
+struct Operation
{
- struct ClientList *prev;
- struct ClientList *next;
+ struct Operation *prev;
+ struct Operation *next;
+
struct GNUNET_SERVER_Client *client;
+ struct Channel *chn;
+ uint64_t op_id;
+ uint32_t flags;
};
*/
struct Channel
{
- struct ClientList *clients_head;
- struct ClientList *clients_tail;
+ struct Client *clients_head;
+ struct Client *clients_tail;
+
+ struct Operation *op_head;
+ struct Operation *op_tail;
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
*/
struct GNUNET_CONTAINER_Heap *recv_msgs;
- /**
- * FIXME: needed?
- */
- GNUNET_SCHEDULER_TaskIdentifier tmit_task;
-
/**
* Public key of the channel.
*/
*/
uint32_t tmit_mod_value_size;
- /**
- * @see enum MessageState
- */
- uint8_t tmit_state;
-
- /**
- * FIXME: needed?
- */
- uint8_t in_transmit;
-
/**
* Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
*/
uint8_t is_master;
/**
- * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
+ * Is this channel ready to receive messages from client?
+ * #GNUNET_YES or #GNUNET_NO
*/
- uint8_t ready;
+ uint8_t is_ready;
/**
- * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
+ * Is the client disconnected?
+ * #GNUNET_YES or #GNUNET_NO
*/
- uint8_t disconnected;
+ uint8_t is_disconnected;
};
/**
* Channel struct common for Master and Slave
*/
- struct Channel ch;
+ struct Channel chn;
/**
* Private key of the channel.
/**
* Incoming join requests from multicast.
- * member_key -> struct GNUNET_MULTICAST_JoinHandle *
+ * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
*/
struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
/**
* Channel struct common for Master and Slave
*/
- struct Channel ch;
+ struct Channel chn;
/**
* Private key of the slave.
*/
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+ struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
/**
* Public key of the slave.
*/
- struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+ struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
/**
* Hash of @a pub_key.
/**
* Join request to be transmitted to the master on join.
*/
- struct GNUNET_MessageHeader *join_req;
+ struct GNUNET_PSYC_Message *join_msg;
/**
* Join decision received from multicast.
*/
- struct SlaveJoinDecision *join_dcsn;
+ struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
/**
* Maximum request ID for this channel.
*/
uint64_t max_request_id;
+
+ /**
+ * Join flags.
+ */
+ enum GNUNET_PSYC_SlaveJoinFlags join_flags;
};
-static inline void
-transmit_message (struct Channel *ch);
+static void
+transmit_message (struct Channel *chn);
+static uint64_t
+message_queue_run (struct Channel *chn);
static uint64_t
-message_queue_drop (struct Channel *ch);
+message_queue_drop (struct Channel *chn);
+
+
+static void
+schedule_transmit_message (void *cls)
+{
+ struct Channel *chn = cls;
+
+ transmit_message (chn);
+}
/**
* Task run during shutdown.
*
* @param cls unused
- * @param tc unused
*/
static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls)
{
if (NULL != nc)
{
}
+static struct Operation *
+op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
+ uint64_t op_id, uint32_t flags)
+{
+ struct Operation *op = GNUNET_malloc (sizeof (*op));
+ op->client = client;
+ op->chn = chn;
+ op->op_id = op_id;
+ op->flags = flags;
+ GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
+ return op;
+}
+
+
+static void
+op_remove (struct Operation *op)
+{
+ GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
+ GNUNET_free (op);
+}
+
+
/**
* Clean up master data structures after a client disconnected.
*/
static void
cleanup_master (struct Master *mst)
{
- struct Channel *ch = &mst->ch;
+ struct Channel *chn = &mst->chn;
if (NULL != mst->origin)
- GNUNET_MULTICAST_origin_stop (mst->origin);
+ GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
- GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+ GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
}
static void
cleanup_slave (struct Slave *slv)
{
- struct Channel *ch = &slv->ch;
+ struct Channel *chn = &slv->chn;
struct GNUNET_CONTAINER_MultiHashMap *
- ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
- &ch->pub_key_hash);
- GNUNET_assert (NULL != ch_slv);
- GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
+ chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
+ &chn->pub_key_hash);
+ GNUNET_assert (NULL != chn_slv);
+ GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
- if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
+ if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
{
- GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
- ch_slv);
- GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
+ GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
+ chn_slv);
+ GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
}
- GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
- if (NULL != slv->join_req)
- GNUNET_free (slv->join_req);
+ if (NULL != slv->join_msg)
+ {
+ GNUNET_free (slv->join_msg);
+ slv->join_msg = NULL;
+ }
if (NULL != slv->relays)
+ {
GNUNET_free (slv->relays);
+ slv->relays = NULL;
+ }
if (NULL != slv->member)
- GNUNET_MULTICAST_member_part (slv->member);
- GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
+ {
+ GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
+ slv->member = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
}
* Clean up channel data structures after a client disconnected.
*/
static void
-cleanup_channel (struct Channel *ch)
+cleanup_channel (struct Channel *chn)
{
- message_queue_drop (ch);
- GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Cleaning up channel %s. master? %u\n",
+ chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
+ message_queue_drop (chn);
+ GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
+ chn->recv_frags = NULL;
- if (NULL != ch->store_op)
- GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
+ if (NULL != chn->store_op)
+ {
+ GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
+ chn->store_op = NULL;
+ }
- (GNUNET_YES == ch->is_master)
- ? cleanup_master ((struct Master *) ch)
- : cleanup_slave ((struct Slave *) ch);
- GNUNET_free (ch);
+ (GNUNET_YES == chn->is_master)
+ ? cleanup_master ((struct Master *) chn)
+ : cleanup_slave ((struct Slave *) chn);
+ GNUNET_free (chn);
}
return;
struct Channel *
- ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Client (%s) disconnected from channel %s\n",
- ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
- GNUNET_h2s (&ch->pub_key_hash));
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- if (NULL == ch)
+ if (NULL == chn)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p User context is NULL in client_disconnect()\n", ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p User context is NULL in client_disconnect()\n", chn);
GNUNET_break (0);
return;
}
- struct ClientList *cl = ch->clients_head;
- while (NULL != cl)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Client (%s) disconnected from channel %s\n",
+ chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+ GNUNET_h2s (&chn->pub_key_hash));
+
+ struct Client *cli = chn->clients_head;
+ while (NULL != cli)
+ {
+ if (cli->client == client)
+ {
+ GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
+ GNUNET_free (cli);
+ break;
+ }
+ cli = cli->next;
+ }
+
+ struct Operation *op = chn->op_head;
+ while (NULL != op)
{
- if (cl->client == client)
+ if (op->client == client)
{
- GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
- GNUNET_free (cl);
+ op->client = NULL;
break;
}
- cl = cl->next;
+ op = op->next;
}
- if (NULL == ch->clients_head)
+ if (NULL == chn->clients_head)
{ /* Last client disconnected. */
- if (NULL != ch->tmit_head)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Last client (%s) disconnected from channel %s\n",
+ chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+ GNUNET_h2s (&chn->pub_key_hash));
+ chn->is_disconnected = GNUNET_YES;
+ if (NULL != chn->tmit_head)
{ /* Send pending messages to multicast before cleanup. */
- transmit_message (ch);
+ transmit_message (chn);
}
else
{
- cleanup_channel (ch);
+ cleanup_channel (chn);
}
}
}
* Send message to all clients connected to the channel.
*/
static void
-msg_to_clients (const struct Channel *ch,
- const struct GNUNET_MessageHeader *msg)
+client_send_msg (const struct Channel *chn,
+ const struct GNUNET_MessageHeader *msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Sending message to clients.\n", ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending message to clients.\n", chn);
- struct ClientList *cl = ch->clients_head;
- while (NULL != cl)
+ struct Client *cli = chn->clients_head;
+ while (NULL != cli)
{
- GNUNET_SERVER_notification_context_add (nc, cl->client);
- GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
- cl = cl->next;
+ GNUNET_SERVER_notification_context_add (nc, cli->client);
+ GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
+ cli = cli->next;
}
}
+/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param data
+ * Data payload or NULL.
+ * @param data_size
+ * Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const void *data, uint16_t data_size)
+{
+ struct GNUNET_OperationResultMessage *res;
+
+ res = GNUNET_malloc (sizeof (*res) + data_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+ res->header.size = htons (sizeof (*res) + data_size);
+ res->result_code = GNUNET_htonll (result_code);
+ res->op_id = op_id;
+ if (0 < data_size)
+ memcpy (&res[1], data, data_size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending result to client for operation #%" PRIu64 ": "
+ "%" PRId64 " (size: %u)\n",
+ client, GNUNET_ntohll (op_id), result_code, data_size);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
/**
* Closure for join_mem_test_cb()
*/
struct JoinMemTestClosure
{
- struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
- struct Channel *ch;
+ struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
+ struct Channel *chn;
struct GNUNET_MULTICAST_JoinHandle *jh;
- struct MasterJoinRequest *master_join_req;
+ struct GNUNET_PSYC_JoinRequestMessage *join_msg;
};
* Membership test result callback used for join requests.
*/
static void
-join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
+join_mem_test_cb (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct JoinMemTestClosure *jcls = cls;
- if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
+ if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
{ /* Pass on join request to client if this is a master channel */
- struct Master *mst = (struct Master *) jcls->ch;
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
- &slave_key_hash);
- GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
+ struct Master *mst = (struct Master *) jcls->chn;
+ struct GNUNET_HashCode slave_pub_hash;
+ GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
+ &slave_pub_hash);
+ GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- msg_to_clients (jcls->ch, &jcls->master_join_req->header);
+ client_send_msg (jcls->chn, &jcls->join_msg->header);
}
else
{
+ if (GNUNET_SYSERR == result)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Could not perform membership test (%.*s)\n",
+ err_msg_size, err_msg);
+ }
// FIXME: add relays
GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
}
- GNUNET_free (jcls->master_join_req);
+ GNUNET_free (jcls->join_msg);
GNUNET_free (jcls);
}
* Incoming join request from multicast.
*/
static void
-mcast_join_request_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *join_msg,
- struct GNUNET_MULTICAST_JoinHandle *jh)
+mcast_recv_join_request (void *cls,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
+ const struct GNUNET_MessageHeader *join_msg,
+ struct GNUNET_MULTICAST_JoinHandle *jh)
{
- struct Channel *ch = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
+ struct Channel *chn = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
uint16_t join_msg_size = 0;
if (NULL != join_msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"%p Got join message with invalid type %u.\n",
- ch, ntohs (join_msg->type));
+ chn, ntohs (join_msg->type));
}
}
- struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
+ struct GNUNET_PSYC_JoinRequestMessage *
+ req = GNUNET_malloc (sizeof (*req) + join_msg_size);
req->header.size = htons (sizeof (*req) + join_msg_size);
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
- req->slave_key = *slave_key;
+ req->slave_pub_key = *slave_pub_key;
if (0 < join_msg_size)
memcpy (&req[1], join_msg, join_msg_size);
struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
- jcls->slave_key = *slave_key;
- jcls->ch = ch;
+ jcls->slave_pub_key = *slave_pub_key;
+ jcls->chn = chn;
jcls->jh = jh;
- jcls->master_join_req = req;
+ jcls->join_msg = req;
- GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
- ch->max_message_id, 0,
+ GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
+ chn->max_message_id, 0,
&join_mem_test_cb, jcls);
}
* Join decision received from multicast.
*/
static void
-mcast_join_decision_cb (void *cls, int is_admitted,
- const struct GNUNET_PeerIdentity *peer,
- uint16_t relay_count,
- const struct GNUNET_PeerIdentity *relays,
- const struct GNUNET_MessageHeader *join_resp)
+mcast_recv_join_decision (void *cls, int is_admitted,
+ const struct GNUNET_PeerIdentity *peer,
+ uint16_t relay_count,
+ const struct GNUNET_PeerIdentity *relays,
+ const struct GNUNET_MessageHeader *join_resp)
{
struct Slave *slv = cls;
- struct Channel *ch = &slv->ch;
+ struct Channel *chn = &slv->chn;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision: %d\n", slv, is_admitted);
+ if (GNUNET_YES == chn->is_ready)
+ {
+ /* Already admitted */
+ return;
+ }
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
- struct SlaveJoinDecision *
+ struct GNUNET_PSYC_JoinDecisionMessage *
dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
if (0 < join_resp_size)
memcpy (&dcsn[1], join_resp, join_resp_size);
- msg_to_clients (ch, &dcsn->header);
+ client_send_msg (chn, &dcsn->header);
- if (GNUNET_YES == is_admitted)
- {
- ch->ready = GNUNET_YES;
- }
- else
+ if (GNUNET_YES == is_admitted
+ && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
{
- slv->member = NULL;
+ chn->is_ready = GNUNET_YES;
}
}
-static void
-mcast_membership_test_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id, uint64_t group_generation,
- struct GNUNET_MULTICAST_MembershipTestHandle *mth)
+static int
+store_recv_fragment_replay (void *cls,
+ struct GNUNET_MULTICAST_MessageHeader *msg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
{
+ struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
+ GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
+ return GNUNET_YES;
}
+/**
+ * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
+ */
static void
-mcast_replay_fragment_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t fragment_id, uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
-
+store_recv_fragment_replay_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ rh, result, err_msg_size, err_msg);
+
+ switch (result)
+ {
+ case GNUNET_YES:
+ break;
+ case GNUNET_NO:
+ GNUNET_MULTICAST_replay_response (rh, NULL,
+ GNUNET_MULTICAST_REC_NOT_FOUND);
+ break;
+
+ case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
+ GNUNET_MULTICAST_replay_response (rh, NULL,
+ GNUNET_MULTICAST_REC_ACCESS_DENIED);
+ break;
+
+ case GNUNET_SYSERR:
+ GNUNET_MULTICAST_replay_response (rh, NULL,
+ GNUNET_MULTICAST_REC_INTERNAL_ERROR);
+ break;
+ }
+ GNUNET_MULTICAST_replay_response_end (rh);
}
+/**
+ * Incoming fragment replay request from multicast.
+ */
static void
-mcast_replay_message_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id,
- uint64_t fragment_offset,
- uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
+mcast_recv_replay_fragment (void *cls,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
+ uint64_t fragment_id, uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+ struct Channel *chn = cls;
+ GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
+ fragment_id, fragment_id,
+ &store_recv_fragment_replay,
+ &store_recv_fragment_replay_result, rh);
}
+/**
+ * Incoming message replay request from multicast.
+ */
static void
-fragment_store_result (void *cls, int64_t result, const char *err_msg)
+mcast_recv_replay_message (void *cls,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "fragment_store() returned %l (%s)\n", result, err_msg);
+ struct Channel *chn = cls;
+ GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
+ message_id, message_id, 1, NULL,
+ &store_recv_fragment_replay,
+ &store_recv_fragment_replay_result, rh);
}
n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
- *key = (struct GNUNET_HashCode) {{ 0 }};
+ *key = (struct GNUNET_HashCode) {};
*((uint64_t *) key)
= (n << 32) | (n >> 32);
}
#if __BYTE_ORDER == __BIG_ENDIAN
hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
- *key = (struct GNUNET_HashCode) {{ 0 }};
+ *key = (struct GNUNET_HashCode) {};
*((uint64_t *) key) = n;
#else
#error byteorder undefined
/**
- * Send multicast message to all clients connected to the channel.
+ * Initialize PSYC message header.
*/
-static void
-mmsg_to_clients (struct Channel *ch,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+static inline void
+psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
{
uint16_t size = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+ pmsg->fragment_offset = mmsg->fragment_offset;
+ pmsg->flags = htonl (flags);
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+}
+
+
+/**
+ * Create a new PSYC message from a multicast message for sending it to clients.
+ */
+static inline struct GNUNET_PSYC_MessageHeader *
+psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
+{
struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t size = ntohs (mmsg->header.size);
uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+ pmsg = GNUNET_malloc (psize);
+ psyc_msg_init (pmsg, mmsg, flags);
+ return pmsg;
+}
+
+
+/**
+ * Send multicast message to all clients connected to the channel.
+ */
+static void
+client_send_mcast_msg (struct Channel *chn,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint32_t flags)
+{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Sending message to client. "
+ "%p Sending multicast message to client. "
"fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->fragment_id),
+ chn, GNUNET_ntohll (mmsg->fragment_id),
GNUNET_ntohll (mmsg->message_id));
+ struct GNUNET_PSYC_MessageHeader *
+ pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
+ client_send_msg (chn, &pmsg->header);
+ GNUNET_free (pmsg);
+}
+
+
+/**
+ * Send multicast request to all clients connected to the channel.
+ */
+static void
+client_send_mcast_req (struct Master *mst,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+ struct Channel *chn = &mst->chn;
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t size = ntohs (req->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending multicast request to client. "
+ "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+ chn, GNUNET_ntohll (req->fragment_id),
+ GNUNET_ntohll (req->request_id));
+
pmsg = GNUNET_malloc (psize);
pmsg->header.size = htons (psize);
pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = mmsg->message_id;
+ pmsg->message_id = req->request_id;
+ pmsg->fragment_offset = req->fragment_offset;
+ pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+ pmsg->slave_pub_key = req->member_pub_key;
+ memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+
+ client_send_msg (chn, &pmsg->header);
+
+ /* FIXME: save req to PSYCstore so that it can be resent later to clients */
- memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
- msg_to_clients (ch, &pmsg->header);
GNUNET_free (pmsg);
}
/**
* Insert a multicast message fragment into the queue belonging to the message.
*
- * @param ch Channel.
+ * @param chn Channel.
* @param mmsg Multicast message fragment.
* @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
* @param first_ptype First PSYC message part type in @a mmsg.
* @param last_ptype Last PSYC message part type in @a mmsg.
*/
static void
-fragment_queue_insert (struct Channel *ch,
+fragment_queue_insert (struct Channel *chn,
const struct GNUNET_MULTICAST_MessageHeader *mmsg,
uint16_t first_ptype, uint16_t last_ptype)
{
const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
struct GNUNET_CONTAINER_MultiHashMap
*chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
- &ch->pub_key_hash);
+ &chn->pub_key_hash);
struct GNUNET_HashCode msg_id_hash;
hash_key_from_nll (&msg_id_hash, mmsg->message_id);
struct FragmentQueue
- *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+ *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
if (NULL == fragq)
{
fragq->fragments
= GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
+ GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
if (NULL == chan_msgs)
{
chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
+ GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
}
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Adding message fragment to cache. "
- "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
- "header_size: %" PRIu64 " + %u).\n",
- ch, GNUNET_ntohll (mmsg->message_id),
- GNUNET_ntohll (mmsg->fragment_id),
- fragq->header_size, size);
+ "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
+ chn, GNUNET_ntohll (mmsg->message_id),
+ GNUNET_ntohll (mmsg->fragment_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p header_size: %" PRIu64 " + %u\n",
+ chn, fragq->header_size, size);
cache_entry = GNUNET_new (struct RecvCacheEntry);
cache_entry->ref_count = 1;
cache_entry->mmsg = GNUNET_malloc (size);
"%p Message fragment is already in cache. "
"message_id: %" PRIu64 ", fragment_id: %" PRIu64
", ref_count: %u\n",
- ch, GNUNET_ntohll (mmsg->message_id),
+ chn, GNUNET_ntohll (mmsg->message_id),
GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
}
{ /* header is now complete */
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Header of message %" PRIu64 " is complete.\n",
- ch, GNUNET_ntohll (mmsg->message_id));
+ chn, GNUNET_ntohll (mmsg->message_id));
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Adding message %" PRIu64 " to queue.\n",
- ch, GNUNET_ntohll (mmsg->message_id));
+ chn, GNUNET_ntohll (mmsg->message_id));
fragq->state = MSG_FRAG_STATE_DATA;
}
else
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Header of message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->header_size);
+ chn, GNUNET_ntohll (mmsg->message_id),
+ frag_offset, fragq->header_size);
}
}
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->size);
+ chn, GNUNET_ntohll (mmsg->message_id),
+ frag_offset, fragq->size);
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
case MSG_FRAG_STATE_DATA:
case MSG_FRAG_STATE_END:
case MSG_FRAG_STATE_CANCEL:
- if (GNUNET_NO == fragq->queued)
+ if (GNUNET_NO == fragq->is_queued)
{
- GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
+ GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
GNUNET_ntohll (mmsg->message_id));
- fragq->queued = GNUNET_YES;
+ fragq->is_queued = GNUNET_YES;
}
}
* Send fragments of a message in order to client, after all modifiers arrived
* from multicast.
*
- * @param ch Channel.
- * @param msg_id ID of the message @a fragq belongs to.
- * @param fragq Fragment queue of the message.
- * @param drop Drop message without delivering to client?
- * #GNUNET_YES or #GNUNET_NO.
+ * @param chn
+ * Channel.
+ * @param msg_id
+ * ID of the message @a fragq belongs to.
+ * @param fragq
+ * Fragment queue of the message.
+ * @param drop
+ * Drop message without delivering to client?
+ * #GNUNET_YES or #GNUNET_NO.
*/
static void
-fragment_queue_run (struct Channel *ch, uint64_t msg_id,
+fragment_queue_run (struct Channel *chn, uint64_t msg_id,
struct FragmentQueue *fragq, uint8_t drop)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Running message fragment queue for message %" PRIu64
" (state: %u).\n",
- ch, msg_id, fragq->state);
+ chn, msg_id, fragq->state);
struct GNUNET_CONTAINER_MultiHashMap
*chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
- &ch->pub_key_hash);
+ &chn->pub_key_hash);
GNUNET_assert (NULL != chan_msgs);
uint64_t frag_id;
{
if (GNUNET_NO == drop)
{
- mmsg_to_clients (ch, cache_entry->mmsg);
+ client_send_mcast_msg (chn, cache_entry->mmsg, 0);
}
if (cache_entry->ref_count <= 1)
{
if (MSG_FRAG_STATE_END <= fragq->state)
{
struct GNUNET_HashCode msg_id_hash;
- hash_key_from_nll (&msg_id_hash, msg_id);
+ hash_key_from_hll (&msg_id_hash, msg_id);
- GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
+ GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
GNUNET_CONTAINER_heap_destroy (fragq->fragments);
GNUNET_free (fragq);
}
else
{
- fragq->queued = GNUNET_NO;
+ fragq->is_queued = GNUNET_NO;
+ }
+}
+
+
+struct StateModifyClosure
+{
+ struct Channel *chn;
+ uint64_t msg_id;
+ struct GNUNET_HashCode msg_id_hash;
+};
+
+
+void
+store_recv_state_modify_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
+{
+ struct StateModifyClosure *mcls = cls;
+ struct Channel *chn = mcls->chn;
+ uint64_t msg_id = mcls->msg_id;
+
+ struct FragmentQueue *
+ fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
+ chn, result, err_msg_size, err_msg);
+
+ switch (result)
+ {
+ case GNUNET_OK:
+ case GNUNET_NO:
+ if (NULL != fragq)
+ fragq->state_is_modified = GNUNET_YES;
+ if (chn->max_state_message_id < msg_id)
+ chn->max_state_message_id = msg_id;
+ if (chn->max_message_id < msg_id)
+ chn->max_message_id = msg_id;
+
+ if (NULL != fragq)
+ fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
+ message_queue_run (chn);
+ break;
+
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
+ chn, result, err_msg_size, err_msg);
+ /** @todo FIXME: handle state_modify error */
}
}
* - A stateful message is only sent if the previous stateful message
* has already been delivered to the client.
*
- * @param ch Channel.
+ * @param chn Channel.
*
* @return Number of messages removed from queue and sent to client.
*/
static uint64_t
-message_queue_run (struct Channel *ch)
+message_queue_run (struct Channel *chn)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Running message queue.\n", ch);
+ "%p Running message queue.\n", chn);
uint64_t n = 0;
uint64_t msg_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
&msg_id))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
+ "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
struct GNUNET_HashCode msg_id_hash;
hash_key_from_hll (&msg_id_hash, msg_id);
struct FragmentQueue *
- fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+ fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p No fragq (%p) or header not complete.\n",
- ch, fragq);
+ chn, fragq);
break;
}
- if (MSG_FRAG_STATE_HEADER == fragq->state)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Fragment queue entry: state: %u, state delta: "
+ "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
+ chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
+
+ if (MSG_FRAG_STATE_DATA <= fragq->state)
{
/* Check if there's a missing message before the current one */
if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
+
if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
- && msg_id - 1 != ch->max_message_id)
+ && (chn->max_message_id != msg_id - 1
+ && chn->max_message_id != msg_id))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Out of order message. "
- "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
- ch, msg_id, ch->max_message_id);
+ "(%" PRIu64 " != %" PRIu64 " - 1)\n",
+ chn, chn->max_message_id, msg_id);
break;
+ // FIXME: keep track of messages processed in this queue run,
+ // and only stop after reaching the end
}
}
else
{
- if (msg_id - fragq->state_delta != ch->max_state_message_id)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
+ if (GNUNET_YES != fragq->state_is_modified)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Out of order stateful message. "
- "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
- ch, msg_id, fragq->state_delta, ch->max_state_message_id);
- break;
+ if (msg_id - fragq->state_delta != chn->max_state_message_id)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Out of order stateful message. "
+ "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
+ chn, msg_id, fragq->state_delta, chn->max_state_message_id);
+ break;
+ // FIXME: keep track of messages processed in this queue run,
+ // and only stop after reaching the end
+ }
+
+ struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+ mcls->chn = chn;
+ mcls->msg_id = msg_id;
+ mcls->msg_id_hash = msg_id_hash;
+
+ /* Apply modifiers to state in PSYCstore */
+ GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
+ fragq->state_delta,
+ store_recv_state_modify_result, mcls);
+ break; // continue after asynchronous state modify result
}
-#if TODO
- /* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
- state_modify_result_cb, cls);
-#endif
- ch->max_state_message_id = msg_id;
}
- ch->max_message_id = msg_id;
+ chn->max_message_id = msg_id;
}
- fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
- GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+ fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
}
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+ "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
return n;
}
*
* Remove all messages in queue without sending it to clients.
*
- * @param ch Channel.
+ * @param chn Channel.
*
* @return Number of messages removed from queue.
*/
static uint64_t
-message_queue_drop (struct Channel *ch)
+message_queue_drop (struct Channel *chn)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message queue.\n", ch);
+ "%p Dropping message queue.\n", chn);
uint64_t n = 0;
uint64_t msg_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
&msg_id))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id);
+ "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
struct GNUNET_HashCode msg_id_hash;
hash_key_from_hll (&msg_id_hash, msg_id);
struct FragmentQueue *
- fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
-
- fragment_queue_run (ch, msg_id, fragq, GNUNET_YES);
- GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+ fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
+ GNUNET_assert (NULL != fragq);
+ fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+ "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
return n;
}
/**
- * Handle incoming message from multicast.
- *
- * @param ch Channel.
- * @param mmsg Multicast message.
- *
- * @return #GNUNET_OK or #GNUNET_SYSERR
+ * Received result of GNUNET_PSYCSTORE_fragment_store().
*/
-static int
-client_multicast_message (struct Channel *ch,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+static void
+store_recv_fragment_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
- GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
-
- uint16_t size = ntohs (mmsg->header.size);
- uint16_t first_ptype = 0, last_ptype = 0;
-
- if (GNUNET_SYSERR
- == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
- (const char *) &mmsg[1],
- &first_ptype, &last_ptype))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Received message with invalid parts from multicast. "
- "Dropping message.\n", ch);
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
-
+ struct Channel *chn = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message parts: first: type %u, last: type %u\n",
- first_ptype, last_ptype);
-
- fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
- message_queue_run (ch);
-
- return GNUNET_OK;
+ "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
+ chn, result, err_msg_size, err_msg);
}
/**
- * Incoming message fragment from multicast.
+ * Handle incoming message fragment from multicast.
*
- * Store it using PSYCstore and send it to the client of the channel.
+ * Store it using PSYCstore and send it to the clients of the channel in order.
*/
static void
-mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
{
- struct Channel *ch = cls;
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
+ struct Channel *chn = cls;
+ uint16_t size = ntohs (mmsg->header.size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message of type %u and size %u from multicast.\n",
- ch, type, size);
+ "%p Received multicast message of size %u. "
+ "fragment_id=%" PRIu64 ", message_id=%" PRIu64
+ ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
+ chn, size,
+ GNUNET_ntohll (mmsg->fragment_id),
+ GNUNET_ntohll (mmsg->message_id),
+ GNUNET_ntohll (mmsg->fragment_offset),
+ GNUNET_ntohll (mmsg->flags));
+
+ GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
+ &store_recv_fragment_store_result, chn);
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+ uint16_t first_ptype = 0, last_ptype = 0;
+ int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
+ (const char *) &mmsg[1],
+ &first_ptype, &last_ptype);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Message check result %d, first part type %u, last part type %u\n",
+ chn, check, first_ptype, last_ptype);
+ if (GNUNET_SYSERR == check)
{
- client_multicast_message (ch, (const struct
- GNUNET_MULTICAST_MessageHeader *) msg);
- break;
- }
- default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping unknown message of type %u and size %u.\n",
- ch, type, size);
+ "%p Dropping incoming multicast message with invalid parts.\n",
+ chn);
+ GNUNET_break_op (0);
+ return;
}
+
+ fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
+ message_queue_run (chn);
}
/**
* Incoming request fragment from multicast for a master.
*
- * @param cls Master.
- * @param slave_key Sending slave's public key.
- * @param msg The message.
- * @param flags Request flags.
+ * @param cls Master.
+ * @param req The request.
*/
static void
-mcast_request_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *msg,
- enum GNUNET_MULTICAST_MessageFlags flags)
+mcast_recv_request (void *cls,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
{
struct Master *mst = cls;
- struct Channel *ch = &mst->ch;
-
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
+ uint16_t size = ntohs (req->header.size);
+ char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received request of type %u and size %u from multicast.\n",
- ch, type, size);
+ "%p Received multicast request of size %u from %s.\n",
+ mst, size, str);
+ GNUNET_free (str);
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+ uint16_t first_ptype = 0, last_ptype = 0;
+ if (GNUNET_SYSERR
+ == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
+ (const char *) &req[1],
+ &first_ptype, &last_ptype))
{
- const struct GNUNET_MULTICAST_RequestHeader *req
- = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
-
- /* FIXME: see message_cb() */
- if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
- (const char *) &req[1],
- NULL, NULL))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping request with invalid parts "
- "received from multicast.\n", ch);
- GNUNET_break_op (0);
- break;
- }
-
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = req->request_id;
- pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
-
- memcpy (&pmsg[1], &req[1], size - sizeof (*req));
- msg_to_clients (ch, &pmsg->header);
- GNUNET_free (pmsg);
- break;
- }
- default:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Dropping unknown request of type %u and size %u.\n",
- ch, type, size);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping incoming multicast request with invalid parts.\n",
+ mst);
GNUNET_break_op (0);
+ return;
}
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message parts: first: type %u, last: type %u\n",
+ first_ptype, last_ptype);
+
+ /* FIXME: in-order delivery */
+ client_send_mcast_req (mst, req);
}
* Response from PSYCstore with the current counter values for a channel master.
*/
static void
-master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
- uint64_t max_message_id, uint64_t max_group_generation,
- uint64_t max_state_message_id)
+store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
+ uint64_t max_message_id, uint64_t max_group_generation,
+ uint64_t max_state_message_id)
{
struct Master *mst = cls;
- struct Channel *ch = &mst->ch;
- ch->store_op = NULL;
+ struct Channel *chn = &mst->chn;
+ chn->store_op = NULL;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (result);
if (GNUNET_OK == result || GNUNET_NO == result)
{
mst->max_message_id = max_message_id;
- ch->max_message_id = max_message_id;
- ch->max_state_message_id = max_state_message_id;
+ chn->max_message_id = max_message_id;
+ chn->max_state_message_id = max_state_message_id;
mst->max_group_generation = max_group_generation;
mst->origin
= GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
- &mcast_join_request_cb,
- &mcast_membership_test_cb,
- &mcast_replay_fragment_cb,
- &mcast_replay_message_cb,
- &mcast_request_cb,
- &mcast_message_cb, ch);
- ch->ready = GNUNET_YES;
+ mcast_recv_join_request,
+ mcast_recv_replay_fragment,
+ mcast_recv_replay_message,
+ mcast_recv_request,
+ mcast_recv_message, chn);
+ chn->is_ready = GNUNET_YES;
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%p GNUNET_PSYCSTORE_counters_get() "
"returned %d for channel %s.\n",
- ch, result, GNUNET_h2s (&ch->pub_key_hash));
+ chn, result, GNUNET_h2s (&chn->pub_key_hash));
}
- msg_to_clients (ch, &res.header);
+ client_send_msg (chn, &res.header);
}
* Response from PSYCstore with the current counter values for a channel slave.
*/
void
-slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
- uint64_t max_message_id, uint64_t max_group_generation,
- uint64_t max_state_message_id)
+store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
+ uint64_t max_message_id, uint64_t max_group_generation,
+ uint64_t max_state_message_id)
{
struct Slave *slv = cls;
- struct Channel *ch = &slv->ch;
- ch->store_op = NULL;
+ struct Channel *chn = &slv->chn;
+ chn->store_op = NULL;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (result);
if (GNUNET_OK == result || GNUNET_NO == result)
{
- ch->max_message_id = max_message_id;
- ch->max_state_message_id = max_state_message_id;
+ chn->max_message_id = max_message_id;
+ chn->max_state_message_id = max_state_message_id;
slv->member
- = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+ = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
&slv->origin,
slv->relay_count, slv->relays,
- slv->join_req,
- &mcast_join_request_cb,
- &mcast_join_decision_cb,
- &mcast_membership_test_cb,
- &mcast_replay_fragment_cb,
- &mcast_replay_message_cb,
- &mcast_message_cb, ch);
+ &slv->join_msg->header,
+ mcast_recv_join_request,
+ mcast_recv_join_decision,
+ mcast_recv_replay_fragment,
+ mcast_recv_replay_message,
+ mcast_recv_message, chn);
+ if (NULL != slv->join_msg)
+ {
+ GNUNET_free (slv->join_msg);
+ slv->join_msg = NULL;
+ }
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%p GNUNET_PSYCSTORE_counters_get() "
"returned %d for channel %s.\n",
- ch, result, GNUNET_h2s (&ch->pub_key_hash));
+ chn, result, GNUNET_h2s (&chn->pub_key_hash));
}
- msg_to_clients (ch, &res.header);
+ client_send_msg (chn, &res.header);
}
static void
-channel_init (struct Channel *ch)
+channel_init (struct Channel *chn)
{
- ch->recv_msgs
+ chn->recv_msgs
= GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
}
* Handle a connecting client starting a channel master.
*/
static void
-client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
const struct MasterStartRequest *req
= (const struct MasterStartRequest *) msg;
struct Master *
mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
- struct Channel *ch;
+ struct Channel *chn;
if (NULL == mst)
{
mst->priv_key = req->channel_key;
mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- ch = &mst->ch;
- ch->is_master = GNUNET_YES;
- ch->pub_key = pub_key;
- ch->pub_key_hash = pub_key_hash;
- channel_init (ch);
+ chn = &mst->chn;
+ chn->is_master = GNUNET_YES;
+ chn->pub_key = pub_key;
+ chn->pub_key_hash = pub_key_hash;
+ channel_init (chn);
- GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
- master_counters_cb, mst);
+ chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
+ store_recv_master_counters, mst);
}
else
{
- ch = &mst->ch;
+ chn = &mst->chn;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Client connected as master to channel %s.\n",
- mst, GNUNET_h2s (&ch->pub_key_hash));
+ mst, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientList *cl = GNUNET_new (struct ClientList);
- cl->client = client;
- GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
+ struct Client *cli = GNUNET_new (struct Client);
+ cli->client = client;
+ GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, ch);
+ GNUNET_SERVER_client_set_user_context (client, chn);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
* Handle a connecting client joining as a channel slave.
*/
static void
-client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
const struct SlaveJoinRequest *req
= (const struct SlaveJoinRequest *) msg;
+ uint16_t req_size = ntohs (req->header.size);
- struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
- struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
+ struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
+ struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
- GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
- GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
- GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
+ GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
+ GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
+ GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
struct GNUNET_CONTAINER_MultiHashMap *
- ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
+ chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
struct Slave *slv = NULL;
- struct Channel *ch;
+ struct Channel *chn;
- if (NULL != ch_slv)
+ if (NULL != chn_slv)
{
- slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
+ slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
}
if (NULL == slv)
{
slv = GNUNET_new (struct Slave);
slv->priv_key = req->slave_key;
slv->pub_key = slv_pub_key;
- slv->pub_key_hash = slv_pub_key_hash;
+ slv->pub_key_hash = slv_pub_hash;
slv->origin = req->origin;
slv->relay_count = ntohl (req->relay_count);
+ slv->join_flags = ntohl (req->flags);
+
+ const struct GNUNET_PeerIdentity *
+ relays = (const struct GNUNET_PeerIdentity *) &req[1];
+ uint16_t relay_size = slv->relay_count * sizeof (*relays);
+ uint16_t join_msg_size = 0;
+
+ if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
+ <= req_size)
+ {
+ struct GNUNET_PSYC_Message *
+ join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
+ join_msg_size = ntohs (join_msg->header.size);
+ slv->join_msg = GNUNET_malloc (join_msg_size);
+ memcpy (slv->join_msg, join_msg, join_msg_size);
+ }
+ if (sizeof (*req) + relay_size + join_msg_size != req_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%u + %u + %u != %u\n",
+ sizeof (*req), relay_size, join_msg_size, req_size);
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ GNUNET_free (slv);
+ return;
+ }
if (0 < slv->relay_count)
{
- const struct GNUNET_PeerIdentity *relays
- = (const struct GNUNET_PeerIdentity *) &req[1];
- slv->relays
- = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
- uint32_t i;
- for (i = 0; i < slv->relay_count; i++)
- memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ slv->relays = GNUNET_malloc (relay_size);
+ memcpy (slv->relays, &req[1], relay_size);
}
- ch = &slv->ch;
- ch->is_master = GNUNET_NO;
- ch->pub_key = req->channel_key;
- ch->pub_key_hash = pub_key_hash;
- channel_init (ch);
+ chn = &slv->chn;
+ chn->is_master = GNUNET_NO;
+ chn->pub_key = req->channel_pub_key;
+ chn->pub_key_hash = pub_key_hash;
+ channel_init (chn);
- if (NULL == ch_slv)
+ if (NULL == chn_slv)
{
- ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
- GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
- slave_counters_cb, slv);
+ chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
+ &store_recv_slave_counters, slv);
}
else
{
- ch = &slv->ch;
+ chn = &slv->chn;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
- res.max_message_id = GNUNET_htonll (ch->max_message_id);
+ res.max_message_id = GNUNET_htonll (chn->max_message_id);
GNUNET_SERVER_notification_context_add (nc, client);
GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
GNUNET_NO);
- if (NULL == slv->member)
+ if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
+ {
+ mcast_recv_join_decision (slv, GNUNET_YES,
+ NULL, 0, NULL, NULL);
+ }
+ else if (NULL == slv->member)
{
slv->member
- = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+ = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
&slv->origin,
slv->relay_count, slv->relays,
- slv->join_req,
- &mcast_join_request_cb,
- &mcast_join_decision_cb,
- &mcast_membership_test_cb,
- &mcast_replay_fragment_cb,
- &mcast_replay_message_cb,
- &mcast_message_cb, ch);
-
+ &slv->join_msg->header,
+ &mcast_recv_join_request,
+ &mcast_recv_join_decision,
+ &mcast_recv_replay_fragment,
+ &mcast_recv_replay_message,
+ &mcast_recv_message, chn);
+ if (NULL != slv->join_msg)
+ {
+ GNUNET_free (slv->join_msg);
+ slv->join_msg = NULL;
+ }
}
else if (NULL != slv->join_dcsn)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Client connected as slave to channel %s.\n",
- slv, GNUNET_h2s (&ch->pub_key_hash));
+ slv, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientList *cl = GNUNET_new (struct ClientList);
- cl->client = client;
- GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
+ struct Client *cli = GNUNET_new (struct Client);
+ cli->client = client;
+ GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, &slv->ch);
+ GNUNET_SERVER_client_set_user_context (client, chn);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Iterator callback for responding to join requests of a slave.
+ * Iterator callback for sending join decisions to multicast.
*/
static int
-send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *jh)
+mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+ void *value)
{
struct JoinDecisionClosure *jcls = cls;
+ struct GNUNET_MULTICAST_JoinHandle *jh = value;
// FIXME: add relays
GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
return GNUNET_YES;
* Join decision from client.
*/
static void
-client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
- struct Channel *
- ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (GNUNET_YES == ch->is_master);
- struct Master *mst = (struct Master *) ch;
-
- struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
+ = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
+ struct Channel *chn;
+ struct Master *mst;
struct JoinDecisionClosure jcls;
+
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_assert (GNUNET_YES == chn->is_master);
+ mst = (struct Master *) chn;
jcls.is_admitted = ntohl (dcsn->is_admitted);
jcls.msg
- = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
- <= ntohs (msg->size))
+ = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
? (struct GNUNET_MessageHeader *) &dcsn[1]
: NULL;
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
- &slave_key_hash);
+ struct GNUNET_HashCode slave_pub_hash;
+ GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
+ &slave_pub_hash);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision (%d) from client for channel %s..\n",
- mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash));
+ mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p ..and slave %s.\n",
- mst, GNUNET_h2s (&slave_key_hash));
+ mst, GNUNET_h2s (&slave_pub_hash));
- GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
- &send_join_decision_cb, &jcls);
- GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
+ GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
+ &mcast_send_join_decision, &jcls);
+ GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
*
* Sent after a message fragment has been passed on to multicast.
*
- * @param ch The channel struct for the client.
+ * @param chn The channel struct for the client.
*/
static void
-send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
+send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
{
struct GNUNET_MessageHeader res;
res.size = htons (sizeof (res));
static int
transmit_notify (void *cls, size_t *data_size, void *data)
{
- struct Channel *ch = cls;
- struct TransmitMessage *tmit_msg = ch->tmit_head;
+ struct Channel *chn = cls;
+ struct TransmitMessage *tmit_msg = chn->tmit_head;
if (NULL == tmit_msg || *data_size < tmit_msg->size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p transmit_notify: nothing to send.\n", ch);
+ "%p transmit_notify: nothing to send.\n", chn);
+ if (NULL != tmit_msg && *data_size < tmit_msg->size)
+ GNUNET_break (0);
*data_size = 0;
return GNUNET_NO;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
+ "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
*data_size = tmit_msg->size;
memcpy (data, &tmit_msg[1], *data_size);
- int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ int ret
+ = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+ ? GNUNET_NO
+ : GNUNET_YES;
+
+ /* FIXME: handle disconnecting clients */
if (NULL != tmit_msg->client)
- send_message_ack (ch, tmit_msg->client);
+ send_message_ack (chn, tmit_msg->client);
- GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
GNUNET_free (tmit_msg);
- if (0 == ch->tmit_task)
+ if (NULL != chn->tmit_head)
{
- if (NULL != ch->tmit_head)
- {
- transmit_message (ch);
- }
- else if (ch->disconnected)
- {
- /* FIXME: handle partial message (when still in_transmit) */
- cleanup_channel (ch);
- }
+ GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
+ }
+ else if (GNUNET_YES == chn->is_disconnected
+ && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+ {
+ /* FIXME: handle partial message (when still in_transmit) */
+ return GNUNET_SYSERR;
}
-
return ret;
}
static void
master_transmit_message (struct Master *mst)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
- mst->ch.tmit_task = 0;
+ struct Channel *chn = &mst->chn;
+ struct TransmitMessage *tmit_msg = chn->tmit_head;
+ if (NULL == tmit_msg)
+ return;
if (NULL == mst->tmit_handle)
{
mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+ = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
mst->max_group_generation,
master_transmit_notify, mst);
}
static void
slave_transmit_message (struct Slave *slv)
{
- slv->ch.tmit_task = 0;
+ if (NULL == slv->chn.tmit_head)
+ return;
if (NULL == slv->tmit_handle)
{
slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+ = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
slave_transmit_notify, slv);
}
else
}
-static inline void
-transmit_message (struct Channel *ch)
+static void
+transmit_message (struct Channel *chn)
{
- ch->is_master
- ? master_transmit_message ((struct Master *) ch)
- : slave_transmit_message ((struct Slave *) ch);
+ chn->is_master
+ ? master_transmit_message ((struct Master *) chn)
+ : slave_transmit_message ((struct Slave *) chn);
}
* Queue a message from a channel master for sending to the multicast group.
*/
static void
-master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
+master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
tmit_msg->id = ++mst->max_message_id;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: message_id=%" PRIu64 "\n",
+ mst, tmit_msg->id);
struct GNUNET_PSYC_MessageMethod *pmeth
= (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
}
else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: state_delta=%" PRIu64 "\n",
+ mst, tmit_msg->id - mst->max_state_message_id);
pmeth->state_delta = GNUNET_htonll (tmit_msg->id
- mst->max_state_message_id);
+ mst->max_state_message_id = tmit_msg->id;
}
else
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: state not modified\n", mst);
pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
}
+
+ if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
+ {
+ /// @todo add state_hash to PSYC header
+ }
}
}
* Queue a message from a channel slave for sending to the multicast group.
*/
static void
-slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
+slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
{
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
struct GNUNET_PSYC_MessageMethod *pmeth
= (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
/**
* Queue PSYC message parts for sending to multicast.
*
- * @param ch Channel to send to.
- * @param client Client the message originates from.
- * @param data_size Size of @a data.
- * @param data Concatenated message parts.
- * @param first_ptype First message part type in @a data.
- * @param last_ptype Last message part type in @a data.
+ * @param chn
+ * Channel to send to.
+ * @param client
+ * Client the message originates from.
+ * @param data_size
+ * Size of @a data.
+ * @param data
+ * Concatenated message parts.
+ * @param first_ptype
+ * First message part type in @a data.
+ * @param last_ptype
+ * Last message part type in @a data.
*/
-static void
-queue_message (struct Channel *ch,
+static struct TransmitMessage *
+queue_message (struct Channel *chn,
struct GNUNET_SERVER_Client *client,
size_t data_size,
const void *data,
memcpy (&tmit_msg[1], data, data_size);
tmit_msg->client = client;
tmit_msg->size = data_size;
- tmit_msg->state = ch->tmit_state;
+ tmit_msg->first_ptype = first_ptype;
+ tmit_msg->last_ptype = last_ptype;
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ /* FIXME: separate queue per message ID */
- ch->is_master
- ? master_queue_message ((struct Master *) ch, tmit_msg,
- first_ptype, last_ptype)
- : slave_queue_message ((struct Slave *) ch, tmit_msg,
- first_ptype, last_ptype);
+ GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
+
+ chn->is_master
+ ? master_queue_message ((struct Master *) chn, tmit_msg)
+ : slave_queue_message ((struct Slave *) chn, tmit_msg);
+ return tmit_msg;
}
/**
* Cancel transmission of current message.
*
- * @param ch Channel to send to.
+ * @param chn Channel to send to.
* @param client Client the message originates from.
*/
static void
-transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
+transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
{
uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
msg.size = htons (sizeof (msg));
msg.type = htons (type);
- queue_message (ch, client, sizeof (msg), &msg, type, type);
- transmit_message (ch);
+ queue_message (chn, client, sizeof (msg), &msg, type, type);
+ transmit_message (chn);
/* FIXME: cleanup */
}
* Incoming message from a master or slave client.
*/
static void
-client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
struct Channel *
- ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message from client.\n", ch);
+ "%p Received message from client.\n", chn);
GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
- if (GNUNET_YES != ch->ready)
+ if (GNUNET_YES != chn->is_ready)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Channel is not ready, dropping message from client.\n", ch);
+ "%p Channel is not ready yet, disconnecting client.\n", chn);
+ GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
uint16_t size = ntohs (msg->size);
if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p Message payload too large: %u < %u.\n",
+ chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
GNUNET_break (0);
- transmit_cancel (ch, client);
+ transmit_cancel (chn, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
uint16_t first_ptype = 0, last_ptype = 0;
if (GNUNET_SYSERR
- == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
+ == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
(const char *) &msg[1],
&first_ptype, &last_ptype))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p Received invalid message part from client.\n", ch);
+ "%p Received invalid message part from client.\n", chn);
GNUNET_break (0);
- transmit_cancel (ch, client);
+ transmit_cancel (chn, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message with first part type %u and last part type %u.\n",
+ chn, first_ptype, last_ptype);
- queue_message (ch, client, size - sizeof (*msg), &msg[1],
+ queue_message (chn, client, size - sizeof (*msg), &msg[1],
first_ptype, last_ptype);
- transmit_message (ch);
+ transmit_message (chn);
+ /* FIXME: send a few ACKs even before transmit_notify is called */
GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
/**
- * Client requests to add a slave to the membership database.
+ * Received result of GNUNET_PSYCSTORE_membership_store()
*/
static void
-client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+store_recv_membership_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct Operation *op = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
+ op->chn, result, err_msg_size, err_msg);
+ if (NULL != op->client)
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ op_remove (op);
}
/**
- * Client requests to remove a slave from the membership database.
+ * Client requests to add/remove a slave in the membership database.
*/
static void
-client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct ChannelMembershipStoreRequest *
+ req = (const struct ChannelMembershipStoreRequest *) msg;
+
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
+
+ uint64_t announced_at = GNUNET_ntohll (req->announced_at);
+ uint64_t effective_since = GNUNET_ntohll (req->effective_since);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received membership store request from client.\n", chn);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
+ chn, req->did_join, announced_at, effective_since);
+ GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
+ req->did_join, announced_at, effective_since,
+ 0, /* FIXME: group_generation */
+ &store_recv_membership_store_result, op);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Client requests channel history from PSYCstore.
+ * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
+ */
+static int
+store_recv_fragment_history (void *cls,
+ struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct Operation *op = cls;
+ if (NULL == op->client)
+ { /* Requesting client already disconnected. */
+ return GNUNET_NO;
+ }
+ struct Channel *chn = op->chn;
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t msize = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
+
+ struct GNUNET_OperationResultMessage *
+ res = GNUNET_malloc (sizeof (*res) + psize);
+ res->header.size = htons (sizeof (*res) + psize);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+ res->op_id = op->op_id;
+ res->result_code = GNUNET_htonll (GNUNET_OK);
+
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
+ GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
+ memcpy (&res[1], pmsg, psize);
+
+ /** @todo FIXME: send only to requesting client */
+ client_send_msg (chn, &res->header);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received the result of GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
*/
static void
-client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+store_recv_fragment_history_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct Operation *op = cls;
+ if (NULL == op->client)
+ { /* Requesting client already disconnected. */
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+ if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
+ {
+ /** @todo Multicast replay request for messages not found locally. */
+ }
+
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ op_remove (op);
}
/**
- * Client requests best matching state variable from PSYCstore.
+ * Client requests channel history.
*/
static void
-client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct GNUNET_PSYC_HistoryRequestMessage *
+ req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p History replay #%" PRIu64 ": "
+ "invalid method prefix. size: %u < %u?\n",
+ chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
+
+ if (0 == req->message_limit)
+ GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
+ GNUNET_ntohll (req->start_message_id),
+ GNUNET_ntohll (req->end_message_id),
+ 0, method_prefix,
+ &store_recv_fragment_history,
+ &store_recv_fragment_history_result, op);
+ else
+ GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
+ GNUNET_ntohll (req->message_limit),
+ method_prefix,
+ &store_recv_fragment_history,
+ &store_recv_fragment_history_result,
+ op);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Client requests state variables with a given prefix from PSYCstore.
+ * Received state var from PSYCstore, send it to client.
+ */
+static int
+store_recv_state_var (void *cls, const char *name,
+ const void *value, uint32_t value_size)
+{
+ struct Operation *op = cls;
+ struct GNUNET_OperationResultMessage *res;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
+ op->chn, GNUNET_ntohll (op->op_id), name);
+
+ if (NULL != name) /* First part */
+ {
+ uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
+ struct GNUNET_PSYC_MessageModifier *mod;
+ res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
+ res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ res->op_id = op->op_id;
+
+ mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
+ mod->header.size = htons (sizeof (*mod) + name_size + value_size);
+ mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+ mod->name_size = htons (name_size);
+ mod->value_size = htonl (value_size);
+ mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
+ memcpy (&mod[1], name, name_size);
+ memcpy (((char *) &mod[1]) + name_size, value, value_size);
+ }
+ else /* Continuation */
+ {
+ struct GNUNET_MessageHeader *mod;
+ res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
+ res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ res->op_id = op->op_id;
+
+ mod = (struct GNUNET_MessageHeader *) &res[1];
+ mod->size = htons (sizeof (*mod) + value_size);
+ mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ memcpy (&mod[1], value, value_size);
+ }
+
+ // FIXME: client might have been disconnected
+ GNUNET_SERVER_notification_context_add (nc, op->client);
+ GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
+ GNUNET_NO);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_state_get()
+ * or GNUNET_PSYCSTORE_state_get_prefix()
*/
static void
-client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+store_recv_state_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
+ struct Operation *op = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p state_get #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+ // FIXME: client might have been disconnected
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ op_remove (op);
}
/**
- * Initialize the PSYC service.
- *
- * @param cls Closure.
- * @param server The initialized server.
- * @param c Configuration to use.
+ * Client requests best matching state variable from PSYCstore.
*/
static void
-run (void *cls, struct GNUNET_SERVER_Handle *server,
- const struct GNUNET_CONFIGURATION_Handle *c)
+client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &client_master_start, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct StateRequest *
+ req = (const struct StateRequest *) msg;
+
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+ const char *name = (const char *) &req[1];
+ if (0 == name_size || '\0' != name[name_size - 1])
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
+ GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
+ &store_recv_state_var,
+ &store_recv_state_result, op);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Client requests state variables with a given prefix from PSYCstore.
+ */
+static void
+client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
- { &client_slave_join, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
+ const struct StateRequest *
+ req = (const struct StateRequest *) msg;
- { &client_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+ const char *name = (const char *) &req[1];
+ if (0 == name_size || '\0' != name[name_size - 1])
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
- { &client_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
+ GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
+ &store_recv_state_var,
+ &store_recv_state_result, op);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
- { &client_slave_add, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
- { &client_slave_remove, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
+static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+ { &client_recv_master_start, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
- { &client_story_request, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
+ { &client_recv_slave_join, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
- { &client_state_get, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+ { &client_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
- { &client_state_get_prefix, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
- };
+ { &client_recv_psyc_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+ { &client_recv_membership_store, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
+
+ { &client_recv_history_replay, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+
+ { &client_recv_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+ { &client_recv_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+
+ { NULL, NULL, 0, 0 }
+};
+
+/**
+ * Initialize the PSYC service.
+ *
+ * @param cls Closure.
+ * @param server The initialized server.
+ * @param c Configuration to use.
+ */
+static void
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+ const struct GNUNET_CONFIGURATION_Handle *c)
+{
cfg = c;
store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
nc = GNUNET_SERVER_notification_context_create (server, 1);
- GNUNET_SERVER_add_handlers (server, handlers);
+ GNUNET_SERVER_add_handlers (server, server_handlers);
GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task, NULL);