From: Christian Grothoff Date: Thu, 15 Nov 2018 14:43:41 +0000 (+0100) Subject: getting data structures in place for gnunet-service-tng X-Git-Tag: v0.11.0~238^2~4 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=700359ed8ce18fd6ddfc0940a5d0e5ba145c1fd1;p=oweals%2Fgnunet.git getting data structures in place for gnunet-service-tng --- diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 73b295442..1e638377a 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -72,6 +72,147 @@ enum ClientType }; +/** + * Client connected to the transport service. + */ +struct TransportClient; + + +/** + * A neighbour that at least one communicator is connected to. + */ +struct Neighbour; + + +/** + * List of available queues for a particular neighbour. + */ +struct Queue +{ + /** + * Kept in a MDLL. + */ + struct Queue *next_neighbour; + + /** + * Kept in a MDLL. + */ + struct Queue *prev_neighbour; + + /** + * Kept in a MDLL. + */ + struct Queue *prev_client; + + /** + * Kept in a MDLL. + */ + struct Queue *next_client; + + /** + * Which neighbour is this queue for? + */ + struct Neighbour *neighbour; + + /** + * Which communicator offers this queue? + */ + struct TransportClient *tc; + + /** + * Unique identifier of this queue with the communicator. + */ + uint32_t qid; + + /** + * Network type offered by this queue. + */ + enum GNUNET_ATS_Network_Type nt; + + /** + * Address served by the queue. + */ + const char *address; +}; + + +/** + * A neighbour that at least one communicator is connected to. + */ +struct Neighbour +{ + + /** + * Which peer is this about? + */ + struct GNUNET_PeerIdentity pid; + + /** + * Head of list of messages pending for this neighbour. + */ + struct PendingMessage *pending_msg_head; + + /** + * Tail of list of messages pending for this neighbour. + */ + struct PendingMessage *pending_msg_tail; + + /** + * Head of DLL of queues to this peer. + */ + struct Queue *queue_head; + + /** + * Tail of DLL of queues to this peer. + */ + struct Queue *queue_tail; + +}; + + +/** + * Transmission request from CORE that is awaiting delivery. + */ +struct PendingMessage +{ + /** + * Kept in a MDLL of messages for this @a target. + */ + struct PendingMessage *next_neighbour; + + /** + * Kept in a MDLL of messages for this @a target. + */ + struct PendingMessage *prev_neighbour; + + /** + * Kept in a MDLL of messages from this @a client. + */ + struct PendingMessage *next_client; + + /** + * Kept in a MDLL of messages from this @a client. + */ + struct PendingMessage *prev_client; + + /** + * Target of the request. + */ + struct Neighbour *target; + + /** + * Client that issued the transmission request. + */ + struct TransportClient *client; + + /** + * Size of the original message. + */ + uint32_t bytes_msg; + +}; + + /** * Client connected to the transport service. */ @@ -107,17 +248,63 @@ struct TransportClient { /** - * Peer identity to monitor the addresses of. - * Zero to monitor all neighbours. Valid if - * @e type is #CT_MONITOR. + * Information for @e type #CT_CORE. */ - struct GNUNET_PeerIdentity monitor_peer; + struct { + + /** + * Head of list of messages pending for this client. + */ + struct PendingMessage *pending_msg_head; + + /** + * Tail of list of messages pending for this client. + */ + struct PendingMessage *pending_msg_tail; + + } core; + + /** + * Information for @e type #CT_MONITOR. + */ + struct { + + /** + * Peer identity to monitor the addresses of. + * Zero to monitor all neighbours. Valid if + * @e type is #CT_MONITOR. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Is this a one-shot monitor? + */ + int one_shot; + + } monitor; + /** - * If @e type is #CT_COMMUNICATOR, this communicator - * supports communicating using these addresses. + * Information for @e type #CT_COMMUNICATOR. */ - const char *address_prefix; + struct { + /** + * If @e type is #CT_COMMUNICATOR, this communicator + * supports communicating using these addresses. + */ + char *address_prefix; + + /** + * Head of DLL of queues offered by this communicator. + */ + struct Queue *queue_head; + + /** + * Tail of DLL of queues offered by this communicator. + */ + struct Queue *queue_tail; + + } communicator; } details; @@ -154,6 +341,26 @@ struct GNUNET_PeerIdentity GST_my_identity; */ struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key; +/** + * Map from PIDs to `struct Neighbour` entries. A peer is + * a neighbour if we have an MQ to it from some communicator. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *neighbours; + + +/** + * Lookup neighbour record for peer @a pid. + * + * @param pid neighbour to look for + * @return NULL if we do not have this peer as a neighbour + */ +static struct Neighbour * +lookup_neighbour (const struct GNUNET_PeerIdentity *pid) +{ + return GNUNET_CONTAINER_multipeermap_get (neighbours, + pid); +} + /** * Called whenever a client connects. Allocates our @@ -210,10 +417,23 @@ client_disconnect_cb (void *cls, case CT_NONE: break; case CT_CORE: + { + struct PendingMessage *pm; + + while (NULL != (pm = tc->details.core.pending_msg_head)) + { + GNUNET_CONTAINER_MDLL_remove (client, + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); + pm->client = NULL; + } + } break; case CT_MONITOR: break; case CT_COMMUNICATOR: + GNUNET_free (tc->details.communicator.address_prefix); break; } GNUNET_free (tc); @@ -268,10 +488,15 @@ static int check_client_send (void *cls, const struct OutboundMessage *obm) { + struct TransportClient *tc = cls; uint16_t size; const struct GNUNET_MessageHeader *obmm; - - (void) cls; + + if (CT_CORE != tc->type) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } size = ntohs (obm->header.size) - sizeof (struct OutboundMessage); if (size < sizeof (struct GNUNET_MessageHeader)) { @@ -288,6 +513,51 @@ check_client_send (void *cls, } +/** + * Send a response to the @a pm that we have processed a + * "send" request with status @a success. We + * transmitted @a bytes_physical on the actual wire. + * Sends a confirmation to the "core" client responsible + * for the original request and free's @a pm. + * + * @param pm handle to the original pending message + * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR + * for transmission failure + * @param bytes_physical amount of bandwidth consumed + */ +static void +client_send_response (struct PendingMessage *pm, + int success, + uint32_t bytes_physical) +{ + struct TransportClient *tc = pm->client; + struct Neighbour *target = pm->target; + struct GNUNET_MQ_Envelope *env; + struct SendOkMessage *som; + + if (NULL != tc) + { + env = GNUNET_MQ_msg (som, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + som->success = htonl ((uint32_t) success); + som->bytes_msg = htonl (pm->bytes_msg); + som->bytes_physical = htonl (bytes_physical); + som->peer = target->pid; + GNUNET_MQ_send (tc->mq, + env); + GNUNET_CONTAINER_MDLL_remove (client, + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); + } + GNUNET_CONTAINER_MDLL_remove (neighbour, + target->pending_msg_head, + target->pending_msg_tail, + pm); + GNUNET_free (pm); +} + + /** * Client asked for transmission to a peer. Process the request. * @@ -299,9 +569,55 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) { struct TransportClient *tc = cls; + struct PendingMessage *pm; const struct GNUNET_MessageHeader *obmm; + struct Neighbour *target; + uint32_t bytes_msg; + GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; + bytes_msg = ntohs (obmm->size); + target = lookup_neighbour (&obm->peer); + if (NULL == target) + { + /* Failure: don't have this peer as a neighbour (anymore). + Might have gone down asynchronously, so this is NOT + a protocol violation by CORE. Still count the event, + as this should be rare. */ + struct GNUNET_MQ_Envelope *env; + struct SendOkMessage *som; + + env = GNUNET_MQ_msg (som, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + som->success = htonl (GNUNET_SYSERR); + som->bytes_msg = htonl (bytes_msg); + som->bytes_physical = htonl (0); + som->peer = obm->peer; + GNUNET_MQ_send (tc->mq, + env); + GNUNET_SERVICE_client_continue (tc->client); + GNUNET_STATISTICS_update (GST_stats, + "# messages dropped (neighbour unknown)", + 1, + GNUNET_NO); + return; + } + pm = GNUNET_new (struct PendingMessage); + pm->client = tc; + pm->target = target; + pm->bytes_msg = bytes_msg; + GNUNET_CONTAINER_MDLL_insert (neighbour, + target->pending_msg_head, + target->pending_msg_tail, + pm); + GNUNET_CONTAINER_MDLL_insert (client, + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); + // FIXME: do the work, continuation with: + client_send_response (pm, + GNUNET_NO, + 0); } @@ -315,10 +631,16 @@ static int check_communicator_available (void *cls, const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam) { + struct TransportClient *tc = cls; const char *addr; uint16_t size; - (void) cls; + if (CT_NONE != tc->type) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + tc->type = CT_COMMUNICATOR; size = ntohs (cam->header.size) - sizeof (*cam); if (0 == size) return GNUNET_OK; /* receive-only communicator */ @@ -345,17 +667,10 @@ handle_communicator_available (void *cls, struct TransportClient *tc = cls; uint16_t size; - if (CT_NONE != tc->type) - { - GNUNET_break (0); - GNUNET_SERVICE_client_drop (tc->client); - return; - } - tc->type = CT_COMMUNICATOR; size = ntohs (cam->header.size) - sizeof (*cam); if (0 == size) return; /* receive-only communicator */ - tc->details.address_prefix = GNUNET_strdup ((const char *) &cam[1]); + tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) &cam[1]); GNUNET_SERVICE_client_continue (tc->client); } @@ -370,10 +685,15 @@ static int check_add_address (void *cls, const struct GNUNET_TRANSPORT_AddAddressMessage *aam) { + struct TransportClient *tc = cls; const char *addr; uint16_t size; - (void) cls; + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } size = ntohs (aam->header.size) - sizeof (*aam); if (0 == size) { @@ -418,12 +738,19 @@ handle_del_address (void *cls, { struct TransportClient *tc = cls; + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + GNUNET_SERVICE_client_continue (tc->client); } /** - * Client asked for transmission to a peer. Process the request. + * Client notified us about transmission from a peer. Process the request. * * @param cls the client * @param obm the send message that was sent @@ -432,10 +759,15 @@ static int check_incoming_msg (void *cls, const struct GNUNET_TRANSPORT_IncomingMessage *im) { + struct TransportClient *tc = cls; uint16_t size; const struct GNUNET_MessageHeader *obmm; - (void) cls; + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } size = ntohs (im->header.size) - sizeof (*im); if (size < sizeof (struct GNUNET_MessageHeader)) { @@ -478,10 +810,15 @@ static int check_add_queue_message (void *cls, const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) { + struct TransportClient *tc = cls; const char *addr; uint16_t size; - (void) cls; + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } size = ntohs (aqm->header.size) - sizeof (*aqm); if (0 == size) { @@ -509,11 +846,65 @@ handle_add_queue_message (void *cls, const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) { struct TransportClient *tc = cls; + struct Queue *queue; + struct Neighbour *neighbour; + const char *addr; + uint16_t addr_len; + neighbour = lookup_neighbour (&aqm->receiver); + if (NULL == neighbour) + { + neighbour = GNUNET_new (struct Neighbour); + neighbour->pid = aqm->receiver; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (neighbours, + &neighbour->pid, + neighbour, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + // FIXME: notify cores/monitors! + } + addr_len = ntohs (aqm->header.size) - sizeof (*aqm); + addr = (const char *) &aqm[1]; + + queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); + queue->qid = aqm->qid; + queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); + queue->tc = tc; + queue->neighbour = neighbour; + queue->address = (const char *) &queue[1]; + memcpy (&queue[1], + addr, + addr_len); + GNUNET_CONTAINER_MDLL_insert (neighbour, + neighbour->queue_head, + neighbour->queue_tail, + queue); + GNUNET_CONTAINER_MDLL_insert (client, + tc->details.communicator.queue_head, + tc->details.communicator.queue_tail, + queue); + // FIXME: possibly transmit queued messages? GNUNET_SERVICE_client_continue (tc->client); } +/** + * Release memory used by @a neighbour. + * + * @param neighbour neighbour entry to free + */ +static void +free_neighbour (struct Neighbour *neighbour) +{ + GNUNET_assert (NULL == neighbour->queue_head); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (neighbours, + &neighbour->pid, + neighbour)); + GNUNET_free (neighbour); +} + + /** * Queue to a peer went down. Process the request. * @@ -526,7 +917,42 @@ handle_del_queue_message (void *cls, { struct TransportClient *tc = cls; - GNUNET_SERVICE_client_continue (tc->client); + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + for (struct Queue *queue = tc->details.communicator.queue_head; + NULL != queue; + queue = queue->next_client) + { + struct Neighbour *neighbour = queue->neighbour; + + if ( (dqm->qid != queue->qid) || + (0 != memcmp (&dqm->receiver, + &neighbour->pid, + sizeof (struct GNUNET_PeerIdentity))) ) + continue; + GNUNET_CONTAINER_MDLL_remove (neighbour, + neighbour->queue_head, + neighbour->queue_tail, + queue); + GNUNET_CONTAINER_MDLL_remove (client, + tc->details.communicator.queue_head, + tc->details.communicator.queue_tail, + queue); + GNUNET_free (queue); + if (NULL == neighbour->queue_head) + { + // FIXME: notify cores/monitors! + free_neighbour (neighbour); + } + GNUNET_SERVICE_client_continue (tc->client); + return; + } + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); } @@ -542,6 +968,12 @@ handle_send_message_ack (void *cls, { struct TransportClient *tc = cls; + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } GNUNET_SERVICE_client_continue (tc->client); } @@ -565,12 +997,37 @@ handle_monitor_start (void *cls, return; } tc->type = CT_MONITOR; - tc->details.monitor_peer = start->peer; - // FIXME: remember also the one_shot flag! + tc->details.monitor.peer = start->peer; + tc->details.monitor.one_shot = ntohl (start->one_shot); + // FIXME: do work! GNUNET_SERVICE_client_continue (tc->client); } +/** + * Free neighbour entry. + * + * @param cls NULL + * @param pid unused + * @param value a `struct Neighbour` + * @return #GNUNET_OK (always) + */ +static int +free_neighbour_cb (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct Neighbour *neighbour = value; + + (void) cls; + (void) pid; + GNUNET_break (0); // should this ever happen? + free_neighbour (neighbour); + + return GNUNET_OK; +} + + /** * Function called when the service shuts down. Unloads our plugins * and cancels pending validations. @@ -578,7 +1035,7 @@ handle_monitor_start (void *cls, * @param cls closure, unused */ static void -shutdown_task (void *cls) +do_shutdown (void *cls) { (void) cls; @@ -593,6 +1050,10 @@ shutdown_task (void *cls) GNUNET_free (GST_my_private_key); GST_my_private_key = NULL; } + GNUNET_CONTAINER_multipeermap_iterate (neighbours, + &free_neighbour_cb, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (neighbours); } @@ -608,8 +1069,11 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *service) { + (void) cls; /* setup globals */ GST_cfg = c; + neighbours = GNUNET_CONTAINER_multipeermap_create (1024, + GNUNET_YES); GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg); if (NULL == GST_my_private_key) { @@ -626,7 +1090,7 @@ run (void *cls, GST_stats = GNUNET_STATISTICS_create ("transport", GST_cfg); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); /* start subsystems */ }