* @author Gabor X Toth
*/
+#include <inttypes.h>
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_constants.h"
static struct GNUNET_PSYCSTORE_Handle *store;
/**
- * channel's pub_key_hash -> struct Channel
+ * All connected masters and slaves.
+ * Channel's pub_key_hash -> struct Channel
*/
static struct GNUNET_CONTAINER_MultiHashMap *clients;
+
/**
* Message in the transmission queue.
*/
struct TransmitMessage *prev;
struct TransmitMessage *next;
+ /**
+ * Buffer with message to be transmitted.
+ */
char *buf;
- uint16_t size;
- uint8_t status;
+
+ /**
+ * Size of @a buf
+ */
+ uint16_t size
+;
+ /**
+ * @see enum MessageState
+ */
+ uint8_t state;
};
+
+/**
+ * Cache for received message fragments.
+ * Message fragments are only sent to clients after all modifiers arrived.
+ *
+ * chan_key -> MultiHashMap chan_msgs
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
+
+
+/**
+ * Entry in the chan_msgs hashmap of @a recv_cache:
+ * fragment_id -> FragmentEntry
+ */
+struct FragmentEntry
+{
+ struct GNUNET_MULTICAST_MessageHeader *mmsg;
+ uint16_t ref_count;
+};
+
+
+/**
+ * Entry in the @a recv_msgs hash map of a @a Channel.
+ * message_id -> FragmentCache
+ */
+struct FragmentCache
+{
+ /**
+ * Total size of header fragments (METHOD & MODIFIERs)
+ */
+ uint64_t header_size;
+
+ /**
+ * Fragment IDs stored in @a recv_cache.
+ */
+ struct GNUNET_CONTAINER_Heap *fragments;
+};
+
+
/**
* Common part of the client context for both a master and slave channel.
*/
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
- char *tmit_buf;
+ /**
+ * Received fragments not yet sent to the client.
+ * message_id -> FragmentCache
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
+
+ /**
+ * FIXME
+ */
GNUNET_SCHEDULER_TaskIdentifier tmit_task;
- uint32_t tmit_mod_count;
- uint32_t tmit_mod_recvd;
- uint16_t tmit_size;
- uint8_t tmit_status;
+ /**
+ * Public key of the channel.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+ /**
+ * Hash of @a pub_key.
+ */
+ struct GNUNET_HashCode pub_key_hash;
+
+ /**
+ * Expected value size for the modifier being received from the PSYC service.
+ */
+ uint32_t tmit_mod_value_size_expected;
+
+ /**
+ * Actual value size for the modifier being received from the PSYC service.
+ */
+ uint32_t tmit_mod_value_size;
+
+ /**
+ * @see enum MessageState
+ */
+ uint8_t tmit_state;
+
+ /**
+ * FIXME
+ */
uint8_t in_transmit;
+
+ /**
+ * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
+ */
uint8_t is_master;
+
+ /**
+ * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
+ */
+ uint8_t ready;
+
+ /**
+ * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
+ */
+ uint8_t disconnected;
};
+
/**
* Client context for a channel master.
*/
struct Master
{
+ /**
+ * Channel struct common for Master and Slave
+ */
struct Channel channel;
- struct GNUNET_CRYPTO_EccPrivateKey priv_key;
- struct GNUNET_CRYPTO_EccPublicSignKey pub_key;
- struct GNUNET_HashCode pub_key_hash;
+ /**
+ * Private key of the channel.
+ */
+ struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
+
+ /**
+ * Handle for the multicast origin.
+ */
struct GNUNET_MULTICAST_Origin *origin;
+
+ /**
+ * Transmit handle for multicast.
+ */
struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
+ /**
+ * Maximum message ID for this channel.
+ *
+ * Incremented before sending a message, thus the message_id in messages sent
+ * starts from 1.
+ */
uint64_t max_message_id;
+
+ /**
+ * ID of the last message that contains any state operations.
+ * 0 if there is no such message.
+ */
uint64_t max_state_message_id;
+
+ /**
+ * Maximum group generation for this channel.
+ */
uint64_t max_group_generation;
/**
- * enum GNUNET_PSYC_Policy
+ * @see enum GNUNET_PSYC_Policy
*/
uint32_t policy;
};
*/
struct Slave
{
+ /**
+ * Channel struct common for Master and Slave
+ */
struct Channel channel;
- struct GNUNET_CRYPTO_EccPrivateKey slave_key;
- struct GNUNET_CRYPTO_EccPublicSignKey chan_key;
- struct GNUNET_HashCode chan_key_hash;
+ /**
+ * Private key of the slave.
+ */
+ struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
+
+ /**
+ * Handle for the multicast member.
+ */
struct GNUNET_MULTICAST_Member *member;
+
+ /**
+ * Transmit handle for multicast.
+ */
struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
+ /**
+ * Peer identity of the origin.
+ */
struct GNUNET_PeerIdentity origin;
+
+ /**
+ * Number of items in @a relays.
+ */
+ uint32_t relay_count;
+
+ /**
+ * Relays that multicast can use to connect.
+ */
struct GNUNET_PeerIdentity *relays;
+
+ /**
+ * Join request to be transmitted to the master on join.
+ */
struct GNUNET_MessageHeader *join_req;
+ /**
+ * Maximum message ID for this channel.
+ */
uint64_t max_message_id;
- uint64_t max_request_id;
- uint32_t relay_count;
+ /**
+ * Maximum request ID for this channel.
+ */
+ uint64_t max_request_id;
};
+static inline void
+transmit_message (struct Channel *ch, uint8_t inc_msg_id);
+
+
/**
* Task run during shutdown.
*
}
}
+
+static void
+client_cleanup (struct Channel *ch)
+{
+ /* FIXME: fragment_cache_clear */
+
+ if (ch->is_master)
+ {
+ struct Master *mst = (struct Master *) ch;
+ if (NULL != mst->origin)
+ GNUNET_MULTICAST_origin_stop (mst->origin);
+ GNUNET_CONTAINER_multihashmap_remove (clients, &ch->pub_key_hash, mst);
+ }
+ else
+ {
+ struct Slave *slv = (struct Slave *) ch;
+ if (NULL != slv->join_req)
+ GNUNET_free (slv->join_req);
+ if (NULL != slv->relays)
+ GNUNET_free (slv->relays);
+ if (NULL != slv->member)
+ GNUNET_MULTICAST_member_part (slv->member);
+ }
+
+ GNUNET_free (ch);
+}
+
+
/**
* Called whenever a client is disconnected.
* Frees our resources associated with that client.
if (NULL == client)
return;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
-
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
+
if (NULL == ch)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "User context is NULL in client_disconnect()\n");
+ "%p User context is NULL in client_disconnect()\n", ch);
GNUNET_break (0);
return;
}
- if (NULL != ch->tmit_buf)
- {
- GNUNET_free (ch->tmit_buf);
- ch->tmit_buf = NULL;
- }
+ ch->disconnected = GNUNET_YES;
- if (ch->is_master)
+ /* Send pending messages to multicast before cleanup. */
+ if (NULL != ch->tmit_head)
{
- struct Master *mst = (struct Master *) ch;
- if (NULL != mst->origin)
- GNUNET_MULTICAST_origin_stop (mst->origin);
+ transmit_message (ch, GNUNET_NO);
}
else
{
- struct Slave *slv = (struct Slave *) ch;
- if (NULL != slv->join_req)
- GNUNET_free (slv->join_req);
- if (NULL != slv->relays)
- GNUNET_free (slv->relays);
- if (NULL != slv->member)
- GNUNET_MULTICAST_member_part (slv->member);
+ client_cleanup (ch);
}
-
- GNUNET_free (ch);
}
-void
-join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+
+/**
+ * Master receives a join request from a slave.
+ */
+static void
+join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
const struct GNUNET_MessageHeader *join_req,
struct GNUNET_MULTICAST_JoinHandle *jh)
{
}
-void
+
+static void
membership_test_cb (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
uint64_t message_id, uint64_t group_generation,
struct GNUNET_MULTICAST_MembershipTestHandle *mth)
{
}
-void
+
+static void
replay_fragment_cb (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
uint64_t fragment_id, uint64_t flags,
struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
+{
}
-void
+
+static void
replay_message_cb (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
uint64_t message_id,
uint64_t fragment_offset,
uint64_t flags,
}
-void
-request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
- const struct GNUNET_MessageHeader *req,
- enum GNUNET_MULTICAST_MessageFlags flags)
+
+static void
+fragment_store_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "fragment_store() returned %l (%s)\n", result, err_msg);
+}
+
+
+static void
+message_to_client (struct Channel *ch,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
{
+ uint16_t size = ntohs (mmsg->header.size);
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending message to client. "
+ "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id),
+ GNUNET_ntohll (mmsg->message_id));
+
+ pmsg = GNUNET_malloc (psize);
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+ (const struct GNUNET_MessageHeader *) pmsg,
+ GNUNET_NO);
+ GNUNET_free (pmsg);
}
-void
+
+/**
+ * Convert an uint64_t in network byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
+{
+ /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
+
+ n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
+ n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
+
+ *key = (struct GNUNET_HashCode) {{ 0 }};
+ *((uint64_t *) key)
+ = (n << 32) | (n >> 32);
+}
+
+
+/**
+ * Convert an uint64_t in host byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
+{
+#if __BYTE_ORDER == __BIG_ENDIAN
+ hash_key_from_nll (key, n);
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ *key = (struct GNUNET_HashCode) {{ 0 }};
+ *((uint64_t *) key) = n;
+#else
+ #error byteorder undefined
+#endif
+}
+
+
+static void
+fragment_cache_insert (struct Channel *ch,
+ const struct GNUNET_HashCode *msg_id,
+ struct FragmentCache *frag_cache,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint16_t last_part_type)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ struct GNUNET_CONTAINER_MultiHashMap
+ *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
+ &ch->pub_key_hash);
+
+ if (NULL == frag_cache)
+ {
+ frag_cache = GNUNET_new (struct FragmentCache);
+ frag_cache->fragments
+ = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+
+ if (NULL == ch->recv_msgs)
+ {
+ ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ }
+ GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+ if (NULL == chan_msgs)
+ {
+ chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ }
+
+ struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
+ hash_key_from_nll (frag_id, mmsg->fragment_id);
+ struct FragmentEntry
+ *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+ if (NULL == frag_entry)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Adding message fragment to cache. "
+ "fragment_id: %" PRIu64 ", "
+ "header_size: %" PRIu64 " + %" PRIu64 ").\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id),
+ frag_cache->header_size, size);
+ frag_entry = GNUNET_new (struct FragmentEntry);
+ frag_entry->ref_count = 1;
+ frag_entry->mmsg = GNUNET_malloc (size);
+ memcpy (frag_entry->mmsg, mmsg, size);
+ GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ else
+ {
+ frag_entry->ref_count++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Message fragment already in cache. "
+ "fragment_id: %" PRIu64 ", ref_count: %u\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
+ }
+
+ switch (last_part_type)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ frag_cache->header_size += size;
+ }
+ GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
+ GNUNET_ntohll (mmsg->fragment_id));
+}
+
+
+static void
+fragment_cache_clear (struct Channel *ch,
+ const struct GNUNET_HashCode *msg_id,
+ struct FragmentCache *frag_cache,
+ uint8_t send_to_client)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Clearing message fragment cache.\n", ch);
+
+ struct GNUNET_CONTAINER_MultiHashMap
+ *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
+ &ch->pub_key_hash);
+ GNUNET_assert (NULL != chan_msgs);
+ struct GNUNET_HashCode *frag_id;
+
+ while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
+ {
+ struct FragmentEntry
+ *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+ if (frag_entry != NULL)
+ {
+ if (GNUNET_YES == send_to_client)
+ {
+ message_to_client (ch, frag_entry->mmsg);
+ }
+ if (1 == frag_entry->ref_count)
+ {
+ GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
+ GNUNET_free (frag_entry->mmsg);
+ GNUNET_free (frag_entry);
+ }
+ else
+ {
+ frag_entry->ref_count--;
+ }
+ }
+ GNUNET_free (frag_id);
+ }
+
+ GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
+ GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
+ GNUNET_free (frag_cache);
+}
+
+
+/**
+ * Incoming message fragment from multicast.
+ *
+ * Store it using PSYCstore and send it to the client of the channel.
+ */
+static void
message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *ch = cls;
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message of type %u and size %u from multicast.\n",
+ ch, type, size);
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+ {
+ GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key,
+ (const struct
+ GNUNET_MULTICAST_MessageHeader *) msg,
+ 0, NULL, NULL);
+
+#if TODO
+ /* FIXME: apply modifiers to state in PSYCstore */
+ GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
+ GNUNET_ntohll (mmsg->message_id),
+ meth->mod_count, mods,
+ rcb, rcb_cls);
+#endif
+
+ const struct GNUNET_MULTICAST_MessageHeader
+ *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
+
+ uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
+ (const char *) &mmsg[1]);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Last message part type %u\n", ptype);
+
+ if (GNUNET_NO == ptype)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Received message with invalid parts from multicast. "
+ "Dropping message.\n", ch);
+ GNUNET_break_op (0);
+ break;
+ }
+
+ struct GNUNET_HashCode msg_id;
+ hash_key_from_nll (&msg_id, mmsg->message_id);
+
+ struct FragmentCache *frag_cache = NULL;
+ if (NULL != ch->recv_msgs)
+ frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
+
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ /* FIXME: check state flag / max_state_message_id */
+ if (NULL == frag_cache)
+ {
+ message_to_client (ch, mmsg);
+ break;
+ }
+ else
+ {
+ if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
+ { /* first data fragment after the header, send cached fragments */
+ fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES);
+ message_to_client (ch, mmsg);
+ break;
+ }
+ else
+ { /* still missing fragments from the header, cache data fragment */
+ /* fall thru */
+ }
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ /* not all modifiers arrived yet, cache fragment */
+ fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ if (NULL != frag_cache)
+ { /* fragments not yet sent to client, remove from cache */
+ fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO);
+ }
+ else
+ {
+ message_to_client (ch, mmsg);
+ }
+ break;
+ }
+ break;
+ }
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping unknown message of type %u and size %u.\n",
+ ch, type, size);
+ }
+}
+
+
+/**
+ * Incoming request fragment from multicast for a master.
+ *
+ * @param cls Master.
+ * @param slave_key Sending slave's public key.
+ * @param msg The message.
+ * @param flags Request flags.
+ */
+static void
+request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const struct GNUNET_MessageHeader *msg,
+ enum GNUNET_MULTICAST_MessageFlags flags)
+{
+ struct Master *mst = cls;
+ struct Channel *ch = &mst->channel;
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from multicast.\n",
- ntohs (msg->type));
+ "%p Received request of type %u and size %u from multicast.\n",
+ ch, type, size);
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+ {
+ const struct GNUNET_MULTICAST_RequestHeader *req
+ = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
+
+ /* FIXME: see message_cb() */
+ if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
+ (const char *) &req[1]))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping message with invalid parts "
+ "received from multicast.\n", ch);
+ GNUNET_break_op (0);
+ break;
+ }
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+ pmsg = GNUNET_malloc (psize);
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = req->request_id;
+ pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+
+ memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+ (const struct GNUNET_MessageHeader *) pmsg,
+ GNUNET_NO);
+ GNUNET_free (pmsg);
+ break;
+ }
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Dropping unknown request of type %u and size %u.\n",
+ ch, type, size);
+ GNUNET_break_op (0);
+ }
}
-void
+
+/**
+ * Response from PSYCstore with the current counter values for a channel master.
+ */
+static void
master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
uint64_t max_message_id, uint64_t max_group_generation,
uint64_t max_state_message_id)
join_cb, membership_test_cb,
replay_fragment_cb, replay_message_cb,
request_cb, message_cb, ch);
+ ch->ready = GNUNET_YES;
}
GNUNET_SERVER_notification_context_add (nc, ch->client);
GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
}
+/**
+ * Response from PSYCstore with the current counter values for a channel slave.
+ */
void
slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
uint64_t max_message_id, uint64_t max_group_generation,
{
slv->max_message_id = max_message_id;
slv->member
- = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
+ = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
&slv->origin,
slv->relay_count, slv->relays,
slv->join_req, join_cb,
membership_test_cb,
replay_fragment_cb, replay_message_cb,
message_cb, ch);
+ ch->ready = GNUNET_YES;
}
GNUNET_SERVER_notification_context_add (nc, ch->client);
}
+/**
+ * Handle a connecting client starting a channel master.
+ */
static void
handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
const struct MasterStartRequest *req
= (const struct MasterStartRequest *) msg;
struct Master *mst = GNUNET_new (struct Master);
- mst->channel.client = client;
- mst->channel.is_master = GNUNET_YES;
+ struct Channel *ch = &mst->channel;
+ ch->client = client;
+ ch->is_master = GNUNET_YES;
mst->policy = ntohl (req->policy);
mst->priv_key = req->channel_key;
- GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key,
- &mst->pub_key);
- GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
+ GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
+ GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Master connected to channel %s.\n",
+ mst, GNUNET_h2s (&ch->pub_key_hash));
- GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
- master_counters_cb, mst);
+ GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
GNUNET_SERVER_client_set_user_context (client, &mst->channel);
- GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
+ GNUNET_CONTAINER_multihashmap_put (clients, &ch->pub_key_hash, mst,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+/**
+ * Handle a connecting client joining as a channel slave.
+ */
static void
handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
const struct SlaveJoinRequest *req
= (const struct SlaveJoinRequest *) msg;
struct Slave *slv = GNUNET_new (struct Slave);
+ struct Channel *ch = &slv->channel;
slv->channel.client = client;
slv->channel.is_master = GNUNET_NO;
slv->slave_key = req->slave_key;
- slv->chan_key = req->channel_key;
- GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
- &slv->chan_key_hash);
+ ch->pub_key = req->channel_key;
+ GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
+ &ch->pub_key_hash);
slv->origin = req->origin;
slv->relay_count = ntohl (req->relay_count);
+ if (0 < slv->relay_count)
+ {
+ const struct GNUNET_PeerIdentity *relays
+ = (const struct GNUNET_PeerIdentity *) &req[1];
+ slv->relays
+ = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+ uint32_t i;
+ for (i = 0; i < slv->relay_count; i++)
+ memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ }
- const struct GNUNET_PeerIdentity *relays
- = (const struct GNUNET_PeerIdentity *) &req[1];
- slv->relays
- = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
- uint32_t i;
- for (i = 0; i < slv->relay_count; i++)
- memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Slave connected to channel %s.\n",
+ slv, GNUNET_h2s (&ch->pub_key_hash));
- GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
- slave_counters_cb, slv);
+ GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
GNUNET_SERVER_client_set_user_context (client, &slv->channel);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+/**
+ * Send acknowledgement to a client.
+ *
+ * Sent after a message fragment has been passed on to multicast.
+ *
+ * @param ch The channel struct for the client.
+ */
static void
-send_transmit_ack (struct Channel *ch)
+send_message_ack (struct Channel *ch)
{
- struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
- res->header.size = htons (sizeof (*res));
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
- res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
+ struct GNUNET_MessageHeader res;
+ res.size = htons (sizeof (res));
+ res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
GNUNET_NO);
- GNUNET_free (res);
}
+/**
+ * Callback for the transmit functions of multicast.
+ */
static int
transmit_notify (void *cls, size_t *data_size, void *data)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
struct Channel *ch = cls;
- struct TransmitMessage *msg = ch->tmit_head;
+ struct TransmitMessage *tmit_msg = ch->tmit_head;
- if (NULL == msg || *data_size < ntohs (msg->size))
+ if (NULL == tmit_msg || *data_size < tmit_msg->size)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p transmit_notify: nothing to send.\n", ch);
*data_size = 0;
return GNUNET_NO;
}
- *data_size = ntohs (msg->size);
- memcpy (data, msg->buf, *data_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
+
+ *data_size = tmit_msg->size;
+ memcpy (data, tmit_msg->buf, *data_size);
+
+ GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ GNUNET_free (tmit_msg);
- GNUNET_free (ch->tmit_buf);
- ch->tmit_buf = NULL;
- GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
+ int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ send_message_ack (ch);
- return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+ if (0 == ch->tmit_task)
+ {
+ if (NULL != ch->tmit_head)
+ {
+ transmit_message (ch, GNUNET_NO);
+ }
+ else if (ch->disconnected)
+ {
+ /* FIXME: handle partial message (when still in_transmit) */
+ client_cleanup (ch);
+ }
+ }
+
+ return ret;
+}
+
+
+/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+master_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+ int ret = transmit_notify (cls, data_size, data);
+
+ if (GNUNET_YES == ret)
+ {
+ struct Master *mst = cls;
+ mst->tmit_handle = NULL;
+ }
+ return ret;
}
+/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+slave_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+ int ret = transmit_notify (cls, data_size, data);
+
+ if (GNUNET_YES == ret)
+ {
+ struct Slave *slv = cls;
+ slv->tmit_handle = NULL;
+ }
+ return ret;
+}
+
+
+/**
+ * Transmit a message from a channel master to the multicast group.
+ */
static void
-master_transmit_message (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
- struct Master *mst = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
mst->channel.tmit_task = 0;
if (NULL == mst->tmit_handle)
{
+ if (GNUNET_YES == inc_msg_id)
+ mst->max_message_id++;
mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
+ = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
mst->max_group_generation,
- transmit_notify, mst);
+ master_transmit_notify, mst);
}
else
{
}
+/**
+ * Transmit a message from a channel slave to the multicast group.
+ */
static void
-slave_transmit_message (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
{
- struct Slave *slv = cls;
slv->channel.tmit_task = 0;
if (NULL == slv->tmit_handle)
{
+ if (GNUNET_YES == inc_msg_id)
+ slv->max_message_id++;
slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
- transmit_notify, slv);
+ = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+ slave_transmit_notify, slv);
}
else
{
}
-static int
-buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
+static inline void
+transmit_message (struct Channel *ch, uint8_t inc_msg_id)
{
- uint16_t size = ntohs (msg->size);
- struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
-
- if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
- return GNUNET_SYSERR;
-
- if (0 == ch->tmit_size)
- {
- ch->tmit_buf = GNUNET_malloc (size);
- memcpy (ch->tmit_buf, msg, size);
- ch->tmit_size = size;
- }
- else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
- {
- ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
- memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
- ch->tmit_size += size;
- }
-
- if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
- < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
- {
- struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
- tmit_msg->buf = (char *) msg;
- tmit_msg->size = size;
- tmit_msg->status = ch->tmit_status;
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- tmit_delay = GNUNET_TIME_UNIT_ZERO;
- }
-
- if (0 != ch->tmit_task)
- GNUNET_SCHEDULER_cancel (ch->tmit_task);
+ ch->is_master
+ ? master_transmit_message ((struct Master *) ch, inc_msg_id)
+ : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
+}
- ch->tmit_task
- = ch->is_master
- ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
- : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
- return GNUNET_OK;
+static void
+transmit_error (struct Channel *ch)
+{
+ struct GNUNET_MessageHeader *msg;
+ struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
+ + sizeof (*msg));
+ msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
+ msg->size = ntohs (sizeof (*msg));
+ msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+
+ tmit_msg->buf = (char *) &tmit_msg[1];
+ tmit_msg->size = sizeof (*msg);
+ tmit_msg->state = ch->tmit_state;
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ transmit_message (ch, GNUNET_NO);
+
+ /* FIXME: cleanup */
}
+
+/**
+ * Incoming message from a client.
+ */
static void
-handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_PSYC_MessageMethod *meth
- = (const struct GNUNET_PSYC_MessageMethod *) msg;
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
- if (GNUNET_NO != ch->in_transmit)
+ if (GNUNET_YES != ch->ready)
{
- // FIXME: already transmitting a message, send back error message.
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping message from client, channel is not ready yet.\n",
+ ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- ch->in_transmit = GNUNET_YES;
- ch->tmit_buf = NULL;
- ch->tmit_size = 0;
- ch->tmit_mod_recvd = 0;
- ch->tmit_mod_count = ntohl (meth->mod_count);
- ch->tmit_status = GNUNET_PSYC_DATA_CONT;
+ uint8_t inc_msg_id = GNUNET_NO;
+ uint16_t size = ntohs (msg->size);
+ uint16_t psize = 0, ptype = 0, pos = 0;
- buffer_message (ch, msg);
+ if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
+ GNUNET_break (0);
+ transmit_error (ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message from client.\n", ch);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
- if (0 == ch->tmit_mod_count)
- send_transmit_ack (ch);
+ for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+ {
+ const struct GNUNET_MessageHeader *pmsg
+ = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+ psize = ntohs (pmsg->size);
+ ptype = ntohs (pmsg->type);
+ if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p Received invalid message part of type %u and size %u "
+ "from client.\n", ch, ptype, psize);
+ GNUNET_break (0);
+ transmit_error (ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message part from client.\n", ch);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
+ inc_msg_id = GNUNET_YES;
+ }
+
+ size -= sizeof (*msg);
+ struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
+ tmit_msg->buf = (char *) &tmit_msg[1];
+ memcpy (tmit_msg->buf, &msg[1], size);
+ tmit_msg->size = size;
+ tmit_msg->state = ch->tmit_state;
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ transmit_message (ch, inc_msg_id);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
+/**
+ * Client requests to add a slave to the membership database.
+ */
static void
-handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_PSYC_MessageModifier *mod
- = (const struct GNUNET_PSYC_MessageModifier *) msg;
- struct Channel *ch
- = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
- ch->tmit_mod_recvd++;
- buffer_message (ch, msg);
+}
- if (ch->tmit_mod_recvd == ch->tmit_mod_count)
- send_transmit_ack (ch);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
+/**
+ * Client requests to remove a slave from the membership database.
+ */
+static void
+handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+/**
+ * Client requests channel history from PSYCstore.
+ */
static void
-handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
+handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_PSYC_MessageData *data
- = (const struct GNUNET_PSYC_MessageData *) msg;
- struct Channel *ch
- = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
- ch->tmit_status = ntohs (data->status);
- buffer_message (ch, msg);
- send_transmit_ack (ch);
+}
- if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
- ch->in_transmit = GNUNET_NO;
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
+/**
+ * Client requests best matching state variable from PSYCstore.
+ */
+static void
+handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Client requests state variables with a given prefix from PSYCstore.
+ */
+static void
+handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+
+}
/**
{ &handle_slave_join, NULL,
GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
- { &handle_transmit_method, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
+ { &handle_psyc_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+ { &handle_slave_add, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
+
+ { &handle_slave_remove, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
- { &handle_transmit_modifier, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
+ { &handle_story_request, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
- { &handle_transmit_data, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
+ { &handle_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
- { NULL, NULL, 0, 0 }
+ { &handle_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
};
cfg = c;
store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
nc = GNUNET_SERVER_notification_context_create (server, 1);
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
- NULL);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task, NULL);
}