From: Christian Grothoff Date: Sun, 31 Jul 2016 21:23:23 +0000 (+0000) Subject: converting FS to new MQ-based core API X-Git-Tag: initial-import-from-subversion-38251~440 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=a78990b412db2c0ead2da8061c4f454f068991d1;p=oweals%2Fgnunet.git converting FS to new MQ-based core API --- diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 348bab092..bc0da09bc 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2014 GNUnet e.V. + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -177,7 +177,7 @@ static struct GNUNET_LOAD_Value *datastore_get_load; /** * Identity of this peer. */ -static struct GNUNET_PeerIdentity my_id; +struct GNUNET_PeerIdentity GSF_my_id; /** @@ -277,33 +277,26 @@ update_latencies (void *cls, /** - * Handle P2P "PUT" message. + * Check P2P "PUT" message. * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) + * @param cls closure with the `struct GSF_ConnectedPeer` * @param message the actual message * @return #GNUNET_OK to keep the connection open, * #GNUNET_SYSERR to close it (signal serious error) */ static int -handle_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message) +check_p2p_put (void *cls, + const struct PutMessage *put) { - struct GSF_ConnectedPeer *cp; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received P2P PUT from %s\n", - GNUNET_i2s (other)); - cp = GSF_peer_get_ (other); - if (NULL == cp) + enum GNUNET_BLOCK_Type type; + + type = ntohl (put->type); + if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) { - GNUNET_break (0); - return GNUNET_OK; + GNUNET_break_op (0); + return GNUNET_SYSERR; } - GSF_cover_content_count++; - return GSF_handle_p2p_content_ (cp, message); + return GNUNET_OK; } @@ -324,7 +317,8 @@ consider_request_for_forwarding (void *cls, { struct GSF_PendingRequest *pr = cls; - if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer)) + if (GNUNET_YES != + GSF_pending_request_test_target_ (pr, peer)) { #if INSANE_STATISTICS GNUNET_STATISTICS_update (GSF_stats, @@ -333,7 +327,8 @@ consider_request_for_forwarding (void *cls, #endif return; } - GSF_plan_add_ (cp, pr); + GSF_plan_add_ (cp, + pr); } @@ -347,10 +342,10 @@ consider_request_for_forwarding (void *cls, * @param pr the pending request we were processing * @param result final datastore lookup result */ -static void -consider_forwarding (void *cls, - struct GSF_PendingRequest *pr, - enum GNUNET_BLOCK_EvaluationResult result) +void +GSF_consider_forwarding (void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) { if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) return; /* we're done... */ @@ -363,31 +358,44 @@ consider_forwarding (void *cls, /** - * Handle P2P "GET" request. + * Check P2P "GET" request. * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message + * @param cls closure + * @param gm the actual message * @return #GNUNET_OK to keep the connection open, * #GNUNET_SYSERR to close it (signal serious error) */ static int -handle_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message) +check_p2p_get (void *cls, + const struct GetMessage *gm) { - struct GSF_PendingRequest *pr; - - pr = GSF_handle_p2p_query_ (other, - message); - if (NULL == pr) - return GNUNET_OK; /* exists, identical to existing request, or malformed */ - GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; - GSF_local_lookup_ (pr, - &consider_forwarding, - NULL); - return GNUNET_OK; + size_t msize; + unsigned int bm; + unsigned int bits; + size_t bfsize; + + msize = ntohs (gm->header.size); + bm = ntohl (gm->hash_bitmap); + bits = 0; + while (bm > 0) + { + if (1 == (bm & 1)) + bits++; + bm >>= 1; + } + if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); + /* bfsize must be power of 2, check! */ + if (0 != ((bfsize - 1) & bfsize)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } @@ -416,7 +424,8 @@ start_p2p_processing (void *cls, prd = GSF_pending_request_get_data_ (pr); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Finished database lookup for local request `%s' with result %d\n", - GNUNET_h2s (&prd->query), result); + GNUNET_h2s (&prd->query), + result); if (0 == prd->anonymity_level) { switch (prd->type) @@ -439,7 +448,7 @@ start_p2p_processing (void *cls, break; } } - consider_forwarding (NULL, pr, result); + GSF_consider_forwarding (NULL, pr, result); } @@ -538,7 +547,7 @@ shutdown_task (void *cls) GSF_cadet_stop_server (); if (NULL != GSF_core) { - GNUNET_CORE_disconnect (GSF_core); + GNUNET_CORE_disconnecT (GSF_core); GSF_core = NULL; } if (NULL != GSF_ats) @@ -575,80 +584,7 @@ shutdown_task (void *cls) /** - * Function called for each pending request whenever a new - * peer connects, giving us a chance to decide about submitting - * the existing request to the new peer. - * - * @param cls the `struct GSF_ConnectedPeer` of the new peer - * @param key query for the request - * @param pr handle to the pending request - * @return #GNUNET_YES to continue to iterate - */ -static int -consider_peer_for_forwarding (void *cls, - const struct GNUNET_HashCode *key, - struct GSF_PendingRequest *pr) -{ - struct GSF_ConnectedPeer *cp = cls; - struct GNUNET_PeerIdentity pid; - - if (GNUNET_YES != - GSF_pending_request_test_active_ (pr)) - return GNUNET_YES; /* request is not actually active, skip! */ - GSF_connected_peer_get_identity_ (cp, &pid); - if (GNUNET_YES != - GSF_pending_request_test_target_ (pr, &pid)) - { - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# Loopback routes suppressed"), - 1, - GNUNET_NO); - return GNUNET_YES; - } - GSF_plan_add_ (cp, pr); - return GNUNET_YES; -} - - -/** - * Function called after the creation of a connected peer record is complete. - * - * @param cls closure (unused) - * @param cp handle to the newly created connected peer record - */ -static void -connected_peer_cb (void *cls, - struct GSF_ConnectedPeer *cp) -{ - if (NULL == cp) - return; - GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, - cp); -} - - -/** - * Method called whenever a given peer connects. - * - * @param cls closure, not used - * @param peer peer identity this notification is about - */ -static void -peer_connect_handler (void *cls, - const struct GNUNET_PeerIdentity *peer) -{ - if (0 == - GNUNET_CRYPTO_cmp_peer_identity (&my_id, - peer)) - return; - GSF_peer_connect_handler_ (peer, - &connected_peer_cb, - NULL); -} - - -/** - * Function called after GNUNET_CORE_connect has succeeded + * Function called after GNUNET_CORE_connecT has succeeded * (or failed for good). Note that the private key of the * peer is intentionally not exposed here; if you need it, * your process should try to read the private key file @@ -661,7 +597,7 @@ static void peer_init_handler (void *cls, const struct GNUNET_PeerIdentity *my_identity) { - if (0 != GNUNET_CRYPTO_cmp_peer_identity (&my_id, + if (0 != GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id, my_identity)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -681,18 +617,23 @@ static int main_init (struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { - static const struct GNUNET_CORE_MessageHandler no_p2p_handlers[] = { - { NULL, 0, 0 } + GNUNET_MQ_hd_var_size (p2p_get, + GNUNET_MESSAGE_TYPE_FS_GET, + struct GetMessage); + GNUNET_MQ_hd_var_size (p2p_put, + GNUNET_MESSAGE_TYPE_FS_PUT, + struct PutMessage); + GNUNET_MQ_hd_fixed_size (p2p_migration_stop, + GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, + struct MigrationStopMessage); + struct GNUNET_MQ_MessageHandler no_p2p_handlers[] = { + GNUNET_MQ_handler_end () }; - static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = { - { &handle_p2p_get, - GNUNET_MESSAGE_TYPE_FS_GET, 0 }, - { &handle_p2p_put, - GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, - { &GSF_handle_p2p_migration_stop_, - GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, - sizeof (struct MigrationStopMessage) }, - { NULL, 0, 0 } + struct GNUNET_MQ_MessageHandler p2p_handlers[] = { + make_p2p_get_handler (NULL), + make_p2p_put_handler (NULL), + make_p2p_migration_stop_handler (NULL), + GNUNET_MQ_handler_end () }; static const struct GNUNET_SERVER_MessageHandler handlers[] = { { &GNUNET_FS_handle_index_start, NULL, @@ -735,28 +676,29 @@ main_init (struct GNUNET_SERVER_Handle *server, GNUNET_free (keyfile); GNUNET_assert (NULL != pk); GNUNET_CRYPTO_eddsa_key_get_public (pk, - &my_id.public_key); + &GSF_my_id.public_key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "I am peer %s\n", - GNUNET_i2s (&my_id)); + GNUNET_i2s (&GSF_my_id)); GSF_core - = GNUNET_CORE_connect (GSF_cfg, NULL, + = GNUNET_CORE_connecT (GSF_cfg, + NULL, &peer_init_handler, - &peer_connect_handler, - &GSF_peer_disconnect_handler_, - NULL, GNUNET_NO, - NULL, GNUNET_NO, + &GSF_peer_connect_handler, + &GSF_peer_disconnect_handler, (GNUNET_YES == anon_p2p_off) ? no_p2p_handlers : p2p_handlers); if (NULL == GSF_core) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to connect to `%s' service.\n"), "core"); + _("Failed to connect to `%s' service.\n"), + "core"); return GNUNET_SYSERR; } - GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_, + GNUNET_SERVER_disconnect_notify (server, + &GSF_client_disconnect_handler_, NULL); GNUNET_SERVER_add_handlers (server, handlers); cover_age_task = diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index 92cb4088e..2a0f7ba29 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -223,6 +223,10 @@ extern struct GNUNET_TIME_Relative GSF_avg_latency; */ extern struct GNUNET_ATS_PerformanceHandle *GSF_ats; +/** + * Identity of this peer. + */ +extern struct GNUNET_PeerIdentity GSF_my_id; /** * Typical priorities we're seeing from other peers right now. Since @@ -264,14 +268,30 @@ extern int GSF_enable_randomized_delays; extern unsigned int GSF_datastore_queue_size; +/** + * Function to be called after we're done processing + * replies from the local lookup. If the result status + * code indicates that there may be more replies, plan + * forwarding the request. + * + * @param cls closure (NULL) + * @param pr the pending request we were processing + * @param result final datastore lookup result + */ +void +GSF_consider_forwarding (void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result); + + /** * Test if the DATABASE (GET) load on this peer is too high * to even consider processing the query at * all. * - * @return GNUNET_YES if the load is too high to do anything (load high) - * GNUNET_NO to process normally (load normal) - * GNUNET_SYSERR to process for free (load low) + * @return #GNUNET_YES if the load is too high to do anything (load high) + * #GNUNET_NO to process normally (load normal) + * #GNUNET_SYSERR to process for free (load low) */ int GSF_test_get_load_too_high_ (uint32_t priority); diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index bda33d766..3f7783ded 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2011 GNUnet e.V. + Copyright (C) 2011, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -78,35 +78,15 @@ struct GSF_PeerTransmitHandle struct GNUNET_TIME_Absolute transmission_request_start_time; /** - * Timeout for this request. + * Envelope with the actual message. */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Task called on timeout, or 0 for none. - */ - struct GNUNET_SCHEDULER_Task *timeout_task; - - /** - * Function to call to get the actual message. - */ - GSF_GetMessageCallback gmc; + struct GNUNET_MQ_Envelope *env; /** * Peer this request targets. */ struct GSF_ConnectedPeer *cp; - /** - * Closure for @e gmc. - */ - void *gmc_cls; - - /** - * Size of the message to be transmitted. - */ - size_t size; - /** * #GNUNET_YES if this is a query, #GNUNET_NO for content. */ @@ -147,9 +127,9 @@ struct GSF_DelayedHandle struct GSF_ConnectedPeer *cp; /** - * The PUT that was delayed. + * Envelope of the message that was delayed. */ - struct PutMessage *pm; + struct GNUNET_MQ_Envelope *env; /** * Task for the delay. @@ -234,11 +214,6 @@ struct GSF_ConnectedPeer */ struct GSF_DelayedHandle *delayed_tail; - /** - * Migration stop message in our queue, or NULL if we have none pending. - */ - struct GSF_PeerTransmitHandle *migration_pth; - /** * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). */ @@ -256,9 +231,9 @@ struct GSF_ConnectedPeer /** * Handle for an active request for transmission to this - * peer, or NULL (if core queue was full). + * peer. */ - struct GNUNET_CORE_TransmitHandle *cth; + struct GNUNET_MQ_Handle *mq; /** * Increase in traffic preference still to be submitted @@ -266,14 +241,6 @@ struct GSF_ConnectedPeer */ uint64_t inc_preference; - /** - * Set to 1 if we're currently in the process of calling - * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is - * NULL, we should not call notify_transmit_ready for this - * handle right now). - */ - unsigned int cth_in_progress; - /** * Number of entries in @e delayed_head DLL. */ @@ -307,16 +274,6 @@ struct GSF_ConnectedPeer */ int did_reserve; - /** - * Function called when the creation of this record is complete. - */ - GSF_ConnectedPeerCreationCallback creation_cb; - - /** - * Closure for @e creation_cb - */ - void *creation_cb_cls; - /** * Handle to the PEERSTORE iterate request for peer respect value */ @@ -377,15 +334,10 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) /** * Core is ready to transmit to a peer, get the message. * - * @param cls the `struct GSF_PeerTransmitHandle` of the message - * @param size number of bytes core is willing to take - * @param buf where to copy the message - * @return number of bytes copied to @a buf + * @param cp which peer to send a message to */ -static size_t -peer_transmit_ready_cb (void *cls, - size_t size, - void *buf); +static void +peer_transmit (struct GSF_ConnectedPeer *cp); /** @@ -418,8 +370,6 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth) struct GNUNET_PeerIdentity target; cp = pth->cp; - if ((NULL != cp->cth) || (0 != cp->cth_in_progress)) - return; /* already done */ GNUNET_assert (0 != cp->ppd.pid); GNUNET_PEER_resolve (cp->ppd.pid, &target); @@ -449,52 +399,23 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth) cp); return; } - GNUNET_assert (NULL == cp->cth); - cp->cth_in_progress++; - cp->cth = - GNUNET_CORE_notify_transmit_ready (GSF_core, - GNUNET_YES, - GNUNET_CORE_PRIO_BACKGROUND, - GNUNET_TIME_absolute_get_remaining (pth->timeout), - &target, - pth->size, - &peer_transmit_ready_cb, cp); - GNUNET_assert (NULL != cp->cth); - GNUNET_assert (0 < cp->cth_in_progress--); + peer_transmit (cp); } /** * Core is ready to transmit to a peer, get the message. * - * @param cls the `struct GSF_PeerTransmitHandle` of the message - * @param size number of bytes core is willing to take - * @param buf where to copy the message - * @return number of bytes copied to @a buf + * @param cp which peer to send a message to */ -static size_t -peer_transmit_ready_cb (void *cls, - size_t size, - void *buf) +static void +peer_transmit (struct GSF_ConnectedPeer *cp) { - struct GSF_ConnectedPeer *cp = cls; struct GSF_PeerTransmitHandle *pth = cp->pth_head; struct GSF_PeerTransmitHandle *pos; - size_t ret; - cp->cth = NULL; if (NULL == pth) - return 0; - if (pth->size > size) - { - schedule_transmission (pth); - return 0; - } - if (NULL != pth->timeout_task) - { - GNUNET_SCHEDULER_cancel (pth->timeout_task); - pth->timeout_task = NULL; - } + return; GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); @@ -512,14 +433,14 @@ peer_transmit_ready_cb (void *cls, GNUNET_LOAD_update (cp->ppd.transmission_delay, GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value_us); - ret = pth->gmc (pth->gmc_cls, size, buf); + GNUNET_MQ_send (cp->mq, + pth->env); + GNUNET_free (pth); if (NULL != (pos = cp->pth_head)) { GNUNET_assert (pos != pth); schedule_transmission (pos); } - GNUNET_free (pth); - return ret; } @@ -578,23 +499,10 @@ ats_reserve_callback (void *cls, } cp->did_reserve = GNUNET_YES; pth = cp->pth_head; - if ( (NULL != pth) && - (NULL == cp->cth) && - (0 == cp->cth_in_progress) ) + if (NULL != pth) { /* reservation success, try transmission now! */ - cp->cth_in_progress++; - cp->cth = - GNUNET_CORE_notify_transmit_ready (GSF_core, - GNUNET_YES, - GNUNET_CORE_PRIO_BACKGROUND, - GNUNET_TIME_absolute_get_remaining (pth->timeout), - peer, - pth->size, - &peer_transmit_ready_cb, - cp); - GNUNET_assert (NULL != cp->cth); - GNUNET_assert (0 < cp->cth_in_progress--); + peer_transmit (cp); } } @@ -614,37 +522,82 @@ peer_respect_cb (void *cls, struct GSF_ConnectedPeer *cp = cls; GNUNET_assert (NULL != cp->respect_iterate_req); - if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size)) - cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value); + if ( (NULL != record) && + (sizeof (cp->disk_respect) == record->value_size)) + { + cp->disk_respect = *((uint32_t *)record->value); + cp->ppd.respect += *((uint32_t *)record->value); + } GSF_push_start_ (cp); - if (NULL != cp->creation_cb) - cp->creation_cb (cp->creation_cb_cls, cp); if (NULL != record) GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); cp->respect_iterate_req = NULL; } +/** + * Function called for each pending request whenever a new + * peer connects, giving us a chance to decide about submitting + * the existing request to the new peer. + * + * @param cls the `struct GSF_ConnectedPeer` of the new peer + * @param key query for the request + * @param pr handle to the pending request + * @return #GNUNET_YES to continue to iterate + */ +static int +consider_peer_for_forwarding (void *cls, + const struct GNUNET_HashCode *key, + struct GSF_PendingRequest *pr) +{ + struct GSF_ConnectedPeer *cp = cls; + struct GNUNET_PeerIdentity pid; + + if (GNUNET_YES != + GSF_pending_request_test_active_ (pr)) + return GNUNET_YES; /* request is not actually active, skip! */ + GSF_connected_peer_get_identity_ (cp, &pid); + if (GNUNET_YES != + GSF_pending_request_test_target_ (pr, &pid)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), + 1, + GNUNET_NO); + return GNUNET_YES; + } + GSF_plan_add_ (cp, pr); + return GNUNET_YES; +} + + /** * A peer connected to us. Setup the connected peer * records. * + * @param cls NULL * @param peer identity of peer that connected - * @param creation_cb callback function when the record is created. - * @param creation_cb_cls closure for @creation_cb + * @param mq message queue for talking to @a peer + * @return our internal handle for the peer */ -void -GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, - GSF_ConnectedPeerCreationCallback creation_cb, - void *creation_cb_cls) +void * +GSF_peer_connect_handler (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { struct GSF_ConnectedPeer *cp; + if (0 == + GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id, + peer)) + return NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n", GNUNET_i2s (peer)); cp = GNUNET_new (struct GSF_ConnectedPeer); cp->ppd.pid = GNUNET_PEER_intern (peer); + cp->ppd.peer = peer; + cp->mq = mq; cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats, @@ -662,14 +615,17 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, gettext_noop ("# peers connected"), GNUNET_CONTAINER_multipeermap_size (cp_map), GNUNET_NO); - cp->creation_cb = creation_cb; - cp->creation_cb_cls = creation_cb_cls; - cp->respect_iterate_req = - GNUNET_PEERSTORE_iterate (peerstore, "fs", - peer, "respect", + cp->respect_iterate_req + = GNUNET_PEERSTORE_iterate (peerstore, + "fs", + peer, + "respect", GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb, cp); + GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, + cp); + return cp; } @@ -714,38 +670,25 @@ GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer) /** - * Handle P2P "MIGRATION_STOP" message. + * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param cls closure, the `struct GSF_ConnectedPeer` + * @param msm the actual message */ -int -GSF_handle_p2p_migration_stop_ (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message) +void +handle_p2p_migration_stop (void *cls, + const struct MigrationStopMessage *msm) { - struct GSF_ConnectedPeer *cp; - const struct MigrationStopMessage *msm; + struct GSF_ConnectedPeer *cp = cls; struct GNUNET_TIME_Relative bt; - msm = (const struct MigrationStopMessage *) message; - cp = GSF_peer_get_ (other); - if (NULL == cp) - { - GNUNET_break (0); - return GNUNET_OK; - } GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# migration stop messages received"), 1, GNUNET_NO); bt = GNUNET_TIME_relative_ntoh (msm->duration); GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Migration of content to peer `%s' blocked for %s\n"), - GNUNET_i2s (other), + GNUNET_i2s (cp->ppd.peer), GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); if ( (NULL == cp->mig_revive_task) && @@ -756,46 +699,6 @@ GSF_handle_p2p_migration_stop_ (void *cls, GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp); } - return GNUNET_OK; -} - - -/** - * Copy reply and free put message. - * - * @param cls the `struct PutMessage` - * @param buf_size number of bytes available in @a buf - * @param buf where to copy the message, NULL on error (peer disconnect) - * @return number of bytes copied to @a buf, can be 0 (without indicating an error) - */ -static size_t -copy_reply (void *cls, - size_t buf_size, - void *buf) -{ - struct PutMessage *pm = cls; - size_t size; - - if (NULL != buf) - { - GNUNET_assert (buf_size >= ntohs (pm->header.size)); - size = ntohs (pm->header.size); - GNUNET_memcpy (buf, pm, size); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies transmitted to other peers"), - 1, - GNUNET_NO); - } - else - { - size = 0; - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies dropped"), - 1, - GNUNET_NO); - } - GNUNET_free (pm); - return size; } @@ -886,13 +789,10 @@ transmit_delayed_now (void *cls) cp->delayed_tail, dh); cp->delay_queue_size--; - (void) GSF_peer_transmit_ (cp, - GNUNET_NO, - UINT32_MAX, - REPLY_TIMEOUT, - dh->msize, - ©_reply, - dh->pm); + GSF_peer_transmit_ (cp, + GNUNET_NO, + UINT32_MAX, + dh->env); GNUNET_free (dh); } @@ -954,6 +854,7 @@ handle_p2p_reply (void *cls, struct PeerRequest *peerreq = cls; struct GSF_ConnectedPeer *cp = peerreq->cp; struct GSF_PendingRequestData *prd; + struct GNUNET_MQ_Envelope *env; struct PutMessage *pm; size_t msize; @@ -1000,12 +901,14 @@ handle_p2p_reply (void *cls, GSF_cover_content_count -= (reply_anonymity_level - 1); } - pm = GNUNET_malloc (msize); - pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - pm->header.size = htons (msize); + env = GNUNET_MQ_msg_extra (pm, + data_len, + GNUNET_MESSAGE_TYPE_FS_PUT); pm->type = htonl (type); pm->expiration = GNUNET_TIME_absolute_hton (expiration); - GNUNET_memcpy (&pm[1], data, data_len); + GNUNET_memcpy (&pm[1], + data, + data_len); if ( (UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) && (GNUNET_YES == GSF_enable_randomized_delays) ) @@ -1014,7 +917,7 @@ handle_p2p_reply (void *cls, dh = GNUNET_new (struct GSF_DelayedHandle); dh->cp = cp; - dh->pm = pm; + dh->env = env; dh->msize = msize; GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, @@ -1027,13 +930,10 @@ handle_p2p_reply (void *cls, } else { - (void) GSF_peer_transmit_ (cp, - GNUNET_NO, - UINT32_MAX, - REPLY_TIMEOUT, - msize, - ©_reply, - pm); + GSF_peer_transmit_ (cp, + GNUNET_NO, + UINT32_MAX, + env); } if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) return; @@ -1265,23 +1165,20 @@ test_exist_cb (void *cls, * process replies properly. Does not initiate forwarding or * local database lookups. * - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @return pending request handle, NULL on error + * @param cls the other peer involved (sender of the message) + * @param gm the GET message */ -struct GSF_PendingRequest * -GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message) +void +handle_p2p_get (void *cls, + const struct GetMessage *gm) { + struct GSF_ConnectedPeer *cps = cls; struct PeerRequest *peerreq; struct GSF_PendingRequest *pr; struct GSF_ConnectedPeer *cp; - struct GSF_ConnectedPeer *cps; const struct GNUNET_PeerIdentity *target; enum GSF_PendingRequestOptions options; uint16_t msize; - const struct GetMessage *gm; unsigned int bits; const struct GNUNET_PeerIdentity *opt; uint32_t bm; @@ -1291,18 +1188,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, GNUNET_PEER_Id spid; const struct GSF_PendingRequestData *prd; - msize = ntohs (message->size); - if (msize < sizeof (struct GetMessage)) - { - GNUNET_break_op (0); - return NULL; - } - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop - ("# GET requests received (from other peers)"), - 1, - GNUNET_NO); - gm = (const struct GetMessage *) message; + msize = ntohs (gm->header.size); tec.type = ntohl (gm->type); bm = ntohl (gm->hash_bitmap); bits = 0; @@ -1312,32 +1198,16 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, bits++; bm >>= 1; } - if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity)) - { - GNUNET_break_op (0); - return NULL; - } opt = (const struct GNUNET_PeerIdentity *) &gm[1]; bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); - /* bfsize must be power of 2, check! */ - if (0 != ((bfsize - 1) & bfsize)) - { - GNUNET_break_op (0); - return NULL; - } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop + ("# GET requests received (from other peers)"), + 1, + GNUNET_NO); GSF_cover_query_count++; bm = ntohl (gm->hash_bitmap); bits = 0; - cps = GSF_peer_get_ (other); - if (NULL == cps) - { - /* peer must have just disconnected */ - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop - ("# requests dropped due to initiator not being connected"), - 1, GNUNET_NO); - return NULL; - } if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) cp = GSF_peer_get_ (&opt[bits++]); else @@ -1352,24 +1222,24 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to find peer `%s' in connection set. Dropping query.\n", - GNUNET_i2s (other)); + GNUNET_i2s (cps->ppd.peer)); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests dropped due to missing reverse route"), 1, GNUNET_NO); - return NULL; + return; } if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%s' has too many replies queued already. Dropping query.\n", - GNUNET_i2s (other)); + GNUNET_i2s (cps->ppd.peer)); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests dropped due to full reply queue"), 1, GNUNET_NO); - return NULL; + return; } /* note that we can really only check load here since otherwise * peers could find out that we are overloaded by not being @@ -1380,14 +1250,14 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropping query from `%s', this peer is too busy.\n", - GNUNET_i2s (other)); - return NULL; + GNUNET_i2s (cps->ppd.peer)); + return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received request for `%s' of type %u from peer `%s' with flags %u\n", GNUNET_h2s (&gm->query), (unsigned int) tec.type, - GNUNET_i2s (other), + GNUNET_i2s (cps->ppd.peer), (unsigned int) bm); target = (0 != @@ -1403,7 +1273,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, * so at best indirect the query */ tec.priority = 0; options |= GSF_PRO_FORWARD_ONLY; - spid = GNUNET_PEER_intern (other); + spid = GNUNET_PEER_intern (cps->ppd.peer); GNUNET_assert (0 != spid); } tec.ttl = bound_ttl (ntohl (gm->ttl), @@ -1412,11 +1282,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, ttl_decrement = 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, TTL_DECREMENT); - if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0)) + if ( (tec.ttl < 0) && + (((int32_t) (tec.ttl - ttl_decrement)) > 0) ) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropping query from `%s' due to TTL underflow (%d - %u).\n", - GNUNET_i2s (other), + GNUNET_i2s (cps->ppd.peer), tec.ttl, ttl_decrement); GNUNET_STATISTICS_update (GSF_stats, @@ -1424,7 +1295,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, ("# requests dropped due TTL underflow"), 1, GNUNET_NO); /* integer underflow => drop (should be very rare)! */ - return NULL; + return; } tec.ttl -= ttl_decrement; @@ -1435,7 +1306,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, &test_exist_cb, &tec); if (GNUNET_YES == tec.finished) - return NULL; /* merged into existing request, we're done */ + return; /* merged into existing request, we're done */ peerreq = GNUNET_new (struct PeerRequest); peerreq->cp = cp; @@ -1452,7 +1323,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, (uint32_t) tec.priority, tec.ttl, spid, - GNUNET_PEER_intern (other), + GNUNET_PEER_intern (cps->ppd.peer), NULL, 0, /* replies_seen */ &handle_p2p_reply, peerreq); @@ -1472,43 +1343,10 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, gettext_noop ("# P2P searches active"), 1, GNUNET_NO); - return pr; -} - - -/** - * Function called if there has been a timeout trying to satisfy - * a transmission request. - * - * @param cls the `struct GSF_PeerTransmitHandle` of the request - */ -static void -peer_transmit_timeout (void *cls) -{ - struct GSF_PeerTransmitHandle *pth = cls; - struct GSF_ConnectedPeer *cp; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout trying to transmit to other peer\n"); - pth->timeout_task = NULL; - cp = pth->cp; - GNUNET_CONTAINER_DLL_remove (cp->pth_head, - cp->pth_tail, - pth); - if (GNUNET_YES == pth->is_query) - GNUNET_assert (0 < cp->ppd.pending_queries--); - else if (GNUNET_NO == pth->is_query) - GNUNET_assert (0 < cp->ppd.pending_replies--); - GNUNET_LOAD_update (cp->ppd.transmission_delay, - UINT64_MAX); - if (NULL != cp->cth) - { - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); - cp->cth = NULL; - } - pth->gmc (pth->gmc_cls, 0, NULL); - GNUNET_assert (0 == cp->cth_in_progress); - GNUNET_free (pth); + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; + GSF_local_lookup_ (pr, + &GSF_consider_forwarding, + NULL); } @@ -1520,19 +1358,15 @@ peer_transmit_timeout (void *cls) * @param cp target peer * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) * @param priority how important is this request? - * @param timeout when does this request timeout (call gmc with error) + * @param timeout when does this request timeout * @param size number of bytes we would like to send to the peer - * @param gmc function to call to get the message - * @param gmc_cls closure for @a gmc - * @return handle to cancel request + * @param env message to send */ -struct GSF_PeerTransmitHandle * +void GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query, uint32_t priority, - struct GNUNET_TIME_Relative timeout, - size_t size, - GSF_GetMessageCallback gmc, void *gmc_cls) + struct GNUNET_MQ_Envelope *env) { struct GSF_PeerTransmitHandle *pth; struct GSF_PeerTransmitHandle *pos; @@ -1540,10 +1374,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, pth = GNUNET_new (struct GSF_PeerTransmitHandle); pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); - pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); - pth->gmc = gmc; - pth->gmc_cls = gmc_cls; - pth->size = size; + pth->env = env; pth->is_query = is_query; pth->priority = priority; pth->cp = cp; @@ -1563,39 +1394,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, cp->ppd.pending_queries++; else if (GNUNET_NO == is_query) cp->ppd.pending_replies++; - pth->timeout_task - = GNUNET_SCHEDULER_add_delayed (timeout, - &peer_transmit_timeout, - pth); schedule_transmission (pth); - return pth; -} - - -/** - * Cancel an earlier request for transmission. - * - * @param pth request to cancel - */ -void -GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) -{ - struct GSF_ConnectedPeer *cp; - - if (NULL != pth->timeout_task) - { - GNUNET_SCHEDULER_cancel (pth->timeout_task); - pth->timeout_task = NULL; - } - cp = pth->cp; - GNUNET_CONTAINER_DLL_remove (cp->pth_head, - cp->pth_tail, - pth); - if (GNUNET_YES == pth->is_query) - GNUNET_assert (0 < cp->ppd.pending_queries--); - else if (GNUNET_NO == pth->is_query) - GNUNET_assert (0 < cp->ppd.pending_replies--); - GNUNET_free (pth); } @@ -1683,7 +1482,9 @@ flush_respect (void *cls, GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, sizeof (cp->ppd.respect), GNUNET_TIME_UNIT_FOREVER_ABS, - GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); + GNUNET_PEERSTORE_STOREOPTION_REPLACE, + NULL, + NULL); return GNUNET_OK; } @@ -1693,26 +1494,30 @@ flush_respect (void *cls, * record. * * @param cls unused - * @param peer identity of peer that connected + * @param peer identity of peer that disconnected + * @param internal_cls the corresponding `struct GSF_ConnectedPeer` */ void -GSF_peer_disconnect_handler_ (void *cls, - const struct GNUNET_PeerIdentity *peer) +GSF_peer_disconnect_handler (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *internal_cls) { - struct GSF_ConnectedPeer *cp; + struct GSF_ConnectedPeer *cp = internal_cls; struct GSF_PeerTransmitHandle *pth; struct GSF_DelayedHandle *dh; - cp = GSF_peer_get_ (peer); if (NULL == cp) - return; /* must have been disconnect from core with - * 'peer' == my_id, ignore */ - flush_respect (NULL, peer, cp); + return; /* must have been disconnect from core with + * 'peer' == my_id, ignore */ + flush_respect (NULL, + peer, + cp); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (cp_map, peer, cp)); - GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"), + GNUNET_STATISTICS_set (GSF_stats, + gettext_noop ("# peers connected"), GNUNET_CONTAINER_multipeermap_size (cp_map), GNUNET_NO); if (NULL != cp->respect_iterate_req) @@ -1720,11 +1525,6 @@ GSF_peer_disconnect_handler_ (void *cls, GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); cp->respect_iterate_req = NULL; } - if (NULL != cp->migration_pth) - { - GSF_peer_transmit_cancel_ (cp->migration_pth); - cp->migration_pth = NULL; - } if (NULL != cp->rc) { GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); @@ -1748,19 +1548,8 @@ GSF_peer_disconnect_handler_ (void *cls, 0, sizeof (cp->ppd.last_p2p_replies)); GSF_push_stop_ (cp); - if (NULL != cp->cth) - { - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); - cp->cth = NULL; - } - GNUNET_assert (0 == cp->cth_in_progress); while (NULL != (pth = cp->pth_head)) { - if (pth->timeout_task != NULL) - { - GNUNET_SCHEDULER_cancel (pth->timeout_task); - pth->timeout_task = NULL; - } GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); @@ -1768,7 +1557,6 @@ GSF_peer_disconnect_handler_ (void *cls, GNUNET_assert (0 < cp->ppd.pending_queries--); else if (GNUNET_NO == pth->is_query) GNUNET_assert (0 < cp->ppd.pending_replies--); - pth->gmc (pth->gmc_cls, 0, NULL); GNUNET_free (pth); } while (NULL != (dh = cp->delayed_head)) @@ -1776,9 +1564,9 @@ GSF_peer_disconnect_handler_ (void *cls, GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh); + GNUNET_MQ_discard (dh->env); cp->delay_queue_size--; GNUNET_SCHEDULER_cancel (dh->delay_task); - GNUNET_free (dh->pm); GNUNET_free (dh); } GNUNET_PEER_change_rc (cp->ppd.pid, -1); @@ -1882,40 +1670,6 @@ GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp) } -/** - * Assemble a migration stop message for transmission. - * - * @param cls the `struct GSF_ConnectedPeer` to use - * @param size number of bytes we're allowed to write to @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -create_migration_stop_message (void *cls, - size_t size, - void *buf) -{ - struct GSF_ConnectedPeer *cp = cls; - struct MigrationStopMessage msm; - - cp->migration_pth = NULL; - if (NULL == buf) - return 0; - GNUNET_assert (size >= sizeof (struct MigrationStopMessage)); - msm.header.size = htons (sizeof (struct MigrationStopMessage)); - msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); - msm.reserved = htonl (0); - msm.duration = - GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining - (cp->last_migration_block)); - GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage)); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# migration stop messages sent"), - 1, GNUNET_NO); - return sizeof (struct MigrationStopMessage); -} - - /** * Ask a peer to stop migrating data to us until the given point * in time. @@ -1927,6 +1681,9 @@ void GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute block_time) { + struct GNUNET_MQ_Envelope *env; + struct MigrationStopMessage *msm; + if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1939,13 +1696,20 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), GNUNET_YES)); cp->last_migration_block = block_time; - if (NULL != cp->migration_pth) - GSF_peer_transmit_cancel_ (cp->migration_pth); - cp->migration_pth = - GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - sizeof (struct MigrationStopMessage), - &create_migration_stop_message, cp); + env = GNUNET_MQ_msg (msm, + GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); + msm->reserved = htonl (0); + msm->duration + = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining + (cp->last_migration_block)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# migration stop messages sent"), + 1, + GNUNET_NO); + GSF_peer_transmit_ (cp, + GNUNET_SYSERR, + UINT32_MAX, + env); } @@ -1997,24 +1761,6 @@ GSF_connected_peer_init_ () } -/** - * Iterator to free peer entries. - * - * @param cls closure, unused - * @param key current key code - * @param value value in the hash map (peer entry) - * @return #GNUNET_YES (we should continue to iterate) - */ -static int -clean_peer (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) -{ - GSF_peer_disconnect_handler_ (NULL, key); - return GNUNET_YES; -} - - /** * Shutdown peer management subsystem. */ @@ -2024,9 +1770,6 @@ GSF_connected_peer_done_ () GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL); - GNUNET_CONTAINER_multipeermap_iterate (cp_map, - &clean_peer, - NULL); GNUNET_SCHEDULER_cancel (fr_task); fr_task = NULL; GNUNET_CONTAINER_multipeermap_destroy (cp_map); @@ -2072,7 +1815,8 @@ GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc) { if (NULL == cp_map) return; /* already cleaned up */ - GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client, + GNUNET_CONTAINER_multipeermap_iterate (cp_map, + &clean_local_client, (void *) lc); } diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 491ab34e3..3aba5c6a6 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -120,10 +120,15 @@ struct GSF_PeerPerformanceData double avg_priority; /** - * The peer's identity. + * The peer's identity (interned version). */ GNUNET_PEER_Id pid; + /** + * The peer's identity (pointer). + */ + const struct GNUNET_PeerIdentity *peer; + /** * Respect rating for this peer */ @@ -184,17 +189,6 @@ typedef void int success); -/** - * Function called after the creation of a connected peer record is complete. - * - * @param cls closure - * @param cp handle to the newly created connected peer record - */ -typedef void -(*GSF_ConnectedPeerCreationCallback) (void *cls, - struct GSF_ConnectedPeer *cp); - - /** * Handle to cancel a transmission request. */ @@ -205,14 +199,15 @@ struct GSF_PeerTransmitHandle; * A peer connected to us. Setup the connected peer * records. * + * @param cls NULL * @param peer identity of peer that connected - * @param creation_cb callback function when the record is created. - * @param creation_cb_cls closure for @creation_cb + * @param mq queue for sending messages to @a peer + * @return internal handle for the peer */ -void -GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, - GSF_ConnectedPeerCreationCallback creation_cb, - void *creation_cb_cls); +void * +GSF_peer_connect_handler (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq); /** @@ -242,30 +237,15 @@ GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id, * the callback is invoked with a 'NULL' buffer. * * @param cp target peer - * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) + * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) * @param priority how important is this request? - * @param timeout when does this request timeout (call gmc with error) - * @param size number of bytes we would like to send to the peer - * @param gmc function to call to get the message - * @param gmc_cls closure for gmc - * @return handle to cancel request + * @param env envelope of message to send */ -struct GSF_PeerTransmitHandle * +void GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query, uint32_t priority, - struct GNUNET_TIME_Relative timeout, - size_t size, GSF_GetMessageCallback gmc, - void *gmc_cls); - - -/** - * Cancel an earlier request for transmission. - * - * @param pth request to cancel - */ -void -GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth); + struct GNUNET_MQ_Envelope *env); /** @@ -307,35 +287,25 @@ GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp, /** - * Handle P2P "MIGRATION_STOP" message. + * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param cls closure, the `struct GSF_ConnectedPeer` + * @param msm the actual message */ -int -GSF_handle_p2p_migration_stop_ (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message); +void +handle_p2p_migration_stop (void *cls, + const struct MigrationStopMessage *message); /** - * Handle P2P "QUERY" message. Only responsible for creating the - * request entry itself and setting up reply callback and cancellation - * on peer disconnect. Does NOT execute the actual request strategy - * (planning) or local database operations. + * Handle P2P "QUERY" message. * - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @return pending request handle, NULL on error + * @param cls the `struct GSF_ConnectedPeer` of the other sender + * @param gm the actual message */ -struct GSF_PendingRequest * -GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message); +void +handle_p2p_get (void *cls, + const struct GetMessage *gm); /** @@ -366,10 +336,12 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, * * @param cls unused * @param peer identity of peer that connected + * @param internal_cls our `struct GSF_ConnectedPeer` for @a peer */ void -GSF_peer_disconnect_handler_ (void *cls, - const struct GNUNET_PeerIdentity *peer); +GSF_peer_disconnect_handler (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *internal_cls); /** diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b338c1a13..098c3d180 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -188,11 +188,6 @@ struct PeerPlan */ struct GNUNET_CONTAINER_MultiHashMap *plan_map; - /** - * Current transmission request handle. - */ - struct GSF_PeerTransmitHandle *pth; - /** * Peer for which this is the plan. */ @@ -202,6 +197,12 @@ struct PeerPlan * Current task for executing the plan. */ struct GNUNET_SCHEDULER_Task *task; + + /** + * Current message under transmission for the plan. + */ + struct GNUNET_MQ_Envelope *env; + }; @@ -240,15 +241,6 @@ get_rp_key (struct GSF_RequestPlan *rp) } -/** - * Figure out when and how to transmit to the given peer. - * - * @param cls the `struct GSF_ConnectedPeer` for transmission - */ -static void -schedule_peer_transmission (void *cls); - - /** * Insert the given request plan into the heap with the appropriate weight. * @@ -329,21 +321,22 @@ plan (struct PeerPlan *pp, rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Earliest (re)transmission for `%s' in %us\n", - GNUNET_h2s (&prd->query), rp->transmission_counter); + GNUNET_h2s (&prd->query), + rp->transmission_counter); GNUNET_assert (rp->hn == NULL); if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) - rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); + rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, + rp, + rp->priority); else rp->hn = - GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, + GNUNET_CONTAINER_heap_insert (pp->delay_heap, + rp, rp->earliest_transmission.abs_value_us); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, get_rp_key (rp), rp)); - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); #undef N } @@ -382,75 +375,6 @@ get_latest (const struct GSF_RequestPlan *rp) } -/** - * Function called to get a message for transmission. - * - * @param cls closure - * @param buf_size number of bytes available in @a buf - * @param buf where to copy the message, NULL on error (peer disconnect) - * @return number of bytes copied to @a buf, can be 0 (without indicating an error) - */ -static size_t -transmit_message_callback (void *cls, - size_t buf_size, - void *buf) -{ - struct PeerPlan *pp = cls; - struct GSF_RequestPlan *rp; - size_t msize; - - pp->pth = NULL; - if (NULL == buf) - { - /* failed, try again... */ - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop - ("# transmission failed (core has no bandwidth)"), - 1, GNUNET_NO); - return 0; - } - rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); - if (NULL == rp) - { - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); - return 0; - } - msize = GSF_pending_request_get_message_ (get_latest (rp), - buf_size, - buf); - if (msize > buf_size) - { - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - /* buffer to small (message changed), try again */ - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); - return 0; - } - /* remove from root, add again elsewhere... */ - GNUNET_assert (rp == - GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); - rp->hn = NULL; - rp->last_transmission = GNUNET_TIME_absolute_get (); - rp->transmission_counter++; - total_delay++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Executing plan %p executed %u times, planning retransmission\n", - rp, rp->transmission_counter); - plan (pp, rp); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# query messages sent to other peers"), - 1, - GNUNET_NO); - return msize; -} - - /** * Figure out when and how to transmit to the given peer. * @@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls) { struct PeerPlan *pp = cls; struct GSF_RequestPlan *rp; - size_t msize; struct GNUNET_TIME_Relative delay; - pp->task = NULL; - if (NULL != pp->pth) + if (NULL != pp->task) + { + pp->task = NULL; + } + else { - GSF_peer_transmit_cancel_ (pp->pth); - pp->pth = NULL; + GNUNET_assert (NULL != pp->env); + pp->env = NULL; } /* move ready requests to priority queue */ while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && @@ -508,23 +434,40 @@ schedule_peer_transmission (void *cls) return; } #if INSANE_STATISTICS - GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"), - 1, GNUNET_NO); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# query plans executed"), + 1, + GNUNET_NO); #endif /* process from priority heap */ - rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); + rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp); GNUNET_assert (NULL != rp); - msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); - pp->pth = - GSF_peer_transmit_ (pp->cp, GNUNET_YES, - rp->priority, - GNUNET_TIME_UNIT_FOREVER_REL, - msize, - &transmit_message_callback, pp); - GNUNET_assert (NULL != pp->pth); + rp->hn = NULL; + rp->last_transmission = GNUNET_TIME_absolute_get (); + rp->transmission_counter++; + total_delay++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing plan %p executed %u times, planning retransmission\n", + rp, + rp->transmission_counter); + GNUNET_assert (NULL == pp->env); + pp->env = GSF_pending_request_get_message_ (get_latest (rp)); + GNUNET_MQ_notify_sent (pp->env, + &schedule_peer_transmission, + pp); + GSF_peer_transmit_ (pp->cp, + GNUNET_YES, + rp->priority, + pp->env); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# query messages sent to other peers"), + 1, + GNUNET_NO); + plan (pp, + rp); } @@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, id, pp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, + pp); } mpc.merged = GNUNET_NO; mpc.pr = pr; @@ -710,16 +655,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (plans, id, pp)); - if (NULL != pp->pth) - { - GSF_peer_transmit_cancel_ (pp->pth); - pp->pth = NULL; - } if (NULL != pp->task) { GNUNET_SCHEDULER_cancel (pp->task); pp->task = NULL; } + if (NULL != pp->env) + { + GNUNET_MQ_send_cancel (pp->env); + pp->env = NULL; + } while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) { GNUNET_break (GNUNET_YES == diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index cd58992c1..f8a7b61f0 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -512,21 +512,17 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, /** * Generate the message corresponding to the given pending request for - * transmission to other peers (or at least determine its size). + * transmission to other peers. * * @param pr request to generate the message for - * @param buf_size number of bytes available in @a buf - * @param buf where to copy the message (can be NULL) - * @return number of bytes needed (if `>` @a buf_size) or used + * @return envelope with the request message */ -size_t -GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, - size_t buf_size, void *buf) +struct GNUNET_MQ_Envelope * +GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr) { - char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; + struct GNUNET_MQ_Envelope *env; struct GetMessage *gm; struct GNUNET_PeerIdentity *ext; - size_t msize; unsigned int k; uint32_t bm; uint32_t prio; @@ -535,11 +531,10 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, int64_t ttl; int do_route; - if (buf_size > 0) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Building request message for `%s' of type %d\n", - GNUNET_h2s (&pr->public_data.query), - pr->public_data.type); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Building request message for `%s' of type %d\n", + GNUNET_h2s (&pr->public_data.query), + pr->public_data.type); k = 0; bm = 0; do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); @@ -559,13 +554,9 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, k++; } bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); - msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity); - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - if (buf_size < msize) - return msize; - gm = (struct GetMessage *) lbuf; - gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); - gm->header.size = htons (msize); + env = GNUNET_MQ_msg_extra (gm, + bf_size + k * sizeof (struct GNUNET_PeerIdentity), + GNUNET_MESSAGE_TYPE_FS_GET); gm->type = htonl (pr->public_data.type); if (do_route) prio = @@ -585,7 +576,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, gm->query = pr->public_data.query; ext = (struct GNUNET_PeerIdentity *) &gm[1]; k = 0; - if (!do_route) + if (! do_route) GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]); if (NULL != pr->public_data.target) @@ -595,8 +586,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, (char *) &ext[k], bf_size)); - GNUNET_memcpy (buf, gm, msize); - return msize; + return env; } @@ -1699,18 +1689,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, * this content and possibly passes it on (to local clients or other * peers). Does NOT perform migration (content caching at this peer). * - * @param cp the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @return #GNUNET_OK if the message was well-formed, - * #GNUNET_SYSERR if the message was malformed (close connection, - * do not cache under any circumstances) + * @param cls the other peer involved + * @param put the actual message */ -int -GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, - const struct GNUNET_MessageHeader *message) +void +handle_p2p_put (void *cls, + const struct PutMessage *put) { - const struct PutMessage *put; + struct GSF_ConnectedPeer *cp = cls; uint16_t msize; size_t dsize; enum GNUNET_BLOCK_Type type; @@ -1721,21 +1707,17 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, double putl; struct PutMigrationContext *pmc; - msize = ntohs (message->size); - if (msize < sizeof (struct PutMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - put = (const struct PutMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received P2P PUT from %s\n", + GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer)); + GSF_cover_content_count++; + msize = ntohs (put->header.size); dsize = msize - sizeof (struct PutMessage); type = ntohl (put->type); expiration = GNUNET_TIME_absolute_ntoh (put->expiration); /* do not allow migrated content to live longer than 1 year */ expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), expiration); - if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) - return GNUNET_SYSERR; if (GNUNET_OK != GNUNET_BLOCK_get_key (GSF_block_ctx, type, @@ -1744,7 +1726,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, &query)) { GNUNET_break_op (0); - return GNUNET_SYSERR; + return; } GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# GAP PUT messages received"), @@ -1786,11 +1768,19 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, &pmc->origin); if (NULL == - GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, - prq.priority, 1 /* anonymity */ , + GNUNET_DATASTORE_put (GSF_dsh, + 0, + &query, + dsize, + &put[1], + type, + prq.priority, + 1 /* anonymity */ , 0 /* replication */ , - expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, - &put_migration_continuation, pmc)) + expiration, 1 + prq.priority, + MAX_DATASTORE_QUEUE, + &put_migration_continuation, + pmc)) { put_migration_continuation (pmc, GNUNET_SYSERR, @@ -1802,7 +1792,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Choosing not to keep content `%s' (%d/%d)\n", - GNUNET_h2s (&query), active_to_migration, + GNUNET_h2s (&query), + active_to_migration, test_put_load_too_high (prq.priority)); } putl = GNUNET_LOAD_get_load (datastore_put_load); @@ -1826,9 +1817,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, putl, active_to_migration, (GNUNET_NO == prq.request_found)); - GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); + GSF_block_peer_migration_ (cp, + GNUNET_TIME_relative_to_absolute (block_time)); } - return GNUNET_OK; } diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 2765f9b3d..fe4297414 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -40,34 +40,34 @@ enum GSF_PendingRequestOptions */ GSF_PRO_DEFAULTS = 0, - /** - * Request must only be processed locally. - */ + /** + * Request must only be processed locally. + */ GSF_PRO_LOCAL_ONLY = 1, - /** - * Request must only be forwarded (no routing) - */ + /** + * Request must only be forwarded (no routing) + */ GSF_PRO_FORWARD_ONLY = 2, - /** - * Request persists indefinitely (no expiration). - */ + /** + * Request persists indefinitely (no expiration). + */ GSF_PRO_REQUEST_NEVER_EXPIRES = 4, - /** - * Request is allowed to refresh bloomfilter and change mingle value. - */ + /** + * Request is allowed to refresh bloomfilter and change mingle value. + */ GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8, - /** - * Request priority is allowed to be exceeded. - */ + /** + * Request priority is allowed to be exceeded. + */ GSF_PRO_PRIORITY_UNLIMITED = 16, - /** - * Option mask for typical local requests. - */ + /** + * Option mask for typical local requests. + */ GSF_PRO_LOCAL_REQUEST = (GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES) }; @@ -288,17 +288,13 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, /** * Generate the message corresponding to the given pending request for - * transmission to other peers (or at least determine its size). + * transmission to other peers. * * @param pr request to generate the message for - * @param buf_size number of bytes available in @a buf - * @param buf where to copy the message (can be NULL) - * @return number of bytes needed (if @a buf_size too small) or used + * @return envelope with the request message */ -size_t -GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, - size_t buf_size, - void *buf); +struct GNUNET_MQ_Envelope * +GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr); /** @@ -344,16 +340,12 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, * this content and possibly passes it on (to local clients or other * peers). Does NOT perform migration (content caching at this peer). * - * @param cp the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @return #GNUNET_OK if the message was well-formed, - * #GNUNET_SYSERR if the message was malformed (close connection, - * do not cache under any circumstances) + * @param cls the other peer involved (sender) + * @param put the actual message */ -int -GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, - const struct GNUNET_MessageHeader *message); +void +handle_p2p_put (void *cls, + const struct PutMessage *put); /** diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 59f3772f5..1573bc160 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2011 GNUnet e.V. + Copyright (C) 2011, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -102,8 +102,7 @@ struct MigrationReadyBlock /** - * Information about a peer waiting for - * migratable data. + * Information about a peer waiting for migratable data. */ struct MigrationReadyPeer { @@ -123,15 +122,9 @@ struct MigrationReadyPeer struct GSF_ConnectedPeer *peer; /** - * Handle for current transmission request, - * or NULL for none. + * Envelope of the currently pushed message. */ - struct GSF_PeerTransmitHandle *th; - - /** - * Message we are trying to push right now (or NULL) - */ - struct PutMessage *msg; + struct GNUNET_MQ_Envelope *env; }; @@ -163,7 +156,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe; /** * ID of task that collects blocks for migration. */ -static struct GNUNET_SCHEDULER_Task * mig_task; +static struct GNUNET_SCHEDULER_Task *mig_task; /** * What is the maximum frequency at which we are allowed to @@ -195,8 +188,11 @@ static int value_found; static void delete_migration_block (struct MigrationReadyBlock *mb) { - GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); - GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); + GNUNET_CONTAINER_DLL_remove (mig_head, + mig_tail, + mb); + GNUNET_PEER_decrement_rcs (mb->target_list, + MIGRATION_LIST_SIZE); mig_size--; GNUNET_free (mb); } @@ -204,49 +200,11 @@ delete_migration_block (struct MigrationReadyBlock *mb) /** * Find content for migration to this peer. - */ -static void -find_content (struct MigrationReadyPeer *mrp); - - -/** - * Transmit the message currently scheduled for transmission. * - * @param cls the `struct MigrationReadyPeer` - * @param buf_size number of bytes available in @a buf - * @param buf where to copy the message, NULL on error (peer disconnect) - * @return number of bytes copied to @a buf, can be 0 (without indicating an error) + * @param cls a `struct MigrationReadyPeer *` */ -static size_t -transmit_message (void *cls, - size_t buf_size, - void *buf) -{ - struct MigrationReadyPeer *peer = cls; - struct PutMessage *msg; - uint16_t msize; - - peer->th = NULL; - msg = peer->msg; - peer->msg = NULL; - if (NULL == buf) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to migrate content to another peer (disconnect)\n"); - GNUNET_free (msg); - return 0; - } - msize = ntohs (msg->header.size); - GNUNET_assert (msize <= buf_size); - GNUNET_memcpy (buf, msg, msize); - GNUNET_free (msg); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pushing %u bytes to %s\n", - msize, - GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); - find_content (peer); - return msize; -} +static void +find_content (void *cls); /** @@ -257,31 +215,30 @@ transmit_message (void *cls, * @return #GNUNET_YES if the block was deleted (!) */ static int -transmit_content (struct MigrationReadyPeer *peer, +transmit_content (struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block) { - size_t msize; struct PutMessage *msg; unsigned int i; struct GSF_PeerPerformanceData *ppd; int ret; - ppd = GSF_get_peer_performance_data_ (peer->peer); - GNUNET_assert (NULL == peer->th); - msize = sizeof (struct PutMessage) + block->size; - msg = GNUNET_malloc (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - msg->header.size = htons (msize); + ppd = GSF_get_peer_performance_data_ (mrp->peer); + mrp->env = GNUNET_MQ_msg_extra (msg, + block->size, + GNUNET_MESSAGE_TYPE_FS_PUT); msg->type = htonl (block->type); msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); - GNUNET_memcpy (&msg[1], &block[1], block->size); - peer->msg = msg; + GNUNET_memcpy (&msg[1], + &block[1], + block->size); for (i = 0; i < MIGRATION_LIST_SIZE; i++) { if (block->target_list[i] == 0) { block->target_list[i] = ppd->pid; - GNUNET_PEER_change_rc (block->target_list[i], 1); + GNUNET_PEER_change_rc (block->target_list[i], + 1); break; } } @@ -294,15 +251,13 @@ transmit_content (struct MigrationReadyPeer *peer, { ret = GNUNET_NO; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking for transmission of %u bytes to %s for migration\n", - (unsigned int) msize, - GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); - peer->th = GSF_peer_transmit_ (peer->peer, - GNUNET_NO, 0 /* priority */ , - GNUNET_TIME_UNIT_FOREVER_REL, - msize, - &transmit_message, peer); + GNUNET_MQ_notify_sent (mrp->env, + &find_content, + mrp); + GSF_peer_transmit_ (mrp->peer, + GNUNET_NO, + 0 /* priority */ , + mrp->env); return ret; } @@ -330,12 +285,12 @@ count_targets (struct MigrationReadyBlock *block) * Check if sending this block to this peer would * be a good idea. * - * @param peer target peer + * @param mrp target peer * @param block the block * @return score (>= 0: feasible, negative: infeasible) */ static long -score_content (struct MigrationReadyPeer *peer, +score_content (struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block) { unsigned int i; @@ -344,14 +299,18 @@ score_content (struct MigrationReadyPeer *peer, struct GNUNET_HashCode hc; uint32_t dist; - ppd = GSF_get_peer_performance_data_ (peer->peer); + ppd = GSF_get_peer_performance_data_ (mrp->peer); for (i = 0; i < MIGRATION_LIST_SIZE; i++) if (block->target_list[i] == ppd->pid) return -1; GNUNET_assert (0 != ppd->pid); - GNUNET_PEER_resolve (ppd->pid, &id); - GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); - dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); + GNUNET_PEER_resolve (ppd->pid, + &id); + GNUNET_CRYPTO_hash (&id, + sizeof (struct GNUNET_PeerIdentity), + &hc); + dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, + &hc); /* closer distance, higher score: */ return UINT32_MAX - dist; } @@ -368,17 +327,18 @@ consider_gathering (void); /** * Find content for migration to this peer. * - * @param mrp peer to find content for + * @param cls peer to find content for */ static void -find_content (struct MigrationReadyPeer *mrp) +find_content (void *cls) { + struct MigrationReadyPeer *mrp = cls; struct MigrationReadyBlock *pos; long score; long best_score; struct MigrationReadyBlock *best; - GNUNET_assert (NULL == mrp->th); + mrp->env = NULL; best = NULL; best_score = -1; pos = mig_head; @@ -423,7 +383,8 @@ find_content (struct MigrationReadyPeer *mrp) } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Preparing to push best content to peer\n"); - transmit_content (mrp, best); + transmit_content (mrp, + best); } @@ -454,9 +415,12 @@ consider_gathering () return; if (mig_size >= MAX_MIGRATION_QUEUE) return; - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); - delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); - delay = GNUNET_TIME_relative_max (delay, min_migration_delay); + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + mig_size); + delay = GNUNET_TIME_relative_divide (delay, + MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, + min_migration_delay); if (GNUNET_NO == value_found) { /* wait at least 5s if the datastore is empty */ @@ -467,8 +431,9 @@ consider_gathering () GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling gathering task (queue size: %u)\n", mig_size); - mig_task = - GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); + mig_task = GNUNET_SCHEDULER_add_delayed (delay, + &gather_migration_blocks, + NULL); } @@ -549,14 +514,12 @@ process_migration_content (void *cls, mig_size++; for (pos = peer_head; NULL != pos; pos = pos->next) { - if (NULL == pos->th) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing to push best content to peer %s\n", - GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); - if (GNUNET_YES == transmit_content (pos, mb)) - break; /* 'mb' was freed! */ - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to push best content to peer %s\n", + GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); + if (GNUNET_YES == transmit_content (pos, + mb)) + break; /* 'mb' was freed! */ } consider_gathering (); } @@ -580,9 +543,11 @@ gather_migration_blocks (void *cls) "Asking datastore for content for replication (queue size: %u)\n", mig_size); value_found = GNUNET_NO; - mig_qe = - GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, - &process_migration_content, NULL); + mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, + 0, + UINT_MAX, + &process_migration_content, + NULL); if (NULL == mig_qe) consider_gathering (); } @@ -640,19 +605,11 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) break; if (NULL == pos) return; + if (NULL != pos->env) + GNUNET_MQ_send_cancel (pos->env); GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos); - if (NULL != pos->th) - { - GSF_peer_transmit_cancel_ (pos->th); - pos->th = NULL; - } - if (NULL != pos->msg) - { - GNUNET_free (pos->msg); - pos->msg = NULL; - } GNUNET_free (pos); } @@ -664,16 +621,21 @@ void GSF_push_init_ () { enabled = - GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); + GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, + "FS", + "CONTENT_PUSHING"); if (GNUNET_YES != enabled) return; if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", + GNUNET_CONFIGURATION_get_value_time (GSF_cfg, + "fs", + "MIN_MIGRATION_DELAY", &min_migration_delay)) { GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, - "fs", "MIN_MIGRATION_DELAY", + "fs", + "MIN_MIGRATION_DELAY", _("time required, content pushing disabled")); return; }