X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmulticast%2Fgnunet-service-multicast.c;h=b068f13083e72b557ca586532d857b2aeb125a1a;hb=be35c296dd1f2aa82dcf718be5b257fa340d5328;hp=1bb27f96e2d197363b7b679c6a6cffb76778a33d;hpb=a8133f3ff80aaaccc904e4bbaf69bc53dee499cb;p=oweals%2Fgnunet.git diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index 1bb27f96e..b068f1308 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009 Christian Grothoff (and other contributing authors) + Copyright (C) 2009 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -26,8 +26,9 @@ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_signatures.h" +#include "gnunet_applications.h" #include "gnunet_statistics_service.h" -#include "gnunet_core_service.h" +#include "gnunet_cadet_service.h" #include "gnunet_multicast_service.h" #include "multicast.h" @@ -37,15 +38,14 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Server handle. + * Service handle. */ -static struct GNUNET_SERVER_Handle *server; +static struct GNUNET_SERVICE_Handle *service; /** - * Core handle. - * Only used during initialization. + * CADET handle. */ -static struct GNUNET_CORE_Handle *core; +static struct GNUNET_CADET_Handle *cadet; /** * Identity of this peer. @@ -56,29 +56,141 @@ static struct GNUNET_PeerIdentity this_peer; * Handle to the statistics service. */ static struct GNUNET_STATISTICS_Handle *stats; -/** - * Notification context, simplifies client broadcasts. - */ -static struct GNUNET_SERVER_NotificationContext *nc; /** - * All connected origins. - * Group's pub_key_hash -> struct Origin + * All connected origin clients. + * Group's pub_key_hash -> struct Origin * (uniq) */ static struct GNUNET_CONTAINER_MultiHashMap *origins; /** - * All connected members. - * Group's pub_key_hash -> struct Member + * All connected member clients. + * Group's pub_key_hash -> struct Member * (multi) */ static struct GNUNET_CONTAINER_MultiHashMap *members; /** - * Connected members per group. - * Group's pub_key_hash -> Member's pub_key -> struct Member + * Connected member clients per group. + * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq) */ static struct GNUNET_CONTAINER_MultiHashMap *group_members; +/** + * Incoming CADET channels with connected children in the tree. + * Group's pub_key_hash -> struct Channel * (multi) + */ +static struct GNUNET_CONTAINER_MultiHashMap *channels_in; + +/** + * Outgoing CADET channels connecting to parents in the tree. + * Group's pub_key_hash -> struct Channel * (multi) + */ +static struct GNUNET_CONTAINER_MultiHashMap *channels_out; + +/** + * Incoming replay requests from CADET. + * Group's pub_key_hash -> + * H(fragment_id, message_id, fragment_offset, flags) -> struct Channel * + */ +static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet; + +/** + * Incoming replay requests from clients. + * Group's pub_key_hash -> + * H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client * + */ +static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client; + + +/** + * Join status of a remote peer. + */ +enum JoinStatus +{ + JOIN_REFUSED = -1, + JOIN_NOT_ASKED = 0, + JOIN_WAITING = 1, + JOIN_ADMITTED = 2, +}; + +enum ChannelDirection +{ + DIR_INCOMING = 0, + DIR_OUTGOING = 1, +}; + + +/** + * Context for a CADET channel. + */ +struct Channel +{ + /** + * Group the channel belongs to. + * + * Only set for outgoing channels. + */ + struct Group *group; + + /** + * CADET channel. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * CADET transmission handle. + */ + struct GNUNET_CADET_TransmitHandle *tmit_handle; + + /** + * Public key of the target group. + */ + struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; + + /** + * Hash of @a group_pub_key. + */ + struct GNUNET_HashCode group_pub_hash; + + /** + * Public key of the joining member. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; + + /** + * Remote peer identity. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Current window size, set by cadet_notify_window_change() + */ + int32_t window_size; + + /** + * Is the connection established? + */ + int8_t is_connected; + + /** + * Is the remote peer admitted to the group? + * @see enum JoinStatus + */ + int8_t join_status; + + /** + * Number of messages waiting to be sent to CADET. + */ + uint8_t msgs_pending; + + /** + * Channel direction. + * @see enum ChannelDirection + */ + uint8_t direction; +}; + + /** * List of connected clients. */ @@ -86,11 +198,12 @@ struct ClientList { struct ClientList *prev; struct ClientList *next; - struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVICE_Client *client; }; + /** - * Common part of the client context for both an origin and member. + * Client context for an origin or member. */ struct Group { @@ -108,29 +221,44 @@ struct Group struct GNUNET_HashCode pub_key_hash; /** - * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)? + * CADET port hash. */ - uint8_t is_origin; + struct GNUNET_HashCode cadet_port_hash; /** * Is the client disconnected? #GNUNET_YES or #GNUNET_NO */ uint8_t disconnected; + + /** + * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)? + */ + uint8_t is_origin; + + union { + struct Origin *origin; + struct Member *member; + }; }; /** - * Client context for a group's origin. +* Client context for a group's origin. */ struct Origin { - struct Group grp; + struct Group group; /** * Private key of the group. */ struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; + /** + * CADET port. + */ + struct GNUNET_CADET_Port *cadet_port; + /** * Last message fragment ID sent to the group. */ @@ -143,7 +271,7 @@ struct Origin */ struct Member { - struct Group grp; + struct Group group; /** * Private key of the member. @@ -173,26 +301,74 @@ struct Member */ struct MulticastJoinDecisionMessageHeader *join_dcsn; + /** + * CADET channel to the origin. + */ + struct Channel *origin_channel; + + /** + * Peer identity of origin. + */ + struct GNUNET_PeerIdentity origin; + + /** + * Peer identity of relays (other members to connect). + */ + struct GNUNET_PeerIdentity *relays; + /** * Last request fragment ID sent to the origin. */ uint64_t max_fragment_id; + + /** + * Number of @a relays. + */ + uint32_t relay_count; +}; + + +/** + * Client context. + */ +struct Client { + struct GNUNET_SERVICE_Client *client; + struct Group *group; }; +struct ReplayRequestKey +{ + uint64_t fragment_id; + uint64_t message_id; + uint64_t fragment_offset; + uint64_t flags; +}; + + +static struct Channel * +cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer); + +static void +cadet_channel_destroy (struct Channel *chn); + +static void +client_send_join_decision (struct Member *mem, + const struct MulticastJoinDecisionMessageHeader *hdcsn); + + /** * Task run during shutdown. * * @param cls unused - * @param tc unused */ static void -shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls) { - if (NULL != core) + if (NULL != cadet) { - GNUNET_CORE_disconnect (core); - core = NULL; + GNUNET_CADET_disconnect (cadet); + cadet = NULL; } if (NULL != stats) { @@ -202,14 +378,21 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* FIXME: do more clean up here */ } + /** * Clean up origin data structures after a client disconnected. */ static void cleanup_origin (struct Origin *orig) { - struct Group *grp = &orig->grp; + struct Group *grp = &orig->group; GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig); + if (NULL != orig->cadet_port) + { + GNUNET_CADET_close_port (orig->cadet_port); + orig->cadet_port = NULL; + } + GNUNET_free (orig); } @@ -219,7 +402,7 @@ cleanup_origin (struct Origin *orig) static void cleanup_member (struct Member *mem) { - struct Group *grp = &mem->grp; + struct Group *grp = &mem->group; struct GNUNET_CONTAINER_MultiHashMap * grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &grp->pub_key_hash); @@ -238,6 +421,7 @@ cleanup_member (struct Member *mem) mem->join_dcsn = NULL; } GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem); + GNUNET_free (mem); } @@ -248,68 +432,117 @@ static void cleanup_group (struct Group *grp) { (GNUNET_YES == grp->is_origin) - ? cleanup_origin ((struct Origin *) grp) - : cleanup_member ((struct Member *) grp); + ? cleanup_origin (grp->origin) + : cleanup_member (grp->member); +} - GNUNET_free (grp); + +void +replay_key_hash (uint64_t fragment_id, uint64_t message_id, + uint64_t fragment_offset, uint64_t flags, + struct GNUNET_HashCode *key_hash) +{ + struct ReplayRequestKey key = { + .fragment_id = fragment_id, + .message_id = message_id, + .fragment_offset = fragment_offset, + .flags = flags, + }; + GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash); } /** - * Called whenever a client is disconnected. + * Remove channel from replay request hashmap. * - * Frees our resources associated with that client. + * @param chn + * Channel to remove. * - * @param cls Closure. - * @param client Client handle. + * @return #GNUNET_YES if there are more entries to process, + * #GNUNET_NO when reached end of hashmap. */ -static void -client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +static int +replay_req_remove_cadet (struct Channel *chn) { - if (NULL == client) - return; - - struct Group *grp - = GNUNET_SERVER_client_get_user_context (client, struct Group); - - if (NULL == grp) + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->group->pub_key_hash); + if (NULL == grp_replay_req) + return GNUNET_NO; + + struct GNUNET_CONTAINER_MultiHashMapIterator * + it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req); + struct GNUNET_HashCode key; + const struct Channel *c; + while (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key, + (const void **) &c)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%p User context is NULL in client_disconnect()\n", grp); - GNUNET_assert (0); - return; + if (c == chn) + { + GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn); + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_YES; + } } + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_NO; +} - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Client (%s) disconnected from group %s\n", - grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member", - GNUNET_h2s (&grp->pub_key_hash)); - struct ClientList *cl = grp->clients_head; - while (NULL != cl) +/** + * Remove client from replay request hashmap. + * + * @param client + * Client to remove. + * + * @return #GNUNET_YES if there are more entries to process, + * #GNUNET_NO when reached end of hashmap. + */ +static int +replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client) +{ + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL == grp_replay_req) + return GNUNET_NO; + + struct GNUNET_CONTAINER_MultiHashMapIterator * + it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req); + struct GNUNET_HashCode key; + const struct GNUNET_SERVICE_Client *c; + while (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key, + (const void **) &c)) { - if (cl->client == client) + if (c == client) { - GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl); - GNUNET_free (cl); - break; + GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client); + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_YES; } - cl = cl->next; } + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_NO; +} - if (NULL == grp->clients_head) - { /* Last client disconnected. */ -#if FIXME - if (NULL != grp->tmit_head) - { /* Send pending messages via CADET before cleanup. */ - transmit_message (grp); - } - else -#endif - { - cleanup_group (grp); - } - } + +/** + * Send message to a client. + */ +static void +client_send (struct GNUNET_SERVICE_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Sending message to client.\n", client); + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (msg); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); } @@ -317,17 +550,20 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) * Send message to all clients connected to the group. */ static void -message_to_clients (const struct Group *grp, - const struct GNUNET_MessageHeader *msg) +client_send_group (const struct Group *grp, + const struct GNUNET_MessageHeader *msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Sending message to clients.\n", grp); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Sending message to all clients of the group.\n", grp); struct ClientList *cl = grp->clients_head; while (NULL != cl) { - GNUNET_SERVER_notification_context_add (nc, cl->client); - GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (msg); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client), + env); cl = cl->next; } } @@ -337,13 +573,13 @@ message_to_clients (const struct Group *grp, * Iterator callback for sending a message to origin clients. */ static int -origin_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *origin) +client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *origin) { const struct GNUNET_MessageHeader *msg = cls; struct Member *orig = origin; - message_to_clients (&orig->grp, msg); + client_send_group (&orig->group, msg); return GNUNET_YES; } @@ -352,15 +588,15 @@ origin_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, * Iterator callback for sending a message to member clients. */ static int -member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *member) +client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *member) { const struct GNUNET_MessageHeader *msg = cls; struct Member *mem = member; if (NULL != mem->join_dcsn) { /* Only send message to admitted members */ - message_to_clients (&mem->grp, msg); + client_send_group (&mem->group, msg); } return GNUNET_YES; } @@ -369,130 +605,887 @@ member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, /** * Send message to all origin and member clients connected to the group. * + * @param pub_key_hash + * H(key_pub) of the group. + * @param msg + * Message to send. + */ +static int +client_send_all (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, + client_send_origin_cb, + (void *) msg); + n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash, + client_send_member_cb, + (void *) msg); + return n; +} + + +/** + * Send message to a random origin client or a random member client. + * * @param grp The group to send @a msg to. * @param msg Message to send. */ -static void -message_to_group (struct Group *grp, const struct GNUNET_MessageHeader *msg) +static int +client_send_random (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) { - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - origin_message_cb, (void *) msg); - if (members != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - member_message_cb, (void *) msg); + int n = 0; + n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb, + (void *) msg); + if (n <= 0) + n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb, + (void *) msg); + return n; } /** * Send message to all origin clients connected to the group. * - * @param grp The group to send @a msg to. - * @param msg Message to send. + * @param pub_key_hash + * H(key_pub) of the group. + * @param msg + * Message to send. */ -static void -message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg) +static int +client_send_origin (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) { - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - origin_message_cb, (void *) msg); + int n = 0; + n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, + client_send_origin_cb, + (void *) msg); + return n; } /** - * Handle a connecting client starting an origin. + * Send fragment acknowledgement to all clients of the channel. + * + * @param pub_key_hash + * H(key_pub) of the group. */ static void -client_origin_start (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +client_send_ack (struct GNUNET_HashCode *pub_key_hash) { - const struct MulticastOriginStartMessage * - msg = (const struct MulticastOriginStartMessage *) m; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Sending message ACK to client.\n"); - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; - struct GNUNET_HashCode pub_key_hash; + static struct GNUNET_MessageHeader *msg = NULL; + if (NULL == msg) + { + msg = GNUNET_malloc (sizeof (*msg)); + msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); + msg->size = htons (sizeof (*msg)); + } + client_send_all (pub_key_hash, msg); +} - GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key); - GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); - struct Origin * - orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash); - struct Group *grp; +struct CadetTransmitClosure +{ + struct Channel *chn; + const struct GNUNET_MessageHeader *msg; +}; - if (NULL == orig) - { - orig = GNUNET_new (struct Origin); - orig->priv_key = msg->group_key; - grp = &orig->grp; - grp->is_origin = GNUNET_YES; - grp->pub_key = pub_key; - grp->pub_key_hash = pub_key_hash; - GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); +/** + * Send a message to a CADET channel. + * + * @param chn Channel. + * @param msg Message. + */ +static void +cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (msg); + + GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env); + + if (0 < chn->window_size) + { + client_send_ack (&chn->group_pub_hash); } else { - grp = &orig->grp; + chn->msgs_pending++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Queuing message. Pending messages: %u\n", + chn, chn->msgs_pending); } - - struct ClientList *cl = GNUNET_new (struct ClientList); - cl->client = client; - GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Client connected as origin to group %s.\n", - orig, GNUNET_h2s (&grp->pub_key_hash)); - - GNUNET_SERVER_client_set_user_context (client, grp); - GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** - * Handle a connecting client joining a group. + * Create CADET channel and send a join request. */ static void -client_member_join (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +cadet_send_join_request (struct Member *mem) { - const struct MulticastMemberJoinMessage * - msg = (const struct MulticastMemberJoinMessage *) m; - uint16_t msg_size = ntohs (msg->header.size); - - struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key; - struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash; - - GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key); - GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash); - GNUNET_CRYPTO_hash (&msg->group_key, sizeof (msg->group_key), &pub_key_hash); - - struct GNUNET_CONTAINER_MultiHashMap * - grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash); - struct Member *mem = NULL; - struct Group *grp; + mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin); + cadet_send_channel (mem->origin_channel, &mem->join_req->header); - if (NULL != grp_mem) + uint32_t i; + for (i = 0; i < mem->relay_count; i++) { - mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash); + struct Channel * + chn = cadet_channel_create (&mem->group, &mem->relays[i]); + cadet_send_channel (chn, &mem->join_req->header); } - if (NULL == mem) - { - mem = GNUNET_new (struct Member); - mem->priv_key = msg->member_key; - mem->pub_key = mem_pub_key; - mem->pub_key_hash = mem_pub_key_hash; +} - grp = &mem->grp; - grp->is_origin = GNUNET_NO; - grp->pub_key = msg->group_key; - grp->pub_key_hash = pub_key_hash; - if (NULL == grp_mem) +static int +cadet_send_join_decision_cb (void *cls, + const struct GNUNET_HashCode *group_pub_hash, + void *channel) +{ + const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; + struct Channel *chn = channel; + + const struct MulticastJoinDecisionMessage *dcsn = + (struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)) + && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer))) + { + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { + chn->join_status = JOIN_ADMITTED; + } + else + { + chn->join_status = JOIN_REFUSED; + } + + cadet_send_channel (chn, &hdcsn->header); + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * Send join decision to a remote peer. + */ +static void +cadet_send_join_decision (struct Group *grp, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash, + &cadet_send_join_decision_cb, + (void *) hdcsn); +} + + +/** + * Iterator callback for sending a message to origin clients. + */ +static int +cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *channel) +{ + const struct GNUNET_MessageHeader *msg = cls; + struct Channel *chn = channel; + if (JOIN_ADMITTED == chn->join_status) + cadet_send_channel (chn, msg); + return GNUNET_YES; +} + + +/** + * Send message to all connected children. + */ +static int +cadet_send_children (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + if (channels_in != NULL) + n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash, + cadet_send_cb, (void *) msg); + return n; +} + + +#if 0 // unused as yet +/** + * Send message to all connected parents. + */ +static int +cadet_send_parents (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + if (channels_in != NULL) + n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash, + cadet_send_cb, (void *) msg); + return n; +} +#endif + + +/** + * CADET channel connect handler. + * + * @see GNUNET_CADET_ConnectEventHandler() + */ +static void * +cadet_notify_connect (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct Channel *chn = GNUNET_malloc (sizeof *chn); + chn->group = cls; + chn->channel = channel; + chn->direction = DIR_INCOMING; + chn->join_status = JOIN_NOT_ASKED; + + GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + return chn; +} + + +/** + * CADET window size change handler. + * + * @see GNUNET_CADET_WindowSizeEventHandler() + */ +static void +cadet_notify_window_change (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + struct Channel *chn = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Window size changed to %d. Pending messages: %u\n", + chn, window_size, chn->msgs_pending); + + chn->is_connected = GNUNET_YES; + chn->window_size = (int32_t) window_size; + + for (int i = 0; i < window_size; i++) + { + if (0 < chn->msgs_pending) + { + client_send_ack (&chn->group_pub_hash); + chn->msgs_pending--; + } + else + { + break; + } + } +} + + +/** + * CADET channel disconnect handler. + * + * @see GNUNET_CADET_DisconnectEventHandler() + */ +static void +cadet_notify_disconnect (void *cls, + const struct GNUNET_CADET_Channel *channel) +{ + if (NULL == cls) + return; + + struct Channel *chn = cls; + if (NULL != chn->group) + { + if (GNUNET_NO == chn->group->is_origin) + { + struct Member *mem = (struct Member *) chn->group; + if (chn == mem->origin_channel) + mem->origin_channel = NULL; + } + } + + int ret; + do + { + ret = replay_req_remove_cadet (chn); + } + while (GNUNET_YES == ret); + + GNUNET_free (chn); +} + + +static int +check_cadet_join_request (void *cls, + const struct MulticastJoinRequestMessage *req) +{ + struct Channel *chn = cls; + + if (NULL == chn + || JOIN_NOT_ASKED != chn->join_status) + { + return GNUNET_SYSERR; + } + + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (ntohl (req->purpose.size) != (size + - sizeof (req->header) + - sizeof (req->reserved) + - sizeof (req->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, + &req->purpose, &req->signature, + &req->member_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming join request message from CADET. + */ +static void +handle_cadet_join_request (void *cls, + const struct MulticastJoinRequestMessage *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + struct GNUNET_HashCode group_pub_hash; + GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash); + chn->group_pub_key = req->group_pub_key; + chn->group_pub_hash = group_pub_hash; + chn->member_pub_key = req->member_pub_key; + chn->peer = req->peer; + chn->join_status = JOIN_WAITING; + + client_send_all (&group_pub_hash, &req->header); +} + + +static int +check_cadet_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + uint16_t size = ntohs (hdcsn->header.size); + if (size < sizeof (struct MulticastJoinDecisionMessageHeader) + + sizeof (struct MulticastJoinDecisionMessage)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (NULL == chn->group || GNUNET_NO != chn->group->is_origin) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + switch (chn->join_status) + { + case JOIN_REFUSED: + return GNUNET_SYSERR; + + case JOIN_ADMITTED: + return GNUNET_OK; + + case JOIN_NOT_ASKED: + case JOIN_WAITING: + break; + } + + return GNUNET_OK; +} + + +/** + * Incoming join decision message from CADET. + */ +static void +handle_cadet_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer? + struct Member *mem = (struct Member *) chn->group; + client_send_join_decision (mem, hdcsn); + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { + chn->join_status = JOIN_ADMITTED; + } + else + { + chn->join_status = JOIN_REFUSED; + cadet_channel_destroy (chn); + } +} + + +static int +check_cadet_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + uint16_t size = ntohs (msg->header.size); + if (size < sizeof (*msg)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohl (msg->purpose.size) != (size + - sizeof (msg->header) + - sizeof (msg->hop_counter) + - sizeof (msg->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE, + &msg->purpose, &msg->signature, + &chn->group_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast message from CADET. + */ +static void +handle_cadet_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + client_send_all (&chn->group_pub_hash, &msg->header); +} + + +static int +check_cadet_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohl (req->purpose.size) != (size + - sizeof (req->header) + - sizeof (req->member_pub_key) + - sizeof (req->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, + &req->purpose, &req->signature, + &req->member_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast request message from CADET. + */ +static void +handle_cadet_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + client_send_origin (&chn->group_pub_hash, &req->header); +} + + +static int +check_cadet_replay_request (void *cls, + const struct MulticastReplayRequestMessage *req) +{ + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast replay request from CADET. + */ +static void +handle_cadet_replay_request (void *cls, + const struct MulticastReplayRequestMessage *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + struct MulticastReplayRequestMessage rep = *req; + GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->group->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_cadet, + &chn->group->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + struct GNUNET_HashCode key_hash; + replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset, + rep.flags, &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + client_send_random (&chn->group_pub_hash, &rep.header); +} + + +static int +check_cadet_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Incoming multicast replay response from CADET. + */ +static void +handle_cadet_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + /* @todo FIXME: got replay error response, send request to other members */ +} + + +static void +group_set_cadet_port_hash (struct Group *grp) +{ + struct CadetPort { + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + uint32_t app_type; + } port = { + grp->pub_key, + GNUNET_APPLICATION_TYPE_MULTICAST, + }; + GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash); +} + + + +/** + * Create new outgoing CADET channel. + * + * @param peer + * Peer to connect to. + * @param group_pub_key + * Public key of group the channel belongs to. + * @param group_pub_hash + * Hash of @a group_pub_key. + * + * @return Channel. + */ +static struct Channel * +cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer) +{ + struct Channel *chn = GNUNET_malloc (sizeof (*chn)); + chn->group = grp; + chn->group_pub_key = grp->pub_key; + chn->group_pub_hash = grp->pub_key_hash; + chn->peer = *peer; + chn->direction = DIR_OUTGOING; + chn->is_connected = GNUNET_NO; + chn->join_status = JOIN_WAITING; + + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (cadet_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + chn), + + GNUNET_MQ_hd_var_size (cadet_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + chn), + + GNUNET_MQ_hd_var_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + chn), + + GNUNET_MQ_hd_var_size (cadet_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + chn), + + GNUNET_MQ_handler_end () + }; + + chn->channel = GNUNET_CADET_channel_creatE (cadet, chn, &chn->peer, + &grp->cadet_port_hash, + GNUNET_CADET_OPTION_RELIABLE, + cadet_notify_window_change, + cadet_notify_disconnect, + cadet_handlers); + GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + return chn; +} + + +/** + * Destroy outgoing CADET channel. + */ +static void +cadet_channel_destroy (struct Channel *chn) +{ + GNUNET_CADET_channel_destroy (chn->channel); + GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash); + GNUNET_free (chn); +} + +/** + * Handle a connecting client starting an origin. + */ +static void +handle_client_origin_start (void *cls, + const struct MulticastOriginStartMessage *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + struct GNUNET_HashCode pub_key_hash; + + GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key); + GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); + + struct Origin * + orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash); + struct Group *grp; + + if (NULL == orig) + { + orig = GNUNET_new (struct Origin); + orig->priv_key = msg->group_key; + orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id); + + grp = c->group = &orig->group; + grp->origin = orig; + grp->is_origin = GNUNET_YES; + grp->pub_key = pub_key; + grp->pub_key_hash = pub_key_hash; + + GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + group_set_cadet_port_hash (grp); + + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (cadet_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + + GNUNET_MQ_hd_var_size (cadet_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + grp), + + GNUNET_MQ_hd_var_size (cadet_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + + GNUNET_MQ_hd_var_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + + GNUNET_MQ_hd_var_size (cadet_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + grp), + + GNUNET_MQ_handler_end () + }; + + + orig->cadet_port = GNUNET_CADET_open_porT (cadet, + &grp->cadet_port_hash, + cadet_notify_connect, + NULL, + cadet_notify_window_change, + cadet_notify_disconnect, + cadet_handlers); + } + else + { + grp = &orig->group; + } + + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client connected as origin to group %s.\n", + orig, GNUNET_h2s (&grp->pub_key_hash)); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_member_join (void *cls, + const struct MulticastMemberJoinMessage *msg) +{ + uint16_t msg_size = ntohs (msg->header.size); + struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; + uint32_t relay_count = ntohl (msg->relay_count); + uint16_t relay_size = relay_count * sizeof (*relays); + struct GNUNET_MessageHeader *join_msg = NULL; + uint16_t join_msg_size = 0; + if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader) + <= msg_size) + { + join_msg = (struct GNUNET_MessageHeader *) + (((char *) &msg[1]) + relay_size); + join_msg_size = ntohs (join_msg->size); + } + return + msg_size == (sizeof (*msg) + relay_size + join_msg_size) + ? GNUNET_OK + : GNUNET_SYSERR; +} + + +/** + * Handle a connecting client joining a group. + */ +static void +handle_client_member_join (void *cls, + const struct MulticastMemberJoinMessage *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + + uint16_t msg_size = ntohs (msg->header.size); + + struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key; + struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash; + + GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key); + GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash); + GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash); + struct Member *mem = NULL; + struct Group *grp; + + if (NULL != grp_mem) + { + mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash); + } + if (NULL == mem) + { + mem = GNUNET_new (struct Member); + mem->origin = msg->origin; + mem->priv_key = msg->member_key; + mem->pub_key = mem_pub_key; + mem->pub_key_hash = mem_pub_key_hash; + mem->max_fragment_id = 0; // FIXME + + grp = c->group = &mem->group; + grp->member = mem; + grp->is_origin = GNUNET_NO; + grp->pub_key = msg->group_pub_key; + grp->pub_key_hash = pub_key_hash; + group_set_cadet_port_hash (grp); + + if (NULL == grp_mem) { grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); @@ -501,7 +1494,7 @@ client_member_join (void *cls, struct GNUNET_SERVER_Client *client, } else { - grp = &mem->grp; + grp = &mem->group; } struct ClientList *cl = GNUNET_new (struct ClientList); @@ -511,24 +1504,24 @@ client_member_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected to group %s..\n", mem, GNUNET_h2s (&grp->pub_key_hash)); + char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p ..as member %s.\n", - mem, GNUNET_h2s (&mem_pub_key_hash)); - - GNUNET_SERVER_client_set_user_context (client, grp); + "%p ..as member %s (%s).\n", + mem, GNUNET_h2s (&mem->pub_key_hash), str); + GNUNET_free (str); if (NULL != mem->join_dcsn) { /* Already got a join decision, send it to client. */ - GNUNET_SERVER_notification_context_add (nc, client); - GNUNET_SERVER_notification_context_unicast (nc, client, - (struct GNUNET_MessageHeader *) - mem->join_dcsn, - GNUNET_NO); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); } - else if (grp->clients_head == grp->clients_tail) + else { /* First client of the group, send join request. */ struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; - uint32_t relay_count = ntohs (msg->relay_count); + uint32_t relay_count = ntohl (msg->relay_count); uint16_t relay_size = relay_count * sizeof (*relays); struct GNUNET_MessageHeader *join_msg = NULL; uint16_t join_msg_size = 0; @@ -539,25 +1532,22 @@ client_member_join (void *cls, struct GNUNET_SERVER_Client *client, (((char *) &msg[1]) + relay_size); join_msg_size = ntohs (join_msg->size); } - if (sizeof (*msg) + relay_size + join_msg_size != msg_size) - { - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - return; - } + uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size; struct MulticastJoinRequestMessage * - req = GNUNET_malloc (sizeof (*req) + join_msg_size); - req->header.size = htons (sizeof (*req) + join_msg_size); + req = GNUNET_malloc (req_msg_size); + req->header.size = htons (req_msg_size); req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST); - req->group_key = grp->pub_key; - req->member_peer = this_peer; - GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_key); + req->group_pub_key = grp->pub_key; + req->peer = this_peer; + GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key); if (0 < join_msg_size) - memcpy (&req[1], join_msg, join_msg_size); + GNUNET_memcpy (&req[1], join_msg, join_msg_size); - req->purpose.size = htonl (sizeof (*req) + join_msg_size + req->member_pub_key = mem->pub_key; + req->purpose.size = htonl (req_msg_size - sizeof (req->header) + - sizeof (req->reserved) - sizeof (req->signature)); req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); @@ -572,17 +1562,40 @@ client_member_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_free (mem->join_req); mem->join_req = req; - if (GNUNET_YES - == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash)) - { /* Local origin */ - message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_req); - } - else - { - /* FIXME: send join request to remote peers */ + if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header)) + { /* No local origins, send to remote origin */ + cadet_send_join_request (mem); } } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (client); +} + + +static void +client_send_join_decision (struct Member *mem, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + client_send_group (&mem->group, &hdcsn->header); + + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { /* Member admitted, store join_decision. */ + uint16_t dcsn_size = ntohs (dcsn->header.size); + mem->join_dcsn = GNUNET_malloc (dcsn_size); + GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size); + } + else + { /* Refused entry, but replay would be still possible for past members. */ + } +} + + +static int +check_client_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + return GNUNET_OK; } @@ -590,97 +1603,107 @@ client_member_join (void *cls, struct GNUNET_SERVER_Client *client, * Join decision from client. */ static void -client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +handle_client_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) { - struct Group * - grp = GNUNET_SERVER_client_get_user_context (client, struct Group); - const struct MulticastJoinDecisionMessageHeader * - hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m; - const struct MulticastJoinDecisionMessage * - dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision from client for group %s..\n", grp, GNUNET_h2s (&grp->pub_key_hash)); - if (GNUNET_YES - == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash)) - { /* Local origin */ - struct GNUNET_CONTAINER_MultiHashMap * - grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, - &grp->pub_key_hash); - if (NULL != grp_mem) - { - struct GNUNET_HashCode member_key_hash; - GNUNET_CRYPTO_hash (&hdcsn->member_key, sizeof (hdcsn->member_key), - &member_key_hash); - struct Member * - mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p ..and member %s: %p\n", - grp, GNUNET_h2s (&member_key_hash), mem); - if (NULL != mem) - { - message_to_clients (&mem->grp, (struct GNUNET_MessageHeader *) hdcsn); - if (GNUNET_YES == ntohl (dcsn->is_admitted)) - { /* Member admitted, store join_decision. */ - uint16_t dcsn_size = ntohs (dcsn->header.size); - mem->join_dcsn = GNUNET_malloc (dcsn_size); - memcpy (mem->join_dcsn, dcsn, dcsn_size); - } - else - { /* Refused entry, disconnect clients. */ - struct ClientList *cl = mem->grp.clients_head; - while (NULL != cl) - { - struct GNUNET_SERVER_Client *client = cl->client; - cl = cl->next; - GNUNET_SERVER_client_disconnect (client); - } - } - } - } + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, + &grp->pub_key_hash); + struct Member *mem = NULL; + if (NULL != grp_mem) + { + struct GNUNET_HashCode member_key_hash; + GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key), + &member_key_hash); + mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p ..and member %s: %p\n", + grp, GNUNET_h2s (&member_key_hash), mem); + } + if (NULL != mem) + { /* Found local member */ + client_send_join_decision (mem, hdcsn); } else - { - /* FIXME: send join decision to hdcsn->peer */ + { /* Look for remote member */ + cadet_send_join_decision (grp, hdcsn); } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_multicast_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + return GNUNET_OK; } + /** * Incoming message from a client. */ static void -client_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +handle_client_multicast_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) { - struct Group * - grp = GNUNET_SERVER_client_get_user_context (client, struct Group); - GNUNET_assert (GNUNET_YES == grp->is_origin); - struct Origin *orig = (struct Origin *) grp; - struct GNUNET_MULTICAST_MessageHeader * - msg = (struct GNUNET_MULTICAST_MessageHeader *) m; + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; - msg->fragment_id = GNUNET_htonll (orig->max_fragment_id++); - msg->purpose.size = htonl (sizeof (*msg) + ntohs (m->size) - - sizeof (msg->header) - - sizeof (msg->hop_counter) - - sizeof (msg->signature)); - msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_YES == grp->is_origin); + struct Origin *orig = grp->origin; - if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose, - &msg->signature)) + /* FIXME: yucky, should use separate message structs for P2P and CS! */ + struct GNUNET_MULTICAST_MessageHeader * + out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header); + out->fragment_id = GNUNET_htonll (++orig->max_fragment_id); + out->purpose.size = htonl (ntohs (out->header.size) + - sizeof (out->header) + - sizeof (out->hop_counter) + - sizeof (out->signature)); + out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); + + if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose, + &out->signature)) { - /* FIXME: handle error */ GNUNET_assert (0); } - /* FIXME: send to remote members */ + client_send_all (&grp->pub_key_hash, &out->header); + cadet_send_children (&grp->pub_key_hash, &out->header); + client_send_ack (&grp->pub_key_hash); + GNUNET_free (out); + + GNUNET_SERVICE_client_continue (client); +} + - message_to_group (grp, m); - GNUNET_SERVER_receive_done (client, GNUNET_OK); +static int +check_client_multicast_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + return GNUNET_OK; } @@ -688,82 +1711,349 @@ client_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, * Incoming request from a client. */ static void -client_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +handle_client_multicast_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) { - struct Group * - grp = GNUNET_SERVER_client_get_user_context (client, struct Group); + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } GNUNET_assert (GNUNET_NO == grp->is_origin); - struct Member *mem = (struct Member *) grp; + struct Member *mem = grp->member; + /* FIXME: yucky, should use separate message structs for P2P and CS! */ struct GNUNET_MULTICAST_RequestHeader * - req = (struct GNUNET_MULTICAST_RequestHeader *) m; + out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header); + out->member_pub_key = mem->pub_key; + out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id); + out->purpose.size = htonl (ntohs (out->header.size) + - sizeof (out->header) + - sizeof (out->member_pub_key) + - sizeof (out->signature)); + out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); + + if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose, + &out->signature)) + { + GNUNET_assert (0); + } + + uint8_t send_ack = GNUNET_YES; + if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) + { /* No local origins, send to remote origin */ + if (NULL != mem->origin_channel) + { + cadet_send_channel (mem->origin_channel, &out->header); + send_ack = GNUNET_NO; + } + else + { + /* FIXME: not yet connected to origin */ + GNUNET_SERVICE_client_drop (client); + GNUNET_free (out); + return; + } + } + if (GNUNET_YES == send_ack) + { + client_send_ack (&grp->pub_key_hash); + } + GNUNET_free (out); + GNUNET_SERVICE_client_continue (client); +} - req->fragment_id = GNUNET_ntohll (mem->max_fragment_id++); - req->purpose.size = htonl (sizeof (*req) + ntohs (m->size) - - sizeof (req->header) - - sizeof (req->member_key) - - sizeof (req->signature)); - req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); +/** + * Incoming replay request from a client. + */ +static void +handle_client_replay_request (void *cls, + const struct MulticastReplayRequestMessage *rep) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; - if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose, - &req->signature)) + if (NULL == grp) { - /* FIXME: handle error */ - GNUNET_assert (0); + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_origin); + struct Member *mem = grp->member; + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_client, + &grp->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } - if (GNUNET_YES - == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash)) - { /* Local origin */ - message_to_origin (grp, m); + struct GNUNET_HashCode key_hash; + replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset, + rep->flags, &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + if (0 == client_send_origin (&grp->pub_key_hash, &rep->header)) + { /* No local origin, replay from remote members / origin. */ + if (NULL != mem->origin_channel) + { + cadet_send_channel (mem->origin_channel, &rep->header); + } + else + { + /* FIXME: not yet connected to origin */ + GNUNET_SERVICE_client_drop (client); + return; + } } - else + GNUNET_SERVICE_client_continue (client); +} + + +static int +cadet_send_replay_response_cb (void *cls, + const struct GNUNET_HashCode *key_hash, + void *value) +{ + struct Channel *chn = value; + struct GNUNET_MessageHeader *msg = cls; + + cadet_send_channel (chn, msg); + return GNUNET_OK; +} + + +static int +client_send_replay_response_cb (void *cls, + const struct GNUNET_HashCode *key_hash, + void *value) +{ + struct GNUNET_SERVICE_Client *client = value; + struct GNUNET_MessageHeader *msg = cls; + + client_send (client, msg); + return GNUNET_OK; +} + + +static int +check_client_replay_response_end (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + return GNUNET_OK; +} + + +/** + * End of replay response from a client. + */ +static void +handle_client_replay_response_end (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + + struct GNUNET_HashCode key_hash; + replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, + res->flags, &key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &grp->pub_key_hash); + if (NULL != grp_replay_req_cadet) + { + GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash); + } + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL != grp_replay_req_client) + { + GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash); + } + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + const struct GNUNET_MessageHeader *msg = &res->header; + if (GNUNET_MULTICAST_REC_OK == res->error_code) { - /* FIXME: send to remote origin */ + msg = GNUNET_MQ_extract_nested_mh (res); + if (NULL == msg) + { + return GNUNET_SYSERR; + } } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + return GNUNET_OK; } /** - * Connected to core service. + * Incoming replay response from a client. + * + * Respond with a multicast message on success, or otherwise with an error code. */ static void -core_connected_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) +handle_client_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) { - this_peer = *my_identity; + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; - static const struct GNUNET_SERVER_MessageHandler handlers[] = { - { &client_origin_start, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 }, + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } - { &client_member_join, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 }, + const struct GNUNET_MessageHeader *msg = &res->header; + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + msg = GNUNET_MQ_extract_nested_mh (res); + } - { &client_join_decision, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 }, + struct GNUNET_HashCode key_hash; + replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, + res->flags, &key_hash); - { &client_multicast_message, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &grp->pub_key_hash); + if (NULL != grp_replay_req_cadet) + { + GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash, + cadet_send_replay_response_cb, + (void *) msg); + } + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL != grp_replay_req_client) + { + GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash, + client_send_replay_response_cb, + (void *) msg); + } + } + else + { + handle_client_replay_response_end (c, res); + return; + } + GNUNET_SERVICE_client_continue (client); +} - { &client_multicast_request, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 }, - {NULL, NULL, 0, 0} - }; +/** + * A new client connected. + * + * @param cls NULL + * @param client client to add + * @param mq message queue for @a client + * @return @a client + */ +static void * +client_notify_connect (void *cls, + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); + /* FIXME: send connect ACK */ - stats = GNUNET_STATISTICS_create ("multicast", cfg); - origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - nc = GNUNET_SERVER_notification_context_create (server, 1); + struct Client *c = GNUNET_new (struct Client); + c->client = client; + + return c; +} + + +/** + * Called whenever a client is disconnected. + * Frees our resources associated with that client. + * + * @param cls closure + * @param client identification of the client + * @param app_ctx must match @a client + */ +static void +client_notify_disconnect (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_ctx) +{ + struct Client *c = app_ctx; + struct Group *grp = c->group; + GNUNET_free (c); + + if (NULL == grp) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p User context is NULL in client_disconnect()\n", grp); + GNUNET_break (0); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client (%s) disconnected from group %s\n", + grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member", + GNUNET_h2s (&grp->pub_key_hash)); + + struct ClientList *cl = grp->clients_head; + while (NULL != cl) + { + if (cl->client == client) + { + GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl); + GNUNET_free (cl); + break; + } + cl = cl->next; + } + + while (GNUNET_YES == replay_req_remove_client (grp, client)); - GNUNET_SERVER_add_handlers (server, handlers); - GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, - NULL); + if (NULL == grp->clients_head) + { /* Last client disconnected. */ +#if FIXME + if (NULL != grp->tmit_head) + { /* Send pending messages via CADET before cleanup. */ + transmit_message (grp); + } + else +#endif + { + cleanup_group (grp); + } + } } @@ -775,29 +2065,73 @@ core_connected_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) * @param cfg configuration to use */ static void -run (void *cls, struct GNUNET_SERVER_Handle *srv, - const struct GNUNET_CONFIGURATION_Handle *c) +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *svc) { cfg = c; - server = srv; - core = GNUNET_CORE_connect (cfg, NULL, core_connected_cb, NULL, NULL, - NULL, GNUNET_NO, NULL, GNUNET_NO, NULL); + service = svc; + GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer); + + stats = GNUNET_STATISTICS_create ("multicast", cfg); + origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + + cadet = GNUNET_CADET_connecT (cfg); + + GNUNET_assert (NULL != cadet); + + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); } /** - * The main function for the multicast service. - * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error + * Define "main" method using service macro. */ -int -main (int argc, char *const *argv) -{ - return (GNUNET_OK == - GNUNET_SERVICE_run (argc, argv, "multicast", - GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1; -} +GNUNET_SERVICE_MAIN +("multicast", + GNUNET_SERVICE_OPTION_NONE, + run, + client_notify_connect, + client_notify_disconnect, + NULL, + GNUNET_MQ_hd_fixed_size (client_origin_start, + GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, + struct MulticastOriginStartMessage, + NULL), + GNUNET_MQ_hd_var_size (client_member_join, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, + struct MulticastMemberJoinMessage, + NULL), + GNUNET_MQ_hd_var_size (client_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + NULL), + GNUNET_MQ_hd_var_size (client_multicast_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (client_multicast_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + NULL), + GNUNET_MQ_hd_fixed_size (client_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + NULL), + GNUNET_MQ_hd_var_size (client_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + NULL), + GNUNET_MQ_hd_var_size (client_replay_response_end, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, + struct MulticastReplayResponseMessage, + NULL)); /* end of gnunet-service-multicast.c */