From: Christian Grothoff Date: Fri, 23 Sep 2016 13:14:25 +0000 (+0000) Subject: converting datastore to new MQ API X-Git-Tag: initial-import-from-subversion-38251~227 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=3e872864741aaa83be947b614eafb4fef2c74253;p=oweals%2Fgnunet.git converting datastore to new MQ API --- diff --git a/po/POTFILES.in b/po/POTFILES.in index c6d3571e7..6c39471bb 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -11,7 +11,6 @@ src/ats/gnunet-ats-solver-eval.c src/ats/gnunet-service-ats_addresses.c src/ats/gnunet-service-ats.c src/ats/gnunet-service-ats_connectivity.c -src/ats/gnunet-service-ats_feedback.c src/ats/gnunet-service-ats_normalization.c src/ats/gnunet-service-ats_performance.c src/ats/gnunet-service-ats_plugins.c @@ -70,7 +69,6 @@ src/core/core_api_monitor_peers.c src/core/core_api_mq.c src/core/gnunet-core.c src/core/gnunet-service-core.c -src/core/gnunet-service-core_clients.c src/core/gnunet-service-core_kx.c src/core/gnunet-service-core_sessions.c src/core/gnunet-service-core_typemap.c @@ -380,9 +378,7 @@ src/transport/gnunet-helper-transport-bluetooth.c src/transport/gnunet-helper-transport-wlan.c src/transport/gnunet-helper-transport-wlan-dummy.c src/transport/gnunet-service-transport_ats.c -src/transport/gnunet-service-transport_blacklist.c src/transport/gnunet-service-transport.c -src/transport/gnunet-service-transport_clients.c src/transport/gnunet-service-transport_hello.c src/transport/gnunet-service-transport_manipulation.c src/transport/gnunet-service-transport_neighbours.c @@ -421,7 +417,6 @@ src/tun/tun.c src/util/bandwidth.c src/util/bio.c src/util/client.c -src/util/client_manager.c src/util/common_allocation.c src/util/common_endian.c src/util/common_logging.c @@ -462,6 +457,7 @@ src/util/helper.c src/util/load.c src/util/mq.c src/util/mst.c +src/util/nc.c src/util/network.c src/util/op.c src/util/os_installation.c diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 5853d447d..e632e33e0 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2004-2014 GNUnet e.V. + Copyright (C) 2004-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -115,7 +115,7 @@ struct ReservationList /** * Client that made the reservation. */ - struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVICE_Client *client; /** * Number of bytes (still) reserved. @@ -246,50 +246,6 @@ sync_stats () } -/** - * Context for transmitting replies to clients. - */ -struct TransmitCallbackContext -{ - - /** - * We keep these in a doubly-linked list (for cleanup). - */ - struct TransmitCallbackContext *next; - - /** - * We keep these in a doubly-linked list (for cleanup). - */ - struct TransmitCallbackContext *prev; - - /** - * The message that we're asked to transmit. - */ - struct GNUNET_MessageHeader *msg; - - /** - * Handle for the transmission request. - */ - struct GNUNET_SERVER_TransmitHandle *th; - - /** - * Client that we are transmitting to. - */ - struct GNUNET_SERVER_Client *client; - -}; - - -/** - * Head of the doubly-linked list (for cleanup). - */ -static struct TransmitCallbackContext *tcc_head; - -/** - * Tail of the doubly-linked list (for cleanup). - */ -static struct TransmitCallbackContext *tcc_tail; - /** * Have we already cleaned up the TCCs and are hence no longer * willing (or able) to transmit anything to anyone? @@ -304,7 +260,7 @@ static struct GNUNET_STATISTICS_GetHandle *stat_get; /** * Handle to our server. */ -static struct GNUNET_SERVER_Handle *server; +static struct GNUNET_SERVICE_Handle *service; /** * Task that is used to remove expired entries from @@ -495,86 +451,6 @@ manage_space (unsigned long long need) } -/** - * Function called to notify a client about the socket - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_callback (void *cls, size_t size, void *buf) -{ - struct TransmitCallbackContext *tcc = cls; - size_t msize; - - tcc->th = NULL; - GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc); - msize = ntohs (tcc->msg->size); - if (size == 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Transmission to client failed!\n")); - GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR); - GNUNET_SERVER_client_drop (tcc->client); - GNUNET_free (tcc->msg); - GNUNET_free (tcc); - return 0; - } - GNUNET_assert (size >= msize); - GNUNET_memcpy (buf, tcc->msg, msize); - GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK); - GNUNET_SERVER_client_drop (tcc->client); - GNUNET_free (tcc->msg); - GNUNET_free (tcc); - return msize; -} - - -/** - * Transmit the given message to the client. - * - * @param client target of the message - * @param msg message to transmit, will be freed! - */ -static void -transmit (struct GNUNET_SERVER_Client *client, - struct GNUNET_MessageHeader *msg) -{ - struct TransmitCallbackContext *tcc; - - if (GNUNET_YES == cleaning_done) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Shutdown in progress, aborting transmission.\n")); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - GNUNET_free (msg); - return; - } - tcc = GNUNET_new (struct TransmitCallbackContext); - tcc->msg = msg; - tcc->client = client; - if (NULL == - (tcc->th = - GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_callback, tcc))) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - GNUNET_free (msg); - GNUNET_free (tcc); - return; - } - GNUNET_SERVER_client_keep (client); - GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc); -} - - /** * Transmit a status code to the client. * @@ -583,8 +459,11 @@ transmit (struct GNUNET_SERVER_Client *client, * @param msg optional error message (can be NULL) */ static void -transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg) +transmit_status (struct GNUNET_SERVICE_Client *client, + int code, + const char *msg) { + struct GNUNET_MQ_Envelope *env; struct StatusMessage *sm; size_t slen; @@ -592,14 +471,16 @@ transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg) "Transmitting `%s' message with value %d and message `%s'\n", "STATUS", code, msg != NULL ? msg : "(none)"); slen = (msg == NULL) ? 0 : strlen (msg) + 1; - sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen); - sm->header.size = htons (sizeof (struct StatusMessage) + slen); - sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); + env = GNUNET_MQ_msg_extra (sm, + slen, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); sm->status = htonl (code); sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration); - if (slen > 0) - GNUNET_memcpy (&sm[1], msg, slen); - transmit (client, &sm->header); + GNUNET_memcpy (&sm[1], + msg, + slen); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); } @@ -607,7 +488,7 @@ transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg) * Function that will transmit the given datastore entry * to the client. * - * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client). + * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`). * @param key key for the content * @param size number of bytes in data * @param data content stored @@ -631,27 +512,27 @@ transmit_item (void *cls, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { - struct GNUNET_SERVER_Client *client = cls; + struct GNUNET_SERVICE_Client *client = cls; + struct GNUNET_MQ_Envelope *env; struct GNUNET_MessageHeader *end; struct DataMessage *dm; - if (key == NULL) + if (NULL == key) { /* transmit 'DATA_END' */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n", - "DATA_END"); - end = GNUNET_new (struct GNUNET_MessageHeader); - end->size = htons (sizeof (struct GNUNET_MessageHeader)); - end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END); - transmit (client, end); - GNUNET_SERVER_client_drop (client); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting DATA_END message\n"); + env = GNUNET_MQ_msg (end, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); return GNUNET_OK; } GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE); - dm = GNUNET_malloc (sizeof (struct DataMessage) + size); - dm->header.size = htons (sizeof (struct DataMessage) + size); - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA); dm->rid = htonl (0); dm->size = htonl (size); dm->type = htonl (type); @@ -662,10 +543,13 @@ transmit_item (void *cls, dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->uid = GNUNET_htonll (uid); dm->key = *key; - GNUNET_memcpy (&dm[1], data, size); + GNUNET_memcpy (&dm[1], + data, + size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' message for `%s' of type %u with expiration %s (in: %s)\n", - "DATA", GNUNET_h2s (key), type, + "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n", + GNUNET_h2s (key), + type, GNUNET_STRINGS_absolute_time_to_string (expiration), GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), GNUNET_YES)); @@ -673,8 +557,8 @@ transmit_item (void *cls, gettext_noop ("# results found"), 1, GNUNET_NO); - transmit (client, &dm->header); - GNUNET_SERVER_client_drop (client); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); return GNUNET_OK; } @@ -682,20 +566,18 @@ transmit_item (void *cls, /** * Handle RESERVE-message. * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message */ static void -handle_reserve (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_reserve (void *cls, + const struct ReserveMessage *msg) { /** * Static counter to produce reservation identifiers. */ static int reservation_gen; - - const struct ReserveMessage *msg = (const struct ReserveMessage *) message; + struct GNUNET_SERVICE_Client *client = cls; struct ReservationList *e; unsigned long long used; unsigned long long req; @@ -707,16 +589,15 @@ handle_reserve (void *cls, struct GNUNET_SERVER_Client *client, amount = GNUNET_ntohll (msg->amount); entries = ntohl (msg->entries); used = payload + reserved; - req = - amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries; + req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries; if (used + req > quota) { if (quota < used) used = quota; /* cheat a bit for error message (to avoid negative numbers) */ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"), - quota - used, "RESERVE", req); + _("Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"), + quota - used, + req); if (cache_size < req) { /* TODO: document this in the FAQ; essentially, if this @@ -725,19 +606,22 @@ handle_reserve (void *cls, struct GNUNET_SERVER_Client *client, * larger than 1/8th of the overall available space, and * we only reserve 1/8th for "fresh" insertions */ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"), - req, cache_size); - transmit_status (client, 0, + _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"), + req, + cache_size); + transmit_status (client, + 0, gettext_noop ("Insufficient space to satisfy request and " "requested amount is larger than cache size")); } else { - transmit_status (client, 0, + transmit_status (client, + 0, gettext_noop ("Insufficient space to satisfy request")); } + GNUNET_SERVICE_client_continue (client); return; } reserved += req; @@ -754,24 +638,24 @@ handle_reserve (void *cls, struct GNUNET_SERVER_Client *client, e->rid = ++reservation_gen; if (reservation_gen < 0) reservation_gen = 0; /* wrap around */ - transmit_status (client, e->rid, NULL); + transmit_status (client, + e->rid, + NULL); + GNUNET_SERVICE_client_continue (client); } /** * Handle RELEASE_RESERVE-message. * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message */ static void handle_release_reserve (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct ReleaseReserveMessage *msg) { - const struct ReleaseReserveMessage *msg = - (const struct ReleaseReserveMessage *) message; + struct GNUNET_SERVICE_Client *client = cls; struct ReservationList *pos; struct ReservationList *prev; struct ReservationList *next; @@ -804,43 +688,42 @@ handle_release_reserve (void *cls, "Returning %llu remaining reserved bytes to storage pool\n", rem); GNUNET_free (pos); - transmit_status (client, GNUNET_OK, NULL); + transmit_status (client, + GNUNET_OK, + NULL); + GNUNET_SERVICE_client_continue (client); return; } prev = pos; } GNUNET_break (0); - transmit_status (client, GNUNET_SYSERR, + transmit_status (client, + GNUNET_SYSERR, gettext_noop ("Could not find matching reservation")); + GNUNET_SERVICE_client_continue (client); } /** * Check that the given message is a valid data message. * - * @return NULL if the message is not well-formed, otherwise the message + * @param dm message to check + * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK */ -static const struct DataMessage * -check_data (const struct GNUNET_MessageHeader *message) +static int +check_data (const struct DataMessage *dm) { uint16_t size; uint32_t dsize; - const struct DataMessage *dm; - size = ntohs (message->size); - if (size < sizeof (struct DataMessage)) - { - GNUNET_break (0); - return NULL; - } - dm = (const struct DataMessage *) message; + size = ntohs (dm->header.size); dsize = ntohl (dm->size); if (size != dsize + sizeof (struct DataMessage)) { GNUNET_break (0); - return NULL; + return GNUNET_SYSERR; } - return dm; + return GNUNET_OK; } @@ -853,7 +736,7 @@ struct PutContext /** * Client to notify on completion. */ - struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVICE_Client *client; #if ! HAVE_UNALIGNED_64_ACCESS void *reserved; @@ -887,13 +770,16 @@ put_continuation (void *cls, gettext_noop ("# bytes stored"), size, GNUNET_YES); - GNUNET_CONTAINER_bloomfilter_add (filter, key); + GNUNET_CONTAINER_bloomfilter_add (filter, + key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Successfully stored %u bytes under key `%s'\n", - size, GNUNET_h2s (key)); + size, + GNUNET_h2s (key)); } - transmit_status (pc->client, status, msg); - GNUNET_SERVER_client_drop (pc->client); + transmit_status (pc->client, + status, + msg); GNUNET_free (pc); if (quota - reserved - cache_size < payload) { @@ -918,11 +804,17 @@ execute_put (struct PutContext *pc) const struct DataMessage *dm; dm = (const struct DataMessage *) &pc[1]; - plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1], - ntohl (dm->type), ntohl (dm->priority), - ntohl (dm->anonymity), ntohl (dm->replication), + plugin->api->put (plugin->api->cls, + &dm->key, + ntohl (dm->size), + &dm[1], + ntohl (dm->type), + ntohl (dm->priority), + ntohl (dm->anonymity), + ntohl (dm->replication), GNUNET_TIME_absolute_ntoh (dm->expiration), - &put_continuation, pc); + &put_continuation, + pc); } @@ -937,10 +829,11 @@ check_present_continuation (void *cls, int status, const char *msg) { - struct GNUNET_SERVER_Client *client = cls; + struct GNUNET_SERVICE_Client *client = cls; - transmit_status (client, GNUNET_NO, NULL); - GNUNET_SERVER_client_drop (client); + transmit_status (client, + GNUNET_NO, + NULL); } @@ -984,7 +877,9 @@ check_present (void *cls, if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) || (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ( (size == ntohl (dm->size)) && - (0 == memcmp (&dm[1], data, size)) ) ) + (0 == memcmp (&dm[1], + data, + size)) ) ) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Result already present in datastore\n"); @@ -1000,8 +895,9 @@ check_present (void *cls, pc->client); else { - transmit_status (pc->client, GNUNET_NO, NULL); - GNUNET_SERVER_client_drop (pc->client); + transmit_status (pc->client, + GNUNET_NO, + NULL); } GNUNET_free (pc); } @@ -1013,31 +909,43 @@ check_present (void *cls, } +/** + * Verify PUT-message. + * + * @param cls identification of the client + * @param message the actual message + * @return #GNUNET_OK if @a dm is well-formed + */ +static int +check_put (void *cls, + const struct DataMessage *dm) +{ + if (GNUNET_OK != check_data (dm)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Handle PUT-message. * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message */ static void handle_put (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct DataMessage *dm) { - const struct DataMessage *dm = check_data (message); + struct GNUNET_SERVICE_Client *client = cls; int rid; struct ReservationList *pos; struct PutContext *pc; struct GNUNET_HashCode vhash; uint32_t size; - if ((dm == NULL) || (ntohl (dm->type) == 0)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing PUT request for `%s' of type %u\n", GNUNET_h2s (&dm->key), @@ -1066,11 +974,15 @@ handle_put (void *cls, pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage)); pc->client = client; - GNUNET_SERVER_client_keep (client); - GNUNET_memcpy (&pc[1], dm, size + sizeof (struct DataMessage)); - if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key)) + GNUNET_memcpy (&pc[1], + dm, + size + sizeof (struct DataMessage)); + if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, + &dm->key)) { - GNUNET_CRYPTO_hash (&dm[1], size, &vhash); + GNUNET_CRYPTO_hash (&dm[1], + size, + &vhash); plugin->api->get_key (plugin->api->cls, 0, &dm->key, @@ -1078,27 +990,26 @@ handle_put (void *cls, ntohl (dm->type), &check_present, pc); + GNUNET_SERVICE_client_continue (client); return; } execute_put (pc); + GNUNET_SERVICE_client_continue (client); } /** * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message. * - * @param cls closure - * @param client identification of the client - * @param message the actual message + * @param cls identification of the client + * @param msg the actual message */ static void handle_get (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GetMessage *msg) { - const struct GetMessage *msg; + struct GNUNET_SERVICE_Client *client = cls; - msg = (const struct GetMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing GET request of type %u\n", ntohl (msg->type)); @@ -1106,7 +1017,6 @@ handle_get (void *cls, gettext_noop ("# GET requests received"), 1, GNUNET_NO); - GNUNET_SERVER_client_keep (client); plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset), NULL, @@ -1114,23 +1024,22 @@ handle_get (void *cls, ntohl (msg->type), &transmit_item, client); + GNUNET_SERVICE_client_continue (client); } + /** * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message. * * @param cls closure - * @param client identification of the client - * @param message the actual message + * @param msg the actual message */ static void handle_get_key (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GetKeyMessage *msg) { - const struct GetKeyMessage *msg; + struct GNUNET_SERVICE_Client *client = cls; - msg = (const struct GetKeyMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing GET request for `%s' of type %u\n", GNUNET_h2s (&msg->key), @@ -1139,7 +1048,6 @@ handle_get_key (void *cls, gettext_noop ("# GET KEY requests received"), 1, GNUNET_NO); - GNUNET_SERVER_client_keep (client); if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) @@ -1157,6 +1065,7 @@ handle_get_key (void *cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_SERVICE_client_continue (client); return; } plugin->api->get_key (plugin->api->cls, @@ -1166,6 +1075,7 @@ handle_get_key (void *cls, ntohl (msg->type), &transmit_item, client); + GNUNET_SERVICE_client_continue (client); } @@ -1181,104 +1091,100 @@ update_continuation (void *cls, int status, const char *msg) { - struct GNUNET_SERVER_Client *client = cls; + struct GNUNET_SERVICE_Client *client = cls; - transmit_status (client, status, msg); - GNUNET_SERVER_client_drop (client); + transmit_status (client, + status, + msg); } /** * Handle UPDATE-message. * - * @param cls closure - * @param client identification of the client + * @param cls client identification of the client * @param message the actual message */ static void handle_update (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct UpdateMessage *msg) { - const struct UpdateMessage *msg; + struct GNUNET_SERVICE_Client *client = cls; GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"), 1, GNUNET_NO); - msg = (const struct UpdateMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing UPDATE request for %llu\n", (unsigned long long) GNUNET_ntohll (msg->uid)); - GNUNET_SERVER_client_keep (client); plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid), (int32_t) ntohl (msg->priority), GNUNET_TIME_absolute_ntoh (msg->expiration), - &update_continuation, client); + &update_continuation, + client); + GNUNET_SERVICE_client_continue (client); } /** * Handle GET_REPLICATION-message. * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message */ static void handle_get_replication (void *cls, - struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + struct GNUNET_SERVICE_Client *client = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing `%s' request\n", - "GET_REPLICATION"); + "Processing GET_REPLICATION request\n"); GNUNET_STATISTICS_update (stats, gettext_noop ("# GET REPLICATION requests received"), 1, GNUNET_NO); - GNUNET_SERVER_client_keep (client); plugin->api->get_replication (plugin->api->cls, - &transmit_item, client); + &transmit_item, + client); + GNUNET_SERVICE_client_continue (client); } /** * Handle GET_ZERO_ANONYMITY-message. * - * @param cls closure - * @param client identification of the client + * @param cls client identification of the client * @param message the actual message */ static void handle_get_zero_anonymity (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GetZeroAnonymityMessage *msg) { - const struct GetZeroAnonymityMessage *msg = - (const struct GetZeroAnonymityMessage *) message; + struct GNUNET_SERVICE_Client *client = cls; enum GNUNET_BLOCK_Type type; type = (enum GNUNET_BLOCK_Type) ntohl (msg->type); if (type == GNUNET_BLOCK_TYPE_ANY) { GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + GNUNET_SERVICE_client_drop (client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing `%s' request\n", - "GET_ZERO_ANONYMITY"); + "Processing GET_ZERO_ANONYMITY request\n"); GNUNET_STATISTICS_update (stats, gettext_noop ("# GET ZERO ANONYMITY requests received"), 1, GNUNET_NO); - GNUNET_SERVER_client_keep (client); plugin->api->get_zero_anonymity (plugin->api->cls, GNUNET_ntohll (msg->offset), type, - &transmit_item, client); + &transmit_item, + client); + GNUNET_SERVICE_client_continue (client); } @@ -1309,7 +1215,7 @@ remove_callback (void *cls, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { - struct GNUNET_SERVER_Client *client = cls; + struct GNUNET_SERVICE_Client *client = cls; if (NULL == key) { @@ -1318,7 +1224,6 @@ remove_callback (void *cls, transmit_status (client, GNUNET_NO, _("Content not found")); - GNUNET_SERVER_client_drop (client); return GNUNET_OK; /* last item */ } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1330,13 +1235,35 @@ remove_callback (void *cls, gettext_noop ("# bytes removed (explicit request)"), size, GNUNET_YES); - GNUNET_CONTAINER_bloomfilter_remove (filter, key); - transmit_status (client, GNUNET_OK, NULL); - GNUNET_SERVER_client_drop (client); + GNUNET_CONTAINER_bloomfilter_remove (filter, + key); + transmit_status (client, + GNUNET_OK, + NULL); return GNUNET_NO; } +/** + * Verify REMOVE-message. + * + * @param cls identification of the client + * @param message the actual message + * @return #GNUNET_OK if @a dm is well-formed + */ +static int +check_remove (void *cls, + const struct DataMessage *dm) +{ + if (GNUNET_OK != check_data (dm)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Handle REMOVE-message. * @@ -1346,22 +1273,14 @@ remove_callback (void *cls, */ static void handle_remove (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct DataMessage *dm) { - const struct DataMessage *dm = check_data (message); + struct GNUNET_SERVICE_Client *client = cls; struct GNUNET_HashCode vhash; - if (NULL == dm) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } GNUNET_STATISTICS_update (stats, gettext_noop ("# REMOVE requests received"), 1, GNUNET_NO); - GNUNET_SERVER_client_keep (client); GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash); @@ -1374,26 +1293,28 @@ handle_remove (void *cls, &dm->key, &vhash, (enum GNUNET_BLOCK_Type) ntohl (dm->type), - &remove_callback, client); + &remove_callback, + client); + GNUNET_SERVICE_client_continue (client); } /** * Handle DROP-message. * - * @param cls closure - * @param client identification of the client + * @param cls identification of the client * @param message the actual message */ static void handle_drop (void *cls, - struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + struct GNUNET_SERVICE_Client *client = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing DROP request\n"); do_drop = GNUNET_YES; - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (client); } @@ -1479,7 +1400,8 @@ load_plugin () plugin_name); ret->short_name = GNUNET_strdup (plugin_name); ret->lib_name = libname; - ret->api = GNUNET_PLUGIN_load (libname, &ret->env); + ret->api = GNUNET_PLUGIN_load (libname, + &ret->env); if (NULL == ret->api) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1512,30 +1434,18 @@ unload_plugin (struct DatastorePlugin *plug) } -static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, - sizeof (struct ReserveMessage)}, - {&handle_release_reserve, NULL, - GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, - sizeof (struct ReleaseReserveMessage)}, - {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, - {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, - sizeof (struct UpdateMessage)}, - {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, - sizeof (struct GetMessage) }, - {&handle_get_key, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY, - sizeof (struct GetKeyMessage) }, - {&handle_get_replication, NULL, - GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, - sizeof (struct GNUNET_MessageHeader)}, - {&handle_get_zero_anonymity, NULL, - GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, - sizeof (struct GetZeroAnonymityMessage)}, - {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0}, - {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, - sizeof (struct GNUNET_MessageHeader)}, - {NULL, NULL, 0, 0} -}; +/** + * Initialization complete, start operating the service. + */ +static void +begin_service () +{ + GNUNET_SERVICE_resume (service); + expired_kill_task + = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, + &delete_expired, + NULL); +} /** @@ -1556,17 +1466,13 @@ add_key_to_bloomfilter (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Bloomfilter construction complete.\n")); - GNUNET_SERVER_add_handlers (server, handlers); - GNUNET_SERVER_resume (server); - expired_kill_task - = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, - &delete_expired, - NULL); + begin_service (); return; } while (0 < count--) - GNUNET_CONTAINER_bloomfilter_add (bf, key); + GNUNET_CONTAINER_bloomfilter_add (bf, + key); } @@ -1594,11 +1500,13 @@ process_stat_done (void *cls, filter = NULL; if (NULL != stats) { - GNUNET_STATISTICS_destroy (stats, GNUNET_YES); + GNUNET_STATISTICS_destroy (stats, + GNUNET_YES); stats = NULL; } return; } + if (GNUNET_NO == stats_worked) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1608,8 +1516,8 @@ process_stat_done (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("New payload: %lld\n"), (long long) payload); - } + if (GNUNET_YES == refresh_bf) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1622,19 +1530,12 @@ process_stat_done (void *cls, return; } else + { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Plugin does not support get_keys function. Please fix!\n")); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Bloomfilter construction complete.\n")); + } } - - GNUNET_SERVER_add_handlers (server, handlers); - GNUNET_SERVER_resume (server); - expired_kill_task - = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, - &delete_expired, - NULL); + begin_service (); } @@ -1648,7 +1549,8 @@ stat_timeout (void *cls) { stat_timeout_task = NULL; GNUNET_STATISTICS_get_cancel (stat_get); - process_stat_done (NULL, GNUNET_NO); + process_stat_done (NULL, + GNUNET_NO); } @@ -1658,20 +1560,7 @@ stat_timeout (void *cls) static void cleaning_task (void *cls) { - struct TransmitCallbackContext *tcc; - cleaning_done = GNUNET_YES; - while (NULL != (tcc = tcc_head)) - { - GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc); - if (tcc->th != NULL) - { - GNUNET_SERVER_notify_transmit_ready_cancel (tcc->th); - GNUNET_SERVER_client_drop (tcc->client); - } - GNUNET_free (tcc->msg); - GNUNET_free (tcc); - } if (NULL != expired_kill_task) { GNUNET_SCHEDULER_cancel (expired_kill_task); @@ -1721,23 +1610,40 @@ cleaning_task (void *cls) /** - * Function that removes all active reservations made - * by the given client and releases the space for other - * requests. + * Add a client to our list of active clients. + * + * @param cls NULL + * @param client client to add + * @param mq message queue for @a client + * @return @a client + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) +{ + return client; +} + + +/** + * Called whenever a client is disconnected. + * Frees our resources associated with that client. * * @param cls closure * @param client identification of the client + * @param app_ctx must match @a client */ static void -cleanup_reservations (void *cls, - struct GNUNET_SERVER_Client *client) +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_ctx) { struct ReservationList *pos; struct ReservationList *prev; struct ReservationList *next; - if (NULL == client) - return; + GNUNET_assert (app_ctx == client); prev = NULL; pos = reservations; while (NULL != pos) @@ -1762,6 +1668,7 @@ cleanup_reservations (void *cls, gettext_noop ("# reserved"), reserved, GNUNET_NO); + } @@ -1769,19 +1676,19 @@ cleanup_reservations (void *cls, * Process datastore requests. * * @param cls closure - * @param serv the initialized server + * @param serv the initialized service * @param c configuration to use */ static void run (void *cls, - struct GNUNET_SERVER_Handle *serv, - const struct GNUNET_CONFIGURATION_Handle *c) + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *serv) { char *fn; char *pfn; unsigned int bf_size; - server = serv; + service = serv; cfg = c; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, @@ -1789,25 +1696,27 @@ run (void *cls, "DATABASE", &plugin_name)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("No `%s' specified for `%s' in configuration!\n"), - "DATABASE", - "DATASTORE"); + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "DATABASE", + "DATASTORE"); return; } GNUNET_asprintf ("a_stat_name, _("# bytes used in file-sharing datastore `%s'"), plugin_name); if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", "a)) + GNUNET_CONFIGURATION_get_value_size (cfg, + "DATASTORE", + "QUOTA", + "a)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("No `%s' specified for `%s' in configuration!\n"), - "QUOTA", - "DATASTORE"); + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "QUOTA", + "DATASTORE"); return; } - stats = GNUNET_STATISTICS_create ("datastore", cfg); + stats = GNUNET_STATISTICS_create ("datastore", + cfg); GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, @@ -1887,7 +1796,9 @@ run (void *cls, } else { - filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5); /* approx. 3% false positives at max use */ + filter = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + 5); /* approx. 3% false positives at max use */ refresh_bf = GNUNET_YES; } GNUNET_free_non_null (fn); @@ -1897,12 +1808,13 @@ run (void *cls, _("Failed to initialize bloomfilter.\n")); if (NULL != stats) { - GNUNET_STATISTICS_destroy (stats, GNUNET_YES); + GNUNET_STATISTICS_destroy (stats, + GNUNET_YES); stats = NULL; } return; } - GNUNET_SERVER_suspend (server); + GNUNET_SERVICE_suspend (service); stat_get = GNUNET_STATISTICS_get (stats, "datastore", @@ -1911,39 +1823,69 @@ run (void *cls, &process_stat_in, NULL); if (NULL == stat_get) - process_stat_done (NULL, GNUNET_SYSERR); + process_stat_done (NULL, + GNUNET_SYSERR); else - stat_timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &stat_timeout, - NULL); - GNUNET_SERVER_disconnect_notify (server, - &cleanup_reservations, - NULL); + stat_timeout_task + = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &stat_timeout, + NULL); GNUNET_SCHEDULER_add_shutdown (&cleaning_task, NULL); } /** - * The main function for the datastore service. - * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error - */ -int -main (int argc, - char *const *argv) -{ - int ret; - - ret = - (GNUNET_OK == - GNUNET_SERVICE_run (argc, argv, "datastore", - GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; - return ret; -} + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN +("datastore", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_fixed_size (reserve, + GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, + struct ReserveMessage, + NULL), + GNUNET_MQ_hd_fixed_size (release_reserve, + GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, + struct ReleaseReserveMessage, + NULL), + GNUNET_MQ_hd_var_size (put, + GNUNET_MESSAGE_TYPE_DATASTORE_PUT, + struct DataMessage, + NULL), + GNUNET_MQ_hd_fixed_size (update, + GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, + struct UpdateMessage, + NULL), + GNUNET_MQ_hd_fixed_size (get, + GNUNET_MESSAGE_TYPE_DATASTORE_GET, + struct GetMessage, + NULL), + GNUNET_MQ_hd_fixed_size (get_key, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY, + struct GetKeyMessage, + NULL), + GNUNET_MQ_hd_fixed_size (get_replication, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (get_zero_anonymity, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, + struct GetZeroAnonymityMessage, + NULL), + GNUNET_MQ_hd_var_size (remove, + GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, + struct DataMessage, + NULL), + GNUNET_MQ_hd_fixed_size (drop, + GNUNET_MESSAGE_TYPE_DATASTORE_DROP, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end ()); /* end of gnunet-service-datastore.c */