X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdv%2Fgnunet-service-dv.c;h=3fbd0cc030459710be5c81784a1197997342156d;hb=633c9ed2d8392f0620dee12513f65a4cc602ea60;hp=bd0057ab47a2c37027e27666dd559f03643bbfe6;hpb=29e6158507a0758192075ac6ece7ba8e75ddc49a;p=oweals%2Fgnunet.git diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index bd0057ab4..3fbd0cc03 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2013 GNUnet e.V. + Copyright (C) 2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -124,39 +124,6 @@ struct RouteMessage GNUNET_NETWORK_STRUCT_END -/** - * Linked list of messages to send to clients. - */ -struct PendingMessage -{ - /** - * Pointer to next item in the list - */ - struct PendingMessage *next; - - /** - * Pointer to previous item in the list - */ - struct PendingMessage *prev; - - /** - * Actual message to be sent, allocated after this struct. - */ - const struct GNUNET_MessageHeader *msg; - - /** - * Next target for the message (a neighbour of ours). - */ - struct GNUNET_PeerIdentity next_target; - - /** - * Unique ID of the message. - */ - uint32_t uid; - -}; - - /** * Information about a direct neighbor (core-level, excluding * DV-links, only DV-enabled peers). @@ -177,20 +144,10 @@ struct DirectNeighbor */ struct GNUNET_HashCode real_session_id; - /** - * Head of linked list of messages to send to this peer. - */ - struct PendingMessage *pm_head; - - /** - * Tail of linked list of messages to send to this peer. - */ - struct PendingMessage *pm_tail; - /** * Transmit handle to core service. */ - struct GNUNET_CORE_TransmitHandle *cth; + struct GNUNET_MQ_Handle *mq; /** * Routing table of the neighbor, NULL if not yet established. @@ -230,7 +187,7 @@ struct DirectNeighbor * ID of the task we use to (periodically) update our consensus * with this peer. Used if we are the initiating peer. */ - struct GNUNET_SCHEDULER_Task * initiate_task; + struct GNUNET_SCHEDULER_Task *initiate_task; /** * At what offset are we, with respect to inserting our own routes @@ -244,11 +201,6 @@ struct DirectNeighbor */ unsigned int consensus_insertion_distance; - /** - * Number of messages currently in the 'pm_XXXX'-DLL. - */ - unsigned int pm_queue_size; - /** * Elements in consensus */ @@ -371,7 +323,7 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; * Hopefully this client will never change, although if the plugin * dies and returns for some reason it may happen. */ -static struct GNUNET_SERVER_NotificationContext *nc; +static struct GNUNET_NotificationContext *nc; /** * Handle for the statistics service. @@ -386,7 +338,7 @@ static struct GNUNET_ATS_PerformanceHandle *ats; /** * Task scheduled to refresh routes based on direct neighbours. */ -static struct GNUNET_SCHEDULER_Task * rr_task; +static struct GNUNET_SCHEDULER_Task *rr_task; /** * #GNUNET_YES if we are shutting down. @@ -457,10 +409,10 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message, received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV); received_msg->distance = htonl (distance); received_msg->sender = *origin; - memcpy (&received_msg[1], message, ntohs (message->size)); - GNUNET_SERVER_notification_context_broadcast (nc, - &received_msg->header, - GNUNET_YES); + GNUNET_memcpy (&received_msg[1], message, ntohs (message->size)); + GNUNET_notification_context_broadcast (nc, + &received_msg->header, + GNUNET_YES); GNUNET_free (received_msg); } @@ -473,36 +425,9 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message, static void send_control_to_plugin (const struct GNUNET_MessageHeader *message) { - GNUNET_SERVER_notification_context_broadcast (nc, - message, - GNUNET_NO); -} - - -/** - * Give an (N)ACK message to the plugin, we transmitted a message for it. - * - * @param target peer that received the message - * @param uid plugin-chosen UID for the message - * @param nack #GNUNET_NO to send ACK, #GNUNET_YES to send NACK - */ -static void -send_ack_to_plugin (const struct GNUNET_PeerIdentity *target, - uint32_t uid, - int nack) -{ - struct GNUNET_DV_AckMessage ack_msg; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Delivering ACK for message to peer `%s'\n", - GNUNET_i2s (target)); - ack_msg.header.size = htons (sizeof (ack_msg)); - ack_msg.header.type = htons ((GNUNET_YES == nack) - ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK - : GNUNET_MESSAGE_TYPE_DV_SEND_ACK); - ack_msg.uid = htonl (uid); - ack_msg.target = *target; - send_control_to_plugin (&ack_msg.header); + GNUNET_notification_context_broadcast (nc, + message, + GNUNET_NO); } @@ -580,76 +505,11 @@ send_disconnect_to_plugin (const struct GNUNET_PeerIdentity *target) } -/** - * Function called to transfer a message to another peer - * via core. - * - * @param cls closure with the direct neighbor - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -core_transmit_notify (void *cls, size_t size, void *buf) -{ - struct DirectNeighbor *dn = cls; - char *cbuf = buf; - struct PendingMessage *pending; - size_t off; - size_t msize; - - dn->cth = NULL; - if (NULL == buf) - { - /* client disconnected */ - return 0; - } - off = 0; - while ( (NULL != (pending = dn->pm_head)) && - (size >= off + (msize = ntohs (pending->msg->size)))) - { - dn->pm_queue_size--; - GNUNET_CONTAINER_DLL_remove (dn->pm_head, - dn->pm_tail, - pending); - memcpy (&cbuf[off], pending->msg, msize); - if (0 != pending->uid) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Acking transmission of %u bytes to %s with plugin\n", - msize, - GNUNET_i2s (&pending->next_target)); - send_ack_to_plugin (&pending->next_target, - pending->uid, - GNUNET_NO); - } - GNUNET_free (pending); - off += msize; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting total of %u bytes to %s\n", - off, - GNUNET_i2s (&dn->peer)); - GNUNET_assert (NULL != core_api); - if (NULL != pending) - dn->cth = - GNUNET_CORE_notify_transmit_ready (core_api, - GNUNET_YES /* cork */, - GNUNET_CORE_PRIO_BEST_EFFORT, - GNUNET_TIME_UNIT_FOREVER_REL, - &dn->peer, - msize, - &core_transmit_notify, dn); - return off; -} - - /** * Forward the given payload to the given target. * * @param target where to send the message * @param distance distance to the @a sender - * @param uid unique ID for the message * @param sender original sender of the message * @param actual_target ultimate recipient for the message * @param payload payload of the message @@ -657,17 +517,14 @@ core_transmit_notify (void *cls, size_t size, void *buf) static void forward_payload (struct DirectNeighbor *target, uint32_t distance, - uint32_t uid, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_PeerIdentity *actual_target, const struct GNUNET_MessageHeader *payload) { - struct PendingMessage *pm; + struct GNUNET_MQ_Envelope *env; struct RouteMessage *rm; - size_t msize; - if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) && - (0 == uid) && + if ( (GNUNET_MQ_get_length (target->mq) >= MAX_QUEUE_SIZE) && (0 != memcmp (sender, &my_identity, sizeof (struct GNUNET_PeerIdentity))) ) @@ -675,39 +532,24 @@ forward_payload (struct DirectNeighbor *target, /* not _our_ client and queue is full, drop */ GNUNET_STATISTICS_update (stats, "# messages dropped", - 1, GNUNET_NO); + 1, + GNUNET_NO); return; } - msize = sizeof (struct RouteMessage) + ntohs (payload->size); - if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + if (sizeof (struct RouteMessage) + ntohs (payload->size) + >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); return; } - pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - pm->next_target = target->peer; - pm->uid = uid; - pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; - rm = (struct RouteMessage *) &pm[1]; - rm->header.size = htons ((uint16_t) msize); - rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE); + env = GNUNET_MQ_msg_nested_mh (rm, + GNUNET_MESSAGE_TYPE_DV_ROUTE, + payload); rm->distance = htonl (distance); rm->target = *actual_target; rm->sender = *sender; - memcpy (&rm[1], payload, ntohs (payload->size)); - GNUNET_CONTAINER_DLL_insert_tail (target->pm_head, - target->pm_tail, - pm); - target->pm_queue_size++; - GNUNET_assert (NULL != core_api); - if (NULL == target->cth) - target->cth = GNUNET_CORE_notify_transmit_ready (core_api, - GNUNET_YES /* cork */, - GNUNET_CORE_PRIO_BEST_EFFORT, - GNUNET_TIME_UNIT_FOREVER_REL, - &target->peer, - msize, - &core_transmit_notify, target); + GNUNET_MQ_send (target->mq, + env); } @@ -855,8 +697,12 @@ build_set (void *cls) /* Find next non-NULL entry */ neighbor->consensus_insertion_offset++; - if ( (0 != memcmp (&target->peer, &my_identity, sizeof (my_identity))) && - (0 != memcmp (&target->peer, &neighbor->peer, sizeof (neighbor->peer))) ) + if ( (0 != memcmp (&target->peer, + &my_identity, + sizeof (my_identity))) && + (0 != memcmp (&target->peer, + &neighbor->peer, + sizeof (struct GNUNET_PeerIdentity))) ) { /* Add target if it is not the neighbor or this peer */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -916,8 +762,12 @@ handle_direct_connect (struct DirectNeighbor *neighbor) /* construct session ID seed as XOR of both peer's identities */ - GNUNET_CRYPTO_hash (&my_identity, sizeof (my_identity), &h1); - GNUNET_CRYPTO_hash (&neighbor->peer, sizeof (struct GNUNET_PeerIdentity), &h2); + GNUNET_CRYPTO_hash (&my_identity, + sizeof (my_identity), + &h1); + GNUNET_CRYPTO_hash (&neighbor->peer, + sizeof (struct GNUNET_PeerIdentity), + &h2); GNUNET_CRYPTO_hash_xor (&h1, &h2, &session_id); @@ -949,7 +799,7 @@ handle_direct_connect (struct DirectNeighbor *neighbor) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting SET listen operation with peer `%s'\n", - GNUNET_i2s(&neighbor->peer)); + GNUNET_i2s (&neighbor->peer)); neighbor->listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &neighbor->real_session_id, @@ -965,16 +815,21 @@ handle_direct_connect (struct DirectNeighbor *neighbor) * * @param cls closure * @param peer peer identity this notification is about + * @param mq message queue for sending data to @a peer + * @return our `struct DirectNeighbour` for this peer */ -static void +static void * handle_core_connect (void *cls, - const struct GNUNET_PeerIdentity *peer) + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { struct DirectNeighbor *neighbor; /* Check for connect to self message */ - if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) - return; + if (0 == memcmp (&my_identity, + peer, + sizeof (struct GNUNET_PeerIdentity))) + return NULL; /* check if entry exists */ neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, peer); @@ -988,9 +843,9 @@ handle_core_connect (void *cls, GNUNET_i2s (peer), (unsigned int) neighbor->distance); if (DIRECT_NEIGHBOR_COST != neighbor->distance) - return; + return NULL; handle_direct_connect (neighbor); - return; + return NULL; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core connected to %s (distance unknown)\n", @@ -999,12 +854,13 @@ handle_core_connect (void *cls, neighbor->peer = *peer; GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_put (direct_neighbors, - peer, + &neighbor->peer, neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); neighbor->connected = GNUNET_YES; neighbor->distance = 0; /* unknown */ neighbor->network = GNUNET_ATS_NET_UNSPECIFIED; + return neighbor; } @@ -1200,19 +1056,12 @@ handle_direct_disconnect (struct DirectNeighbor *neighbor) GNUNET_CONTAINER_multipeermap_iterate (all_routes, &cull_routes, neighbor); - if (NULL != neighbor->cth) - { - GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth); - neighbor->cth = NULL; - } - if (NULL != neighbor->direct_route) { release_route (neighbor->direct_route); GNUNET_free (neighbor->direct_route); neighbor->direct_route = NULL; } - if (NULL != neighbor->neighbor_table_consensus) { GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus, @@ -1333,7 +1182,7 @@ handle_ats_update (void *cls, neighbor->peer = address->peer; GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_put (direct_neighbors, - &address->peer, + &neighbor->peer, neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); neighbor->connected = GNUNET_NO; /* not yet */ @@ -1497,16 +1346,19 @@ check_target_added (void *cls, * * @param cls the `struct DirectNeighbor` we're building the consensus with * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK + * @param current_size current set size * @param status see `enum GNUNET_SET_Status` */ static void handle_set_union_result (void *cls, const struct GNUNET_SET_Element *element, + uint64_t current_size, enum GNUNET_SET_Status status) { struct DirectNeighbor *neighbor = cls; struct DirectNeighbor *dn; struct Target *target; + const struct Target *ctarget; char *status_str; switch (status) @@ -1539,14 +1391,18 @@ handle_set_union_result (void *cls, GNUNET_break_op (0); return; } - if ( (NULL != (dn = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, &((struct Target *) element->data)->peer))) && (DIRECT_NEIGHBOR_COST == dn->distance) ) + ctarget = element->data; + if ( (NULL != + (dn = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, + &ctarget->peer))) && + (DIRECT_NEIGHBOR_COST == dn->distance) ) { /* this is a direct neighbor of ours, we do not care about routes to this peer */ return; } target = GNUNET_new (struct Target); - memcpy (target, element->data, sizeof (struct Target)); + GNUNET_memcpy (target, element->data, sizeof (struct Target)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received information about peer `%s' with distance %u from SET\n", GNUNET_i2s (&target->peer), @@ -1674,6 +1530,7 @@ listen_set_union (void *cls, GNUNET_SET_OPERATION_UNION); neighbor->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, + (struct GNUNET_SET_Option[]) {{ 0 }}, &handle_set_union_result, neighbor); neighbor->consensus_insertion_offset = 0; @@ -1704,6 +1561,7 @@ initiate_set_union (void *cls) &neighbor->real_session_id, NULL, GNUNET_SET_RESULT_ADDED, + (struct GNUNET_SET_Option[]) {{ 0 }}, &handle_set_union_result, neighbor); neighbor->consensus_insertion_offset = 0; @@ -1713,6 +1571,34 @@ initiate_set_union (void *cls) } +/** + * Check that @a rm is well-formed. + * + * @param cls closure + * @param rm the message + * @return #GNUNET_OK if @a rm is well-formed. + */ +static int +check_dv_route_message (void *cls, + const struct RouteMessage *rm) +{ + const struct GNUNET_MessageHeader *payload; + + if (ntohs (rm->header.size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + payload = (const struct GNUNET_MessageHeader *) &rm[1]; + if (ntohs (rm->header.size) != sizeof (struct RouteMessage) + ntohs (payload->size)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Core handler for DV data messages. Whatever this message * contains all we really have to do is rip it out of its @@ -1720,19 +1606,16 @@ initiate_set_union (void *cls) * in with. * * @param cls closure - * @param peer peer which sent the message (immediate sender) - * @param message the message - * @return #GNUNET_OK on success, #GNUNET_SYSERR if the other peer violated the protocol + * @param rm the message */ -static int +static void handle_dv_route_message (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) + const struct RouteMessage *rm) { - const struct RouteMessage *rm; + struct DirectNeighbor *neighbor = cls; const struct GNUNET_MessageHeader *payload; struct Route *route; - struct DirectNeighbor *neighbor; + struct DirectNeighbor *nneighbor; struct DirectNeighbor *dn; struct Target *target; uint32_t distance; @@ -1741,39 +1624,31 @@ handle_dv_route_message (void *cls, char prev[5]; char dst[5]; - if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - rm = (const struct RouteMessage *) message; distance = ntohl (rm->distance); payload = (const struct GNUNET_MessageHeader *) &rm[1]; - if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - strncpy (prev, GNUNET_i2s (peer), 4); + strncpy (prev, GNUNET_i2s (&neighbor->peer), 4); strncpy (me, GNUNET_i2s (&my_identity), 4); strncpy (src, GNUNET_i2s (&rm->sender), 4); strncpy (dst, GNUNET_i2s (&rm->target), 4); prev[4] = me[4] = src[4] = dst[4] = '\0'; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Handling DV message with %u bytes payload of type %u from %s to %s routed by %s to me (%s @ hop %u)\n", - ntohs (message->size) - sizeof (struct RouteMessage), + (unsigned int) (ntohs (rm->header.size) - sizeof (struct RouteMessage)), ntohs (payload->type), - src, dst, - prev, me, + src, + dst, + prev, + me, (unsigned int) distance + 1); if (0 == memcmp (&rm->target, &my_identity, sizeof (struct GNUNET_PeerIdentity))) { - if ((NULL - != (dn = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, - &rm->sender))) && (DIRECT_NEIGHBOR_COST == dn->distance)) + if ((NULL != + (dn = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, + &rm->sender))) && + (DIRECT_NEIGHBOR_COST == dn->distance)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Discarding DV message, as %s is a direct neighbor\n", @@ -1781,7 +1656,7 @@ handle_dv_route_message (void *cls, GNUNET_STATISTICS_update (stats, "# messages discarded (direct neighbor)", 1, GNUNET_NO); - return GNUNET_OK; + return; } /* message is for me, check reverse route! */ route = GNUNET_CONTAINER_multipeermap_get (all_routes, @@ -1790,13 +1665,6 @@ handle_dv_route_message (void *cls, (distance < DEFAULT_FISHEYE_DEPTH) ) { /* don't have reverse route yet, learn it! */ - neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, - peer); - if (NULL == neighbor) - { - GNUNET_break (0); - return GNUNET_OK; - } target = GNUNET_new (struct Target); target->peer = rm->sender; target->distance = htonl (distance); @@ -1814,7 +1682,7 @@ handle_dv_route_message (void *cls, { GNUNET_break_op (0); GNUNET_free (target); - return GNUNET_OK; + return; } add_new_route (target, neighbor); } @@ -1825,7 +1693,7 @@ handle_dv_route_message (void *cls, send_data_to_plugin (payload, &rm->sender, 1 + distance); - return GNUNET_OK; + return; } if ( (NULL == GNUNET_CONTAINER_multipeermap_get (direct_neighbors, &rm->sender)) && @@ -1836,13 +1704,6 @@ handle_dv_route_message (void *cls, "Learning sender %s at distance %u from forwarding!\n", GNUNET_i2s (&rm->sender), 1 + distance); - neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, - peer); - if (NULL == neighbor) - { - GNUNET_break (0); - return GNUNET_OK; - } target = GNUNET_new (struct Target); target->peer = rm->sender; target->distance = htonl (distance); @@ -1856,7 +1717,7 @@ handle_dv_route_message (void *cls, { GNUNET_break_op (0); GNUNET_free (target); - return GNUNET_OK; + return; } add_new_route (target, neighbor); } @@ -1865,9 +1726,9 @@ handle_dv_route_message (void *cls, &rm->target); if (NULL == route) { - neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, + nneighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, &rm->target); - if (NULL == neighbor) + if (NULL == nneighbor) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No route to %s, not routing %u bytes!\n", @@ -1876,57 +1737,69 @@ handle_dv_route_message (void *cls, GNUNET_STATISTICS_update (stats, "# messages discarded (no route)", 1, GNUNET_NO); - return GNUNET_OK; + return; } } else { - neighbor = route->next_hop; + nneighbor = route->next_hop; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Forwarding message to %s\n", - GNUNET_i2s (&neighbor->peer)); - forward_payload (neighbor, + GNUNET_i2s (&nneighbor->peer)); + forward_payload (nneighbor, distance + 1, - 0, &rm->sender, &rm->target, payload); - return GNUNET_OK; } /** - * Service server's handler for message send requests (which come - * bubbling up to us through the DV plugin). + * Check that @a msg is well-formed * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message + * @return #GNUNET_OK if @a msg is well-formed */ -static void -handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +static int +check_dv_send_message (void *cls, + const struct GNUNET_DV_SendMessage *msg) { - struct Route *route; - const struct GNUNET_DV_SendMessage *msg; const struct GNUNET_MessageHeader *payload; - if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof (struct GNUNET_MessageHeader)) + if (ntohs (msg->header.size) < sizeof (struct GNUNET_DV_SendMessage) + + sizeof (struct GNUNET_MessageHeader)) { GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; + return GNUNET_SYSERR; } - msg = (const struct GNUNET_DV_SendMessage *) message; - GNUNET_break (0 != ntohl (msg->uid)); payload = (const struct GNUNET_MessageHeader *) &msg[1]; - if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) + if (ntohs (msg->header.size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) { GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; + return GNUNET_SYSERR; } + return GNUNET_OK; +} + + +/** + * Service server's handler for message send requests (which come + * bubbling up to us through the DV plugin). + * + * @param cls identification of the client + * @param message the actual message + */ +static void +handle_dv_send_message (void *cls, + const struct GNUNET_DV_SendMessage *msg) +{ + struct GNUNET_SERVICE_Client *client = cls; + struct Route *route; + const struct GNUNET_MessageHeader *payload; + + payload = (const struct GNUNET_MessageHeader *) &msg[1]; route = GNUNET_CONTAINER_multipeermap_get (all_routes, &msg->target); if (NULL == route) @@ -1939,8 +1812,7 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_STATISTICS_update (stats, "# local messages discarded (no route)", 1, GNUNET_NO); - send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1951,11 +1823,10 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, forward_payload (route->next_hop, 0 /* first hop, distance is zero */, - htonl (msg->uid), &my_identity, &msg->target, payload); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (client); } @@ -1967,16 +1838,6 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, static void cleanup_neighbor (struct DirectNeighbor *neighbor) { - struct PendingMessage *pending; - - while (NULL != (pending = neighbor->pm_head)) - { - neighbor->pm_queue_size--; - GNUNET_CONTAINER_DLL_remove (neighbor->pm_head, - neighbor->pm_tail, - pending); - GNUNET_free (pending); - } handle_direct_disconnect (neighbor); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (direct_neighbors, @@ -1991,36 +1852,31 @@ cleanup_neighbor (struct DirectNeighbor *neighbor) * * @param cls closure * @param peer peer identity this notification is about + * @param internal_cls the corresponding `struct DirectNeighbor` */ static void -handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) +handle_core_disconnect (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *internal_cls) { - struct DirectNeighbor *neighbor; + struct DirectNeighbor *neighbor = internal_cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received core peer disconnect message for peer `%s'!\n", GNUNET_i2s (peer)); /* Check for disconnect from self message */ - if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) - return; - neighbor = - GNUNET_CONTAINER_multipeermap_get (direct_neighbors, peer); if (NULL == neighbor) - { - GNUNET_break (0); return; - } GNUNET_break (GNUNET_YES == neighbor->connected); neighbor->connected = GNUNET_NO; if (DIRECT_NEIGHBOR_COST == neighbor->distance) { - GNUNET_STATISTICS_update (stats, "# peers connected (1-hop)", - -1, GNUNET_NO); + -1, + GNUNET_NO); } cleanup_neighbor (neighbor); - if (GNUNET_YES == in_shutdown) return; schedule_refresh_routes (); @@ -2089,14 +1945,16 @@ shutdown_task (void *cls) GNUNET_ATS_performance_done (ats); ats = NULL; GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors, - &free_direct_neighbors, NULL); + &free_direct_neighbors, + NULL); GNUNET_CONTAINER_multipeermap_iterate (all_routes, - &free_route, NULL); + &free_route, + NULL); GNUNET_CONTAINER_multipeermap_destroy (direct_neighbors); GNUNET_CONTAINER_multipeermap_destroy (all_routes); GNUNET_STATISTICS_destroy (stats, GNUNET_NO); stats = NULL; - GNUNET_SERVER_notification_context_destroy (nc); + GNUNET_notification_context_destroy (nc); nc = NULL; for (i=0;itarget.distance); - cm.peer = route->target.peer; - - GNUNET_SERVER_notification_context_unicast (nc, - client, - &cm.header, - GNUNET_NO); + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DV_ConnectMessage *cm; + + env = GNUNET_MQ_msg (cm, + GNUNET_MESSAGE_TYPE_DV_CONNECT); + cm->distance = htonl (route->target.distance); + cm->peer = route->target.peer; + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); return GNUNET_OK; } @@ -2152,11 +2007,14 @@ notify_client_about_route (void *cls, * @param message the actual message */ static void -handle_start (void *cls, struct GNUNET_SERVER_Client *client, +handle_start (void *cls, const struct GNUNET_MessageHeader *message) { - GNUNET_SERVER_notification_context_add (nc, client); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + struct GNUNET_SERVICE_Client *client = cls; + + GNUNET_notification_context_add (nc, + GNUNET_SERVICE_client_get_mq (client)); + GNUNET_SERVICE_client_continue (client); GNUNET_CONTAINER_multipeermap_iterate (all_routes, ¬ify_client_about_route, client); @@ -2184,69 +2042,105 @@ core_init (void *cls, * Process dv requests. * * @param cls closure - * @param server the initialized server * @param c configuration to use + * @param service the initialized service */ static void -run (void *cls, struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *service) { - static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&handle_dv_route_message, GNUNET_MESSAGE_TYPE_DV_ROUTE, 0}, - {NULL, 0, 0} - }; - static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { - {&handle_start, NULL, - GNUNET_MESSAGE_TYPE_DV_START, - sizeof (struct GNUNET_MessageHeader) }, - { &handle_dv_send_message, NULL, - GNUNET_MESSAGE_TYPE_DV_SEND, - 0}, - {NULL, NULL, 0, 0} + struct GNUNET_MQ_MessageHandler core_handlers[] = { + GNUNET_MQ_hd_var_size (dv_route_message, + GNUNET_MESSAGE_TYPE_DV_ROUTE, + struct RouteMessage, + NULL), + GNUNET_MQ_handler_end () }; in_shutdown = GNUNET_NO; cfg = c; - direct_neighbors = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); - all_routes = GNUNET_CONTAINER_multipeermap_create (65536, GNUNET_NO); - core_api = GNUNET_CORE_connect (cfg, NULL, + direct_neighbors = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_NO); + all_routes = GNUNET_CONTAINER_multipeermap_create (65536, + GNUNET_NO); + core_api = GNUNET_CORE_connect (cfg, + NULL, &core_init, &handle_core_connect, &handle_core_disconnect, - NULL, GNUNET_NO, - NULL, GNUNET_NO, core_handlers); if (NULL == core_api) return; - ats = GNUNET_ATS_performance_init (cfg, &handle_ats_update, NULL); + ats = GNUNET_ATS_performance_init (cfg, + &handle_ats_update, + NULL); if (NULL == ats) { GNUNET_CORE_disconnect (core_api); core_api = NULL; return; } - nc = GNUNET_SERVER_notification_context_create (server, - MAX_QUEUE_SIZE_PLUGIN); - stats = GNUNET_STATISTICS_create ("dv", cfg); - GNUNET_SERVER_add_handlers (server, plugin_handlers); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, NULL); + nc = GNUNET_notification_context_create (MAX_QUEUE_SIZE_PLUGIN); + stats = GNUNET_STATISTICS_create ("dv", + cfg); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); } /** - * The main function for the dv service. + * Callback called when a client connects to the service. * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a c + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) +{ + return c; +} + + +/** + * Callback called when a client disconnected from the service + * + * @param cls closure for the service + * @param c the client that disconnected + * @param internal_cls should be equal to @a c */ -int -main (int argc, char *const *argv) +static void +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + void *internal_cls) { - return (GNUNET_OK == - GNUNET_SERVICE_run (argc, argv, "dv", GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; + GNUNET_assert (c == internal_cls); } + +/** + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN +("dv", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_fixed_size (start, + GNUNET_MESSAGE_TYPE_DV_START, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (dv_send_message, + GNUNET_MESSAGE_TYPE_DV_SEND, + struct GNUNET_DV_SendMessage, + NULL), + GNUNET_MQ_handler_end ()); + + /* end of gnunet-service-dv.c */