From 83c058a5ea11b6d7aa05cb71963c6063cb373603 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Sat, 17 May 2014 10:16:15 +0000 Subject: [PATCH] multicast, psyc: client connections, join requests --- src/include/gnunet_protocols.h | 50 +-- src/include/gnunet_psyc_service.h | 5 +- src/include/gnunet_signatures.h | 7 +- src/multicast/gnunet-service-multicast.c | 399 ++++++++++++++++--- src/multicast/multicast.h | 8 +- src/multicast/multicast_api.c | 99 ++--- src/psyc/gnunet-service-psyc.c | 481 +++++++++++++++++------ src/psyc/psyc.h | 15 + src/psyc/psyc_api.c | 40 +- src/psyc/test_psyc.c | 6 +- 10 files changed, 817 insertions(+), 293 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index aeeeed9ea..e943b9c29 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2337,71 +2337,61 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750 -/** - * C->S: Stop the origin. - */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751 - /** * C->S: Join group as a member. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752 - -/** - * C->S: Part the group. - */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753 - -/** - * C<->S<->T: Multicast message from the origin to all members. - */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754 - -/** - * C<->S<->T: Unicast request from a group member to the origin. - */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755 +#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 751 /** * C<--S<->T: A peer wants to join the group. * * Unicast message to the origin or another group member. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST +#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 752 /** * C<->S<->T: Response to a join request. * * Unicast message from a group member to the peer wanting to join. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION +#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION 753 /** * A peer wants to part the group. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST +#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST 754 /** * Acknowledgement sent in response to a part request. * * Unicast message from a group member to the peer wanting to part. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK +#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK 755 /** * Group terminated. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END +#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END 756 /** - * + * C<->S<->T: Multicast message from the origin to all members. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST +#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 757 /** - * + * C<->S<->T: Unicast request from a group member to the origin. + */ +#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758 + +/** + * C<->S<->T: Replay request from a group member to another member. + */ +#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759 + +/** + * C<->S<->T: Cancellation of a replay request. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL +#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL 760 diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 928e05242..48c1107dc 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -620,9 +620,7 @@ typedef void * @param message_cb Function to invoke on message parts received from the * channel, typically at least contains method handlers for @e join and * @e part. - * @param join_cb function invoked once we have joined with the current - * message ID of the channel - * @param slave_joined_cb Function to invoke when a peer wants to join. + * @param slave_joined_cb Function invoked once we have joined the channel. * @param cls Closure for @a message_cb and @a slave_joined_cb. * @param method_name Method name for the join request. * @param env Environment containing transient variables for the request, or NULL. @@ -638,7 +636,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_JoinCallback join_cb, GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, void *cls, const char *method_name, diff --git a/src/include/gnunet_signatures.h b/src/include/gnunet_signatures.h index fafe6e5ae..d875aeb6b 100644 --- a/src/include/gnunet_signatures.h +++ b/src/include/gnunet_signatures.h @@ -137,7 +137,7 @@ extern "C" #define GNUNET_SIGNATURE_PURPOSE_REGEX_ACCEPT 18 /** - * Signature of a multicast message. + * Signature of a multicast message sent by the origin. */ #define GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE 19 @@ -166,6 +166,11 @@ extern "C" */ #define GNUNET_SIGNATURE_PURPOSE_SECRETSHARING_DECRYPTION 23 +/** + * Signature of a multicast request sent by a member. + */ +#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST 24 + #if 0 /* keep Emacsens' auto-indent happy */ { diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index 0265660e1..0394ee19e 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -46,22 +46,40 @@ static struct GNUNET_SERVER_NotificationContext *nc; /** * All connected origins. - * Group's pub_key_hash -> struct Group + * Group's pub_key_hash -> struct Origin */ static struct GNUNET_CONTAINER_MultiHashMap *origins; /** * All connected members. - * Group's pub_key_hash -> struct Group + * Group's pub_key_hash -> struct Member */ static struct GNUNET_CONTAINER_MultiHashMap *members; +/** + * Connected members per group. + * Group's pub_key_hash -> Member's pub_key -> struct Member + */ +static struct GNUNET_CONTAINER_MultiHashMap *group_members; + + +/** + * List of connected clients. + */ +struct ClientList +{ + struct ClientList *prev; + struct ClientList *next; + struct GNUNET_SERVER_Client *client; +}; + /** * Common part of the client context for both an origin and member. */ struct Group { - struct GNUNET_SERVER_Client *client; + struct ClientList *clients_head; + struct ClientList *clients_tail; /** * Public key of the group. @@ -116,6 +134,29 @@ struct Member */ struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; + /** + * Public key of the member. + */ + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + + /** + * Hash of @a pub_key. + */ + struct GNUNET_HashCode pub_key_hash; + + /** + * Join request sent to the origin / members. + */ + struct GNUNET_MULTICAST_JoinRequest *join_request; + + /** + * Join decision sent in reply to our request. + * + * Only a positive decision is stored here, in case of a negative decision the + * client is disconnected. + */ + struct MulticastJoinDecisionMessage *join_decision; + /** * Last request fragment ID sent to the origin. */ @@ -135,23 +176,161 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* FIXME: do clean up here */ } +/** + * Clean up origin data structures after a client disconnected. + */ +static void +cleanup_origin (struct Origin *orig) +{ + struct Group *grp = &orig->grp; + GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig); +} + + +/** + * Clean up member data structures after a client disconnected. + */ +static void +cleanup_member (struct Member *mem) +{ + struct Group *grp = &mem->grp; + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, + &grp->pub_key_hash); + GNUNET_assert (NULL != grp_mem); + GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem); + + if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem)) + { + GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash, + grp_mem); + GNUNET_CONTAINER_multihashmap_destroy (grp_mem); + } + GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem); +} + + +/** + * Clean up group data structures after a client disconnected. + */ +static void +cleanup_group (struct Group *grp) +{ + (GNUNET_YES == grp->is_origin) + ? cleanup_origin ((struct Origin *) grp) + : cleanup_member ((struct Member *) grp); + + GNUNET_free (grp); +} + + +/** + * Called whenever a client is disconnected. + * + * Frees our resources associated with that client. + * + * @param cls Closure. + * @param client Client handle. + */ +static void +client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + if (NULL == client) + return; + + struct Group *grp + = GNUNET_SERVER_client_get_user_context (client, struct Group); + + if (NULL == grp) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p User context is NULL in client_disconnect()\n", grp); + GNUNET_assert (0); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client (%s) disconnected from group %s\n", + grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member", + GNUNET_h2s (&grp->pub_key_hash)); + + struct ClientList *cl = grp->clients_head; + while (NULL != cl) + { + if (cl->client == client) + { + GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl); + GNUNET_free (cl); + break; + } + cl = cl->next; + } + + if (NULL == grp->clients_head) + { /* Last client disconnected. */ +#if FIXME + if (NULL != grp->tmit_head) + { /* Send pending messages via CADET before cleanup. */ + transmit_message (grp); + } + else +#endif + { + cleanup_group (grp); + } + } +} + /** - * Iterator callback for sending a message to clients. + * Send message to all clients connected to the group. + */ +static void +message_to_clients (const struct Group *grp, + const struct GNUNET_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Sending message to clients.\n", grp); + + struct ClientList *cl = grp->clients_head; + while (NULL != cl) + { + GNUNET_SERVER_notification_context_add (nc, cl->client); + GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO); + cl = cl->next; + } +} + + +/** + * Iterator callback for sending a message to origin clients. */ static int -message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *group) +origin_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *origin) { const struct GNUNET_MessageHeader *msg = cls; - struct Group *grp = group; + struct Member *orig = origin; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Sending message to client.\n", grp); + message_to_clients (&orig->grp, msg); + return GNUNET_YES; +} - GNUNET_SERVER_notification_context_add (nc, grp->client); - GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO); +/** + * Iterator callback for sending a message to member clients. + */ +static int +member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *member) +{ + const struct GNUNET_MessageHeader *msg = cls; + struct Member *mem = member; + + if (NULL != mem->join_decision) + { /* Only send message to admitted members */ + message_to_clients (&mem->grp, msg); + } return GNUNET_YES; } @@ -167,10 +346,10 @@ message_to_group (struct Group *grp, const struct GNUNET_MessageHeader *msg) { if (origins != NULL) GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - message_callback, (void *) msg); + origin_message_cb, (void *) msg); if (members != NULL) GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - message_callback, (void *) msg); + member_message_cb, (void *) msg); } @@ -185,7 +364,7 @@ message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg) { if (origins != NULL) GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - message_callback, (void *) msg); + origin_message_cb, (void *) msg); } @@ -199,37 +378,46 @@ handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client, const struct MulticastOriginStartMessage * msg = (const struct MulticastOriginStartMessage *) m; - struct Origin *orig = GNUNET_new (struct Origin); - orig->priv_key = msg->group_key; + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + struct GNUNET_HashCode pub_key_hash; - struct Group *grp = &orig->grp; - grp->is_origin = GNUNET_YES; - grp->client = client; + GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key); + GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); + + struct Origin * + orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash); + struct Group *grp; + + if (NULL == orig) + { + orig = GNUNET_new (struct Origin); + orig->priv_key = msg->group_key; + grp = &orig->grp; + grp->is_origin = GNUNET_YES; + grp->pub_key = pub_key; + grp->pub_key_hash = pub_key_hash; + + GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + else + { + grp = &orig->grp; + } - GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected as origin to group %s.\n", orig, GNUNET_h2s (&grp->pub_key_hash)); GNUNET_SERVER_client_set_user_context (client, grp); - GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVER_receive_done (client, GNUNET_OK); } -/** - * Handle a client stopping an origin. - */ -static void -handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) -{ -} - - /** * Handle a connecting client joining a group. */ @@ -240,34 +428,113 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, struct MulticastMemberJoinMessage * msg = (struct MulticastMemberJoinMessage *) m; - struct Member *mem = GNUNET_new (struct Member); - mem->priv_key = msg->member_key; + struct GNUNET_CRYPTO_EddsaPublicKey mem_pub_key; + struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash; - struct Group *grp = &mem->grp; - grp->is_origin = GNUNET_NO; - grp->client = client; - grp->pub_key = msg->group_key; - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); + GNUNET_CRYPTO_eddsa_key_get_public (&msg->member_key, &mem_pub_key); + GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash); + GNUNET_CRYPTO_hash (&msg->group_key, sizeof (msg->group_key), &pub_key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash); + struct Member *mem = NULL; + struct Group *grp; + + if (NULL == grp_mem) + { + grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + GNUNET_CONTAINER_multihashmap_put (group_members, &pub_key_hash, grp_mem, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + } + else + { + mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash); + } + + if (NULL == mem) + { + mem = GNUNET_new (struct Member); + mem->priv_key = msg->member_key; + mem->pub_key = mem_pub_key; + mem->pub_key_hash = mem_pub_key_hash; + + grp = &mem->grp; + grp->is_origin = GNUNET_NO; + grp->pub_key = msg->group_key; + grp->pub_key_hash = pub_key_hash; + + GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem_pub_key_hash, mem, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + } + else + { + grp = &mem->grp; + } + + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected as member to group %s.\n", mem, GNUNET_h2s (&grp->pub_key_hash)); GNUNET_SERVER_client_set_user_context (client, grp); - GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Handle a client parting a group. - */ -static void -handle_member_part (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) -{ + if (NULL != mem->join_decision) + { /* Already got a join decision, send it to client. */ + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, + (struct GNUNET_MessageHeader *) + mem->join_decision, + GNUNET_NO); + } + else if (grp->clients_head == grp->clients_tail) + { /* First client, send join request. */ + struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; + uint32_t relay_count = ntohs (msg->relay_count); + struct GNUNET_MessageHeader * + join_req = ((struct GNUNET_MessageHeader *) + ((char *) &msg[1]) + relay_count * sizeof (*relays)); + uint16_t join_req_size = ntohs (join_req->size); + + struct MulticastJoinRequestMessage * + req = GNUNET_malloc (sizeof (*req) + join_req_size); + req->header.size = htons (sizeof (*req) + join_req_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST); + req->group_key = grp->pub_key; + GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &req->member_key); + memcpy (&req[1], join_req, join_req_size); + + req->purpose.size = htonl (sizeof (*req) + join_req_size + - sizeof (req->header) + - sizeof (req->signature)); + req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); + + if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose, + &req->signature)) + { + /* FIXME: handle error */ + GNUNET_assert (0); + } + + if (NULL != mem->join_request) + GNUNET_free (mem->join_request); + mem->join_request = req; + + if (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash)) + { /* Local origin */ + message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_request); + } + else + { + /* FIXME: send join request to remote origin / members */ + } + } + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -296,7 +563,7 @@ handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, &msg->signature)) { /* FIXME: handle error */ - return; + GNUNET_assert (0); } /* FIXME: send to remote members */ @@ -327,18 +594,24 @@ handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, - sizeof (req->header) - sizeof (req->member_key) - sizeof (req->signature)); - req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); + req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST); if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose, &req->signature)) { /* FIXME: handle error */ - return; + GNUNET_assert (0); } - /* FIXME: send to remote origin */ - - message_to_origin (grp, m); + if (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash)) + { /* Local origin */ + message_to_origin (grp, m); + } + else + { + /* FIXME: send to remote origin */ + } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -357,15 +630,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, { &handle_origin_start, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 }, - { &handle_origin_stop, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 }, - { &handle_member_join, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 }, - { &handle_member_part, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 }, - { &handle_multicast_message, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, @@ -379,9 +646,11 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, stats = GNUNET_STATISTICS_create ("multicast", cfg); origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); nc = GNUNET_SERVER_notification_context_create (server, 1); GNUNET_SERVER_add_handlers (server, handlers); + GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task, NULL); } diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h index daa79e260..5b0fc647c 100644 --- a/src/multicast/multicast.h +++ b/src/multicast/multicast.h @@ -33,7 +33,7 @@ GNUNET_NETWORK_STRUCT_BEGIN /** * Header of a join request sent to the origin or another member. */ -struct GNUNET_MULTICAST_JoinRequest +struct MulticastJoinRequestMessage { /** * Header for the join request. @@ -67,7 +67,7 @@ struct GNUNET_MULTICAST_JoinRequest */ struct GNUNET_PeerIdentity member_peer; - /* Followed by request body. */ + /* Followed by struct GNUNET_MessageHeader join_request */ }; @@ -97,9 +97,9 @@ struct MulticastJoinDecisionMessage */ uint32_t relay_count; - /* followed by 'relay_count' peer identities */ + /* Followed by relay_count peer identities */ - /* followed by the join response message */ + /* Followed by the join response message */ }; diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index d42f438ae..84dac0545 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -196,6 +196,17 @@ struct GNUNET_MULTICAST_Member */ struct GNUNET_MULTICAST_JoinHandle { + struct GNUNET_MULTICAST_Group *group; + + /** + * Public key of the joining member. + */ + struct GNUNET_CRYPTO_EddsaPublicKey member_key; + + /** + * Peer identity of the joining member. + */ + struct GNUNET_PeerIdentity member_peer; }; @@ -437,8 +448,7 @@ disconnect (void *g) * Iterator callback for calling message callbacks for all groups. */ static int -message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *group) +message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group) { const struct GNUNET_MessageHeader *msg = cls; struct GNUNET_MULTICAST_Group *grp = group; @@ -455,33 +465,11 @@ message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash, } -/** - * Handle a multicast message from the service. - * - * Call message callbacks of all origins and members of the destination group. - * - * @param grp Destination group of the message. - * @param msg The message. - */ -static void -handle_multicast_message (struct GNUNET_MULTICAST_Group *grp, - const struct GNUNET_MULTICAST_MessageHeader *msg) -{ - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - message_callback, (void *) msg); - if (members != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - message_callback, (void *) msg); -} - - /** * Iterator callback for calling request callbacks of origins. */ static int -request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, - void *origin) +request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) { const struct GNUNET_MULTICAST_RequestHeader *req = cls; struct GNUNET_MULTICAST_Origin *orig = origin; @@ -497,20 +485,26 @@ request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, /** - * Handle a multicast request from the service. - * - * Call request callbacks of all origins of the destination group. - * - * @param grp Destination group of the message. - * @param msg The message. + * Iterator callback for calling join request callbacks of origins. */ -static void -handle_multicast_request (struct GNUNET_MULTICAST_Group *grp, - const struct GNUNET_MULTICAST_RequestHeader *req) +static int +join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *group) { - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - request_callback, (void *) req); + const struct MulticastJoinRequestMessage *req = cls; + struct GNUNET_MULTICAST_Group *grp = group; + + struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + jh->group = grp; + jh->member_key = req->member_key; + jh->member_peer = req->member_peer; + + const struct GNUNET_MessageHeader *msg = NULL; + if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size)) + msg =(const struct GNUNET_MessageHeader *) &req[1]; + + grp->join_cb (grp->cb_cls, &req->member_key, msg, jh); + return GNUNET_YES; } @@ -551,22 +545,31 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader); break; + case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: + size_min = sizeof (struct MulticastJoinRequestMessage); + break; + default: GNUNET_break_op (0); - return; + type = 0; } if (! ((0 < size_eq && size == size_eq) || (0 < size_min && size_min <= size))) { GNUNET_break_op (0); - return; + type = 0; } switch (type) { case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *) msg); + if (origins != NULL) + GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, + message_cb, (void *) msg); + if (members != NULL) + GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, + message_cb, (void *) msg); break; case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: @@ -576,12 +579,19 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) break; } - handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *) msg); + if (NULL != origins) + GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, + request_cb, (void *) msg); break; - default: - GNUNET_break_op (0); - return; + case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: + if (NULL != origins) + GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, + join_request_cb, (void *) msg); + if (NULL != members) + GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, + join_request_cb, (void *) msg); + break; } if (NULL != grp->client) @@ -621,6 +631,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_response) { + GNUNET_free (jh); return NULL; } diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 70322adaa..765371d77 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -59,16 +59,22 @@ static struct GNUNET_PSYCSTORE_Handle *store; /** * All connected masters. - * Channel's pub_key_hash -> struct Channel + * Channel's pub_key_hash -> struct Master */ static struct GNUNET_CONTAINER_MultiHashMap *masters; /** * All connected slaves. - * Channel's pub_key_hash -> struct Channel + * Channel's pub_key_hash -> struct Slave */ static struct GNUNET_CONTAINER_MultiHashMap *slaves; +/** + * Connected slaves per channel. + * Channel's pub_key_hash -> Slave's pub_key -> struct Slave + */ +static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves; + /** * Message in the transmission queue. @@ -78,6 +84,8 @@ struct TransmitMessage struct TransmitMessage *prev; struct TransmitMessage *next; + struct GNUNET_SERVER_Client *client; + /** * ID assigned to the message. */ @@ -163,12 +171,24 @@ struct FragmentQueue }; +/** + * List of connected clients. + */ +struct ClientList +{ + struct ClientList *prev; + struct ClientList *next; + struct GNUNET_SERVER_Client *client; +}; + + /** * Common part of the client context for both a channel master and slave. */ struct Channel { - struct GNUNET_SERVER_Client *client; + struct ClientList *clients_head; + struct ClientList *clients_tail; struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; @@ -315,6 +335,16 @@ struct Slave */ struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; + /** + * Public key of the slave. + */ + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + + /** + * Hash of @a pub_key. + */ + struct GNUNET_HashCode pub_key_hash; + /** * Handle for the multicast member. */ @@ -378,30 +408,62 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/** + * Clean up master data structures after a client disconnected. + */ static void -client_cleanup (struct Channel *ch) +cleanup_master (struct Master *mst) { - /* FIXME: fragment_cache_clear */ + struct Channel *ch = &mst->channel; - if (ch->is_master) - { - struct Master *mst = (struct Master *) ch; - if (NULL != mst->origin) - GNUNET_MULTICAST_origin_stop (mst->origin); - GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); - } - else + if (NULL != mst->origin) + GNUNET_MULTICAST_origin_stop (mst->origin); + GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); +} + + +/** + * Clean up slave data structures after a client disconnected. + */ +static void +cleanup_slave (struct Slave *slv) +{ + struct Channel *ch = &slv->channel; + struct GNUNET_CONTAINER_MultiHashMap * + ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, + &ch->pub_key_hash); + GNUNET_assert (NULL != ch_slv); + GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv); + + if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv)) { - struct Slave *slv = (struct Slave *) ch; - if (NULL != slv->join_req) - GNUNET_free (slv->join_req); - if (NULL != slv->relays) - GNUNET_free (slv->relays); - if (NULL != slv->member) - GNUNET_MULTICAST_member_part (slv->member); - GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch); + GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash, + ch_slv); + GNUNET_CONTAINER_multihashmap_destroy (ch_slv); } + GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv); + + if (NULL != slv->join_req) + GNUNET_free (slv->join_req); + if (NULL != slv->relays) + GNUNET_free (slv->relays); + if (NULL != slv->member) + GNUNET_MULTICAST_member_part (slv->member); + GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch); +} + +/** + * Clean up channel data structures after a client disconnected. + */ +static void +cleanup_channel (struct Channel *ch) +{ + /* FIXME: fragment_cache_clear */ + + (GNUNET_YES == ch->is_master) + ? cleanup_master ((struct Master *) ch) + : cleanup_slave ((struct Slave *) ch); GNUNET_free (ch); } @@ -421,7 +483,10 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client (%s) disconnected from channel %s\n", + ch, (GNUNET_YES == ch->is_master) ? "master" : "slave", + GNUNET_h2s (&ch->pub_key_hash)); if (NULL == ch) { @@ -431,29 +496,112 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) return; } - ch->disconnected = GNUNET_YES; + struct ClientList *cl = ch->clients_head; + while (NULL != cl) + { + if (cl->client == client) + { + GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl); + GNUNET_free (cl); + break; + } + cl = cl->next; + } + + if (NULL == ch->clients_head) + { /* Last client disconnected. */ + if (NULL != ch->tmit_head) + { /* Send pending messages to multicast before cleanup. */ + transmit_message (ch); + } + else + { + cleanup_channel (ch); + } + } +} + - /* Send pending messages to multicast before cleanup. */ - if (NULL != ch->tmit_head) +/** + * Send message to all clients connected to the channel. + */ +static void +msg_to_clients (const struct Channel *ch, + const struct GNUNET_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Sending message to clients.\n", ch); + + struct ClientList *cl = ch->clients_head; + while (NULL != cl) { - transmit_message (ch); + GNUNET_SERVER_notification_context_add (nc, cl->client); + GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO); + cl = cl->next; + } +} + + +/** + * Closure for join_mem_test_cb() + */ +struct JoinMemTestCls +{ + struct Channel *ch; + struct GNUNET_MULTICAST_JoinHandle *jh; + struct MasterJoinRequest *master_join_req; +}; + + +/** + * Membership test result callback used for join requests.m + */ +static void +join_mem_test_cb (void *cls, int64_t result, const char *err_msg) +{ + struct JoinMemTestCls *jcls = cls; + + if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master) + { /* Pass on join request to client if this is a master channel */ + msg_to_clients (jcls->ch, + (struct GNUNET_MessageHeader *) jcls->master_join_req); } else { - client_cleanup (ch); + // FIXME: relays + GNUNET_MULTICAST_join_decision(jcls->jh, result, 0, NULL, NULL); } + GNUNET_free (jcls->master_join_req); + GNUNET_free (jcls); } /** - * Master receives a join request from a slave. + * Incoming join request from multicast. */ static void join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, const struct GNUNET_MessageHeader *join_req, struct GNUNET_MULTICAST_JoinHandle *jh) { - + struct Channel *ch = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch); + + uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0; + struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_req_size); + req->header.size = htons (sizeof (*req) + join_req_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); + req->slave_key = *slave_key; + memcpy (&req[1], join_req, join_req_size); + + struct JoinMemTestCls *jcls = GNUNET_malloc (sizeof (*jcls)); + jcls->ch = ch; + jcls->jh = jh; + jcls->master_join_req = req; + + GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key, + ch->max_message_id, 0, + &join_mem_test_cb, jcls); } @@ -474,6 +622,7 @@ replay_fragment_cb (void *cls, struct GNUNET_MULTICAST_ReplayHandle *rh) { + } @@ -497,35 +646,6 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg) } -static void -message_to_client (struct Channel *ch, - const struct GNUNET_MULTICAST_MessageHeader *mmsg) -{ - uint16_t size = ntohs (mmsg->header.size); - struct GNUNET_PSYC_MessageHeader *pmsg; - uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending message to client. " - "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", - ch, GNUNET_ntohll (mmsg->fragment_id), - GNUNET_ntohll (mmsg->message_id)); - - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = mmsg->message_id; - - memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); - - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, - (const struct GNUNET_MessageHeader *) pmsg, - GNUNET_NO); - GNUNET_free (pmsg); -} - - /** * Convert an uint64_t in network byte order to a HashCode * that can be used as key in a MultiHashMap @@ -563,6 +683,34 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) } +/** + * Send multicast message to all clients connected to the channel. + */ +static void +mmsg_to_clients (struct Channel *ch, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + uint16_t size = ntohs (mmsg->header.size); + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending message to client. " + "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", + ch, GNUNET_ntohll (mmsg->fragment_id), + GNUNET_ntohll (mmsg->message_id)); + + pmsg = GNUNET_malloc (psize); + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = mmsg->message_id; + + memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); + msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg); + GNUNET_free (pmsg); +} + + /** * Insert a multicast message fragment into the queue belonging to the message. * @@ -752,7 +900,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, { if (GNUNET_NO == drop) { - message_to_client (ch, cache_entry->mmsg); + mmsg_to_clients (ch, cache_entry->mmsg); } if (cache_entry->ref_count <= 1) { @@ -997,11 +1145,7 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); memcpy (&pmsg[1], &req[1], size - sizeof (*req)); - - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, - (const struct GNUNET_MessageHeader *) pmsg, - GNUNET_NO); + msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg); GNUNET_free (pmsg); break; } @@ -1025,11 +1169,11 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, struct Master *mst = cls; struct Channel *ch = &mst->channel; - struct CountersResult *res = GNUNET_malloc (sizeof (*res)); - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); - res->header.size = htons (sizeof (*res)); - res->result_code = htonl (result); - res->max_message_id = GNUNET_htonll (max_message_id); + struct CountersResult res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (result); + res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) { @@ -1053,10 +1197,7 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, ch, result, GNUNET_h2s (&ch->pub_key_hash)); } - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, - GNUNET_NO); - GNUNET_free (res); + msg_to_clients (ch, &res.header); } @@ -1071,11 +1212,11 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, struct Slave *slv = cls; struct Channel *ch = &slv->channel; - struct CountersResult *res = GNUNET_malloc (sizeof (*res)); - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); - res->header.size = htons (sizeof (*res)); - res->result_code = htonl (result); - res->max_message_id = GNUNET_htonll (max_message_id); + struct CountersResult res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (result); + res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) { @@ -1099,10 +1240,7 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, ch, result, GNUNET_h2s (&ch->pub_key_hash)); } - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, - GNUNET_NO); - GNUNET_free (res); + msg_to_clients (ch, &res.header); } @@ -1125,25 +1263,55 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, const struct MasterStartRequest *req = (const struct MasterStartRequest *) msg; - struct Master *mst = GNUNET_new (struct Master); - mst->policy = ntohl (req->policy); - mst->priv_key = req->channel_key; + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + struct GNUNET_HashCode pub_key_hash; - struct Channel *ch = &mst->channel; - ch->client = client; - ch->is_master = GNUNET_YES; - GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); - GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); - channel_init (ch); + GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key); + GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); + + struct Master * + mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); + struct Channel *ch; + + if (NULL == mst) + { + mst = GNUNET_new (struct Master); + mst->policy = ntohl (req->policy); + mst->priv_key = req->channel_key; + + ch = &mst->channel; + ch->is_master = GNUNET_YES; + ch->pub_key = pub_key; + ch->pub_key_hash = pub_key_hash; + channel_init (ch); + + GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst); + } + else + { + ch = &mst->channel; + + struct CountersResult res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (GNUNET_OK); + res.max_message_id = GNUNET_htonll (mst->max_message_id); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, + GNUNET_NO); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Master connected to channel %s.\n", + "%p Client connected as master to channel %s.\n", mst, GNUNET_h2s (&ch->pub_key_hash)); - GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst); + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); - GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVER_client_set_user_context (client, ch); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1158,37 +1326,82 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, { const struct SlaveJoinRequest *req = (const struct SlaveJoinRequest *) msg; - struct Slave *slv = GNUNET_new (struct Slave); - slv->priv_key = req->slave_key; - slv->origin = req->origin; - slv->relay_count = ntohl (req->relay_count); - if (0 < slv->relay_count) + + struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key; + struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash; + + GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key); + GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash); + GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); + struct Slave *slv = NULL; + struct Channel *ch; + + if (NULL == ch_slv) { - const struct GNUNET_PeerIdentity *relays - = (const struct GNUNET_PeerIdentity *) &req[1]; - slv->relays - = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); - uint32_t i; - for (i = 0; i < slv->relay_count; i++) - memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); + ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + } + else + { + slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash); } - struct Channel *ch = &slv->channel; - ch->client = client; - ch->is_master = GNUNET_NO; - ch->pub_key = req->channel_key; - GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), - &ch->pub_key_hash); - channel_init (ch); + if (NULL == slv) + { + slv = GNUNET_new (struct Slave); + slv->priv_key = req->slave_key; + slv->origin = req->origin; + slv->relay_count = ntohl (req->relay_count); + if (0 < slv->relay_count) + { + const struct GNUNET_PeerIdentity *relays + = (const struct GNUNET_PeerIdentity *) &req[1]; + slv->relays + = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); + uint32_t i; + for (i = 0; i < slv->relay_count; i++) + memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); + } + + ch = &slv->channel; + ch->is_master = GNUNET_NO; + ch->pub_key = req->channel_key; + ch->pub_key_hash = pub_key_hash; + channel_init (ch); + + GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv); + } + else + { + ch = &slv->channel; + + struct CountersResult res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (GNUNET_OK); + res.max_message_id = GNUNET_htonll (ch->max_message_id); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, + GNUNET_NO); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Slave connected to channel %s.\n", + "%p Client connected as slave to channel %s.\n", slv, GNUNET_h2s (&ch->pub_key_hash)); - GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv); + struct ClientList *cl = GNUNET_new (struct ClientList); + cl->client = client; + GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); - GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVER_client_set_user_context (client, &slv->channel); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1202,14 +1415,15 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, * @param ch The channel struct for the client. */ static void -send_message_ack (struct Channel *ch) +send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client) { struct GNUNET_MessageHeader res; res.size = htons (sizeof (res)); res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO); + /* FIXME */ + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO); } @@ -1236,12 +1450,13 @@ transmit_notify (void *cls, size_t *data_size, void *data) *data_size = tmit_msg->size; memcpy (data, &tmit_msg[1], *data_size); + int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; + if (NULL != tmit_msg->client) + send_message_ack (ch, tmit_msg->client); + GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); GNUNET_free (tmit_msg); - int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; - send_message_ack (ch); - if (0 == ch->tmit_task) { if (NULL != ch->tmit_head) @@ -1251,7 +1466,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) else if (ch->disconnected) { /* FIXME: handle partial message (when still in_transmit) */ - client_cleanup (ch); + cleanup_channel (ch); } } @@ -1394,12 +1609,15 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, static void -queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg, +queue_message (struct Channel *ch, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg, uint16_t first_ptype, uint16_t last_ptype) { uint16_t size = ntohs (msg->size) - sizeof (*msg); struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); memcpy (&tmit_msg[1], &msg[1], size); + tmit_msg->client = client; tmit_msg->size = size; tmit_msg->state = ch->tmit_state; @@ -1414,7 +1632,7 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg, static void -transmit_error (struct Channel *ch) +transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client) { uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; @@ -1422,7 +1640,7 @@ transmit_error (struct Channel *ch) msg.size = ntohs (sizeof (msg)); msg.type = ntohs (type); - queue_message (ch, &msg, type, type); + queue_message (ch, client, &msg, type, type); transmit_message (ch); /* FIXME: cleanup */ @@ -1458,7 +1676,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); GNUNET_break (0); - transmit_error (ch); + transmit_error (ch, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } @@ -1472,12 +1690,12 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Received invalid message part from client.\n", ch); GNUNET_break (0); - transmit_error (ch); + transmit_error (ch, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - queue_message (ch, msg, first_ptype, last_ptype); + queue_message (ch, client, msg, first_ptype, last_ptype); transmit_message (ch); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -1581,6 +1799,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, stats = GNUNET_STATISTICS_create ("psyc", cfg); masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); nc = GNUNET_SERVER_notification_context_create (server, 1); GNUNET_SERVER_add_handlers (server, handlers); diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index f2d386548..ab7b35d40 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h @@ -227,6 +227,21 @@ struct OperationResult }; +struct MasterJoinRequest +{ + /** + * Types: + * - GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST + */ + struct GNUNET_MessageHeader header; + /** + * Public key of the joining slave. + */ + struct GNUNET_CRYPTO_EddsaPublicKey slave_key; + + /* Followed by struct GNUNET_MessageHeader join_request */ +}; + GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 85f86ceaa..62f099166 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -126,12 +126,7 @@ struct GNUNET_PSYC_Channel GNUNET_PSYC_MessageCallback hist_message_cb; /** - * Join handler callback. - */ - GNUNET_PSYC_JoinCallback join_cb; - - /** - * Closure for @a message_cb and @a join_cb. + * Closure for @a message_cb. */ void *cb_cls; @@ -200,6 +195,11 @@ struct GNUNET_PSYC_Master struct GNUNET_PSYC_Channel ch; GNUNET_PSYC_MasterStartCallback start_cb; + + /** + * Join handler callback. + */ + GNUNET_PSYC_JoinCallback join_cb; }; @@ -908,6 +908,18 @@ handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) } +static void +handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, + const struct MasterJoinRequest *req) +{ + // FIXME: extract join message from req[1] + const char *method_name = "_fixme"; + struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + mst->join_cb (mst->ch.cb_cls, &req->slave_key, method_name, + 0, NULL, NULL, 0, jh); +} + + /** * Type of a function to call when we receive a message * from the service. @@ -951,6 +963,9 @@ message_handler (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: size_eq = sizeof (struct GNUNET_MessageHeader); break; + case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: + size_min = sizeof (struct MasterJoinRequest); + break; default: GNUNET_break_op (0); return; @@ -988,6 +1003,11 @@ message_handler (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); break; + + case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: + handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch, + (const struct MasterJoinRequest *) msg); + break; } if (NULL != ch->client) @@ -1186,8 +1206,8 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, req->policy = policy; mst->start_cb = master_started_cb; + mst->join_cb = join_cb; ch->message_cb = message_cb; - ch->join_cb = join_cb; ch->cb_cls = cls; ch->cfg = cfg; ch->is_master = GNUNET_YES; @@ -1320,9 +1340,7 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) * @param message_cb Function to invoke on message parts received from the * channel, typically at least contains method handlers for @e join and * @e part. - * @param join_cb function invoked once we have joined with the current - * message ID of the channel - * @param slave_joined_cb Function to invoke when a peer wants to join. + * @param slave_joined_cb Function invoked once we have joined the channel. * @param cls Closure for @a message_cb and @a slave_joined_cb. * @param method_name Method name for the join request. * @param env Environment containing transient variables for the request, or NULL. @@ -1339,7 +1357,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_JoinCallback join_cb, GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, void *cls, const char *method_name, @@ -1362,7 +1379,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, slv->join_cb = slave_joined_cb; ch->message_cb = message_cb; - ch->join_cb = join_cb; ch->cb_cls = cls; ch->cfg = cfg; diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index f58ecb7f6..cef8a5dcf 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -130,6 +130,7 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { res = 1; cleanup (); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n"); } @@ -144,6 +145,7 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { res = 0; cleanup (); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n"); } @@ -181,7 +183,7 @@ master_message (void *cls, uint64_t message_id, uint32_t flags, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master got message part of type %u and size %u " - "belonging to message ID %llu with flags %u\n", + "belonging to message ID %llu with flags %bu\n", type, size, message_id, flags); switch (test) @@ -225,7 +227,7 @@ slave_message (void *cls, uint64_t message_id, uint32_t flags, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave got message part of type %u and size %u " - "belonging to message ID %llu with flags %u\n", + "belonging to message ID %llu with flags %bu\n", type, size, message_id, flags); switch (test) -- 2.25.1