From c2bd55c1e68522d685c1ac539949918ad612ce16 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 26 Jan 2010 13:29:45 +0000 Subject: [PATCH] stuff --- src/fs/gnunet-service-fs.c | 343 +++++++++++++++++++++---------------- 1 file changed, 194 insertions(+), 149 deletions(-) diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 0339567f3..57b6dd421 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -50,8 +50,6 @@ #define DEBUG_FS GNUNET_NO - - /** * Signature of a function that is called whenever a datastore * request can be processed (or an entry put on the queue times out). @@ -397,6 +395,13 @@ struct PendingRequest */ struct GNUNET_CORE_InformationRequestContext *irc; + /** + * Handle for an active request for transmission to this peer, or + * NULL. Only used for replies that we are trying to send to a peer + * that we are not yet connected to. + */ + struct GNUNET_CORE_TransmitHandle *cth; + /** * Replies that we have received but were unable to forward yet * (typically non-null only if we have a pending transmission @@ -404,15 +409,6 @@ struct PendingRequest */ struct PendingMessage *replies_pending; - /** - * Pending transmission request with the core service for the target - * peer (for processing of 'replies_pending') or Handle for a - * pending query-request for P2P-transmission with the core service. - * If non-NULL, this request must be cancelled should this struct be - * destroyed! - */ - struct GNUNET_CORE_TransmitHandle *cth; - /** * Pending transmission request for the target client (for processing of * 'replies_pending'). @@ -558,8 +554,8 @@ struct ClientRequestList /** - * Linked list of all clients that we are - * currently processing requests for. + * Linked list of all clients that we are currently processing + * requests for. */ struct ClientList { @@ -658,7 +654,7 @@ struct ConnectedPeer * Handle for an active request for transmission to this * peer, or NULL. */ - struct GNUNET_CORE_PeerRequestHandle *prh; + struct GNUNET_CORE_TransmitHandle *cth; /** * Messages (replies, queries, content migration) we would like to @@ -779,9 +775,6 @@ static uint64_t max_pending_requests = 32; - - - /** * Run the next DS request in our * queue, we're done with the current one. @@ -950,8 +943,7 @@ transmit_local_result (void *cls, /** - * Mingle hash with the mingle_number to - * produce different bits. + * Mingle hash with the mingle_number to produce different bits. */ static void mingle_hash (const GNUNET_HashCode * in, @@ -1114,12 +1106,10 @@ get_processing_delay () /** - * Task that is run for each request with the - * goal of forwarding the associated query to - * other peers. The task should re-schedule - * itself to be re-run once the TTL has expired. - * (or at a later time if more peers should - * be queried earlier). + * Task that is run for each request with the goal of forwarding the + * associated query to other peers. The task should re-schedule + * itself to be re-run once the TTL has expired. (or at a later time + * if more peers should be queried earlier). * * @param cls the requests "struct PendingRequest*" * @param tc task context (unused) @@ -1130,9 +1120,9 @@ forward_request_task (void *cls, /** - * We've selected a peer for forwarding of a query. - * Construct the message and then re-schedule the - * task to forward again to (other) peers. + * We've selected a peer for forwarding of a query. Construct the + * message and then re-schedule the task to forward again to (other) + * peers. * * @param cls closure * @param size number of bytes available in buf @@ -1143,30 +1133,103 @@ static size_t transmit_request_cb (void *cls, size_t size, void *buf) +{ + struct ConnectedPeer *cp = cls; + char *cbuf = buf; + struct GNUNET_PeerIdentity target; + struct PendingMessage *pr; + size_t tot; + + cp->cth = NULL; + tot = 0; + while ( (NULL != (pr = cp->pending_messages)) && + (pr->msize < size - tot) ) + { + memcpy (&cbuf[tot], + &pr[1], + pr->msize); + tot += pr->msize; + cp->pending_messages = pr->next; + GNUNET_free (pr); + } + if (NULL != pr) + { + GNUNET_PEER_resolve (cp->pid, + &target); + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + pr->priority, + GNUNET_TIME_UNIT_FOREVER_REL, + &target, + pr->msize, + &transmit_request_cb, + cp); + } + return tot; +} + + +/** + * Function called after we've tried to reserve a certain amount of + * bandwidth for a reply. Check if we succeeded and if so send our + * query. + * + * @param cls the requests "struct PendingRequest*" + * @param peer identifies the peer + * @param bpm_in set to the current bandwidth limit (receiving) for this peer + * @param bpm_out set to the current bandwidth limit (sending) for this peer + * @param amount set to the amount that was actually reserved or unreserved + * @param preference current traffic preference for the given peer + */ +static void +target_reservation_cb (void *cls, + const struct + GNUNET_PeerIdentity * peer, + unsigned int bpm_in, + unsigned int bpm_out, + int amount, + uint64_t preference) { struct PendingRequest *pr = cls; + struct ConnectedPeer *cp; + struct PendingMessage *pm; + struct PendingMessage *pos; + struct PendingMessage *prev; struct GetMessage *gm; GNUNET_HashCode *ext; char *bfdata; size_t msize; unsigned int k; - pr->cth = NULL; - /* (1) check for timeout */ - if (NULL == buf) + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + get_processing_delay (), // FIXME: longer? + &forward_request_task, + pr); + pr->irc = NULL; + GNUNET_assert (peer != NULL); + if (amount != DBLOCK_SIZE) { - /* timeout, try another peer immediately again */ - pr->task = GNUNET_SCHEDULER_add_with_priority (sched, - GNUNET_SCHEDULER_PRIORITY_IDLE, - &forward_request_task, - pr); - return 0; + /* FIXME: call stats... */ + return; /* this target round failed */ + } + // (2) transmit, update ttl/priority + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &peer->hashPubKey); + if (cp == NULL) + { + /* Peer must have just left; try again immediately */ + pr->task = GNUNET_SCHEDULER_add_now (sched, + &forward_request_task, + pr); + return; } - /* (2) build query message */ + /* build message and insert message into priority queue */ k = 0; // FIXME: count hash codes! msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - gm = (struct GetMessage*) buf; + pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + pm->msize = msize; + pm->priority = 0; // FIXME: calculate priority properly! + gm = (struct GetMessage*) &pm[1]; gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); gm->header.size = htons (msize); gm->type = htonl (pr->type); @@ -1177,91 +1240,52 @@ transmit_request_cb (void *cls, gm->hash_bitmap = htonl (42); gm->query = pr->query; ext = (GNUNET_HashCode*) &gm[1]; + // FIXME: setup "ext[0]..[k-1]" bfdata = (char *) &ext[k]; if (pr->bf != NULL) GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, bfdata, pr->bf_size); - - /* (3) schedule job to do it again (or another peer, etc.) */ - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), // FIXME! - &forward_request_task, - pr); - return msize; -} - -/** - * Function called after we've tried to reserve - * a certain amount of bandwidth for a reply. - * Check if we succeeded and if so send our query. - * - * @param cls the requests "struct PendingRequest*" - * @param peer identifies the peer - * @param bpm_in set to the current bandwidth limit (receiving) for this peer - * @param bpm_out set to the current bandwidth limit (sending) for this peer - * @param amount set to the amount that was actually reserved or unreserved - * @param preference current traffic preference for the given peer - */ -static void -target_reservation_cb (void *cls, - const struct - GNUNET_PeerIdentity * peer, - unsigned int bpm_in, - unsigned int bpm_out, - int amount, - uint64_t preference) -{ - struct PendingRequest *pr = cls; - uint32_t priority; - uint16_t size; - struct GNUNET_TIME_Relative maxdelay; - - pr->irc = NULL; - GNUNET_assert (peer != NULL); - if ( (amount != DBLOCK_SIZE) || - (pr->cth != NULL) ) + prev = NULL; + pos = cp->pending_messages; + while ( (pos != NULL) && + (pm->priority < pos->priority) ) { - /* try again later; FIXME: we may need to un-reserve "amount"? */ - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), // FIXME: longer? - &forward_request_task, - pr); - return; + prev = pos; + pos = pos->next; } - // (2) transmit, update ttl/priority - // FIXME: calculate priority, maxdelay, size properly! - priority = 0; - size = 60000; - maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT; - pr->cth = GNUNET_CORE_notify_transmit_ready (core, - priority, - maxdelay, - peer, - size, - &transmit_request_cb, - pr); - if (pr->cth == NULL) + if (prev == NULL) + cp->pending_messages = pm; + else + prev->next = pm; + pm->next = pos; + if (cp->cth == NULL) + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + cp->pending_messages->priority, + GNUNET_TIME_UNIT_FOREVER_REL, + peer, + msize, + &transmit_request_cb, + cp); + if (cp->cth == NULL) { - /* try again later */ - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), // FIXME: longer? - &forward_request_task, - pr); + /* technically, this should not be a 'break'; but + we don't handle this (rare) case yet, so let's warn + about it... */ + GNUNET_break (0); + // FIXME: now what? } } /** - * Task that is run for each request with the - * goal of forwarding the associated query to - * other peers. The task should re-schedule - * itself to be re-run once the TTL has expired. - * (or at a later time if more peers should - * be queried earlier). + * Task that is run for each request with the goal of forwarding the + * associated query to other peers. The task should re-schedule + * itself to be re-run once the TTL has expired. (or at a later time + * if more peers should be queried earlier). * * @param cls the requests "struct PendingRequest*" * @param tc task context (unused) @@ -1274,15 +1298,6 @@ forward_request_task (void *cls, struct PeerSelectionContext psc; pr->task = GNUNET_SCHEDULER_NO_TASK; - if (pr->cth != NULL) - { - /* we're busy transmitting a result, wait a bit */ - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - return; - } /* (1) select target */ psc.pr = pr; psc.target_score = DBL_MIN; @@ -1301,13 +1316,13 @@ forward_request_task (void *cls, /* (2) reserve reply bandwidth */ GNUNET_assert (NULL == pr->irc); pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, - &psc.target, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - -1, - DBLOCK_SIZE, // FIXME: make dependent on type? - 0, - &target_reservation_cb, - pr); + &psc.target, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + -1, + DBLOCK_SIZE, // FIXME: make dependent on type? + 0, + &target_reservation_cb, + pr); } @@ -1668,8 +1683,6 @@ destroy_pending_request (struct PendingRequest *pr) } if (GNUNET_SCHEDULER_NO_TASK != pr->task) GNUNET_SCHEDULER_cancel (sched, pr->task); - if (NULL != pr->cth) - GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); if (NULL != pr->bf) GNUNET_CONTAINER_bloomfilter_free (pr->bf); if (NULL != pr->th) @@ -1679,6 +1692,8 @@ destroy_pending_request (struct PendingRequest *pr) pr->replies_pending = reply->next; GNUNET_free (reply); } + if (NULL != pr->cth) + GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); GNUNET_PEER_change_rc (pr->source_pid, -1); GNUNET_PEER_change_rc (pr->target_pid, -1); GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); @@ -1862,12 +1877,23 @@ peer_disconnect_handler (void *cls, GNUNET_PeerIdentity * peer) { struct ConnectedPeer *cp; + struct PendingMessage *pm; cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, &peer->hashPubKey); - GNUNET_PEER_change_rc (cp->pid, -1); - GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); - GNUNET_free (cp); + if (cp != NULL) + { + GNUNET_PEER_change_rc (cp->pid, -1); + GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); + if (NULL != cp->cth) + GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + while (NULL != (pm = cp->pending_messages)) + { + cp->pending_messages = pm->next; + GNUNET_free (pm); + } + GNUNET_free (cp); + } GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, &peer->hashPubKey, &destroy_request, @@ -1876,9 +1902,8 @@ peer_disconnect_handler (void *cls, /** - * We're processing a GET request from - * another peer and have decided to forward - * it to other peers. + * We're processing a GET request from another peer and have decided + * to forward it to other peers. * * @param cls our "struct ProcessGetContext *" * @param tc unused @@ -2407,7 +2432,7 @@ transmit_result (void *cls, /** - * Iterator over pending requests. + * We have received a reply; handle it! * * @param cls response (struct ProcessReplyClosure) * @param key our query @@ -2425,6 +2450,7 @@ process_reply (void *cls, struct PendingMessage *reply; struct PutMessage *pm; struct ContentMessage *cm; + struct ConnectedPeer *cp; GNUNET_HashCode chash; GNUNET_HashCode mhash; struct GNUNET_PeerIdentity target; @@ -2472,19 +2498,20 @@ process_reply (void *cls, } if (pr->client == NULL) { + GNUNET_PEER_resolve (pr->source_pid, + &target); + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &target.hashPubKey); msize = sizeof (struct ContentMessage) + prq->size; reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); reply->msize = msize; + reply->priority = (uint32_t) -1; /* send replies first! */ cm = (struct ContentMessage*) &reply[1]; cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); cm->header.size = htons (msize); cm->type = htonl (prq->type); cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); - reply->next = pr->replies_pending; - pr->replies_pending = reply; memcpy (&reply[1], prq->data, prq->size); - if (pr->cth != NULL) - return GNUNET_YES; max_delay = GNUNET_TIME_UNIT_FOREVER_REL; if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests) { @@ -2494,18 +2521,37 @@ process_reply (void *cls, max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time, eer->start_time); } - GNUNET_PEER_resolve (pr->source_pid, - &target); - pr->cth = GNUNET_CORE_notify_transmit_ready (core, - prio, - max_delay, - &target, - msize, - &transmit_result, - pr); - if (NULL == pr->cth) + + if (cp == NULL) + { + /* FIXME: bound queue size! */ + reply->next = pr->replies_pending; + pr->replies_pending = reply; + if (pr->cth == NULL) + { + /* implicitly tries to connect */ + pr->cth = GNUNET_CORE_notify_transmit_ready (core, + prio, + max_delay, + &target, + msize, + &transmit_result, + pr); + } + } + else { - // FIXME: now what? discard? + /* insert replies always at the head */ + reply->next = cp->pending_messages; + cp->pending_messages = reply; + if (cp->cth == NULL) + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + reply->priority, + GNUNET_TIME_UNIT_FOREVER_REL, + &target, + msize, + &transmit_request_cb, + cp); } } else @@ -2746,7 +2792,6 @@ run (void *cls, sched = s; cfg = c; - requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config connected_peers = GNUNET_CONTAINER_multihashmap_create (64); -- 2.25.1