#define DEBUG_FS GNUNET_NO
-
-
/**
* Signature of a function that is called whenever a datastore
* request can be processed (or an entry put on the queue times out).
*/
struct GNUNET_CORE_InformationRequestContext *irc;
+ /**
+ * Handle for an active request for transmission to this peer, or
+ * NULL. Only used for replies that we are trying to send to a peer
+ * that we are not yet connected to.
+ */
+ struct GNUNET_CORE_TransmitHandle *cth;
+
/**
* Replies that we have received but were unable to forward yet
* (typically non-null only if we have a pending transmission
*/
struct PendingMessage *replies_pending;
- /**
- * Pending transmission request with the core service for the target
- * peer (for processing of 'replies_pending') or Handle for a
- * pending query-request for P2P-transmission with the core service.
- * If non-NULL, this request must be cancelled should this struct be
- * destroyed!
- */
- struct GNUNET_CORE_TransmitHandle *cth;
-
/**
* Pending transmission request for the target client (for processing of
* 'replies_pending').
/**
- * Linked list of all clients that we are
- * currently processing requests for.
+ * Linked list of all clients that we are currently processing
+ * requests for.
*/
struct ClientList
{
* Handle for an active request for transmission to this
* peer, or NULL.
*/
- struct GNUNET_CORE_PeerRequestHandle *prh;
+ struct GNUNET_CORE_TransmitHandle *cth;
/**
* Messages (replies, queries, content migration) we would like to
-
-
-
/**
* Run the next DS request in our
* queue, we're done with the current one.
/**
- * Mingle hash with the mingle_number to
- * produce different bits.
+ * Mingle hash with the mingle_number to produce different bits.
*/
static void
mingle_hash (const GNUNET_HashCode * in,
/**
- * Task that is run for each request with the
- * goal of forwarding the associated query to
- * other peers. The task should re-schedule
- * itself to be re-run once the TTL has expired.
- * (or at a later time if more peers should
- * be queried earlier).
+ * Task that is run for each request with the goal of forwarding the
+ * associated query to other peers. The task should re-schedule
+ * itself to be re-run once the TTL has expired. (or at a later time
+ * if more peers should be queried earlier).
*
* @param cls the requests "struct PendingRequest*"
* @param tc task context (unused)
/**
- * We've selected a peer for forwarding of a query.
- * Construct the message and then re-schedule the
- * task to forward again to (other) peers.
+ * We've selected a peer for forwarding of a query. Construct the
+ * message and then re-schedule the task to forward again to (other)
+ * peers.
*
* @param cls closure
* @param size number of bytes available in buf
transmit_request_cb (void *cls,
size_t size,
void *buf)
+{
+ struct ConnectedPeer *cp = cls;
+ char *cbuf = buf;
+ struct GNUNET_PeerIdentity target;
+ struct PendingMessage *pr;
+ size_t tot;
+
+ cp->cth = NULL;
+ tot = 0;
+ while ( (NULL != (pr = cp->pending_messages)) &&
+ (pr->msize < size - tot) )
+ {
+ memcpy (&cbuf[tot],
+ &pr[1],
+ pr->msize);
+ tot += pr->msize;
+ cp->pending_messages = pr->next;
+ GNUNET_free (pr);
+ }
+ if (NULL != pr)
+ {
+ GNUNET_PEER_resolve (cp->pid,
+ &target);
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ pr->priority,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &target,
+ pr->msize,
+ &transmit_request_cb,
+ cp);
+ }
+ return tot;
+}
+
+
+/**
+ * Function called after we've tried to reserve a certain amount of
+ * bandwidth for a reply. Check if we succeeded and if so send our
+ * query.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param peer identifies the peer
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
+ */
+static void
+target_reservation_cb (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer,
+ unsigned int bpm_in,
+ unsigned int bpm_out,
+ int amount,
+ uint64_t preference)
{
struct PendingRequest *pr = cls;
+ struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
+ struct PendingMessage *pos;
+ struct PendingMessage *prev;
struct GetMessage *gm;
GNUNET_HashCode *ext;
char *bfdata;
size_t msize;
unsigned int k;
- pr->cth = NULL;
- /* (1) check for timeout */
- if (NULL == buf)
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (), // FIXME: longer?
+ &forward_request_task,
+ pr);
+ pr->irc = NULL;
+ GNUNET_assert (peer != NULL);
+ if (amount != DBLOCK_SIZE)
{
- /* timeout, try another peer immediately again */
- pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- &forward_request_task,
- pr);
- return 0;
+ /* FIXME: call stats... */
+ return; /* this target round failed */
+ }
+ // (2) transmit, update ttl/priority
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &peer->hashPubKey);
+ if (cp == NULL)
+ {
+ /* Peer must have just left; try again immediately */
+ pr->task = GNUNET_SCHEDULER_add_now (sched,
+ &forward_request_task,
+ pr);
+ return;
}
- /* (2) build query message */
+ /* build message and insert message into priority queue */
k = 0; // FIXME: count hash codes!
msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
- gm = (struct GetMessage*) buf;
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ pm->msize = msize;
+ pm->priority = 0; // FIXME: calculate priority properly!
+ gm = (struct GetMessage*) &pm[1];
gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
gm->header.size = htons (msize);
gm->type = htonl (pr->type);
gm->hash_bitmap = htonl (42);
gm->query = pr->query;
ext = (GNUNET_HashCode*) &gm[1];
+
// FIXME: setup "ext[0]..[k-1]"
bfdata = (char *) &ext[k];
if (pr->bf != NULL)
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
bfdata,
pr->bf_size);
-
- /* (3) schedule job to do it again (or another peer, etc.) */
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (), // FIXME!
- &forward_request_task,
- pr);
- return msize;
-}
-
-/**
- * Function called after we've tried to reserve
- * a certain amount of bandwidth for a reply.
- * Check if we succeeded and if so send our query.
- *
- * @param cls the requests "struct PendingRequest*"
- * @param peer identifies the peer
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
- * @param bpm_out set to the current bandwidth limit (sending) for this peer
- * @param amount set to the amount that was actually reserved or unreserved
- * @param preference current traffic preference for the given peer
- */
-static void
-target_reservation_cb (void *cls,
- const struct
- GNUNET_PeerIdentity * peer,
- unsigned int bpm_in,
- unsigned int bpm_out,
- int amount,
- uint64_t preference)
-{
- struct PendingRequest *pr = cls;
- uint32_t priority;
- uint16_t size;
- struct GNUNET_TIME_Relative maxdelay;
-
- pr->irc = NULL;
- GNUNET_assert (peer != NULL);
- if ( (amount != DBLOCK_SIZE) ||
- (pr->cth != NULL) )
+ prev = NULL;
+ pos = cp->pending_messages;
+ while ( (pos != NULL) &&
+ (pm->priority < pos->priority) )
{
- /* try again later; FIXME: we may need to un-reserve "amount"? */
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (), // FIXME: longer?
- &forward_request_task,
- pr);
- return;
+ prev = pos;
+ pos = pos->next;
}
- // (2) transmit, update ttl/priority
- // FIXME: calculate priority, maxdelay, size properly!
- priority = 0;
- size = 60000;
- maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
- pr->cth = GNUNET_CORE_notify_transmit_ready (core,
- priority,
- maxdelay,
- peer,
- size,
- &transmit_request_cb,
- pr);
- if (pr->cth == NULL)
+ if (prev == NULL)
+ cp->pending_messages = pm;
+ else
+ prev->next = pm;
+ pm->next = pos;
+ if (cp->cth == NULL)
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ cp->pending_messages->priority,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ peer,
+ msize,
+ &transmit_request_cb,
+ cp);
+ if (cp->cth == NULL)
{
- /* try again later */
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (), // FIXME: longer?
- &forward_request_task,
- pr);
+ /* technically, this should not be a 'break'; but
+ we don't handle this (rare) case yet, so let's warn
+ about it... */
+ GNUNET_break (0);
+ // FIXME: now what?
}
}
/**
- * Task that is run for each request with the
- * goal of forwarding the associated query to
- * other peers. The task should re-schedule
- * itself to be re-run once the TTL has expired.
- * (or at a later time if more peers should
- * be queried earlier).
+ * Task that is run for each request with the goal of forwarding the
+ * associated query to other peers. The task should re-schedule
+ * itself to be re-run once the TTL has expired. (or at a later time
+ * if more peers should be queried earlier).
*
* @param cls the requests "struct PendingRequest*"
* @param tc task context (unused)
struct PeerSelectionContext psc;
pr->task = GNUNET_SCHEDULER_NO_TASK;
- if (pr->cth != NULL)
- {
- /* we're busy transmitting a result, wait a bit */
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (),
- &forward_request_task,
- pr);
- return;
- }
/* (1) select target */
psc.pr = pr;
psc.target_score = DBL_MIN;
/* (2) reserve reply bandwidth */
GNUNET_assert (NULL == pr->irc);
pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
- &psc.target,
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- -1,
- DBLOCK_SIZE, // FIXME: make dependent on type?
- 0,
- &target_reservation_cb,
- pr);
+ &psc.target,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ -1,
+ DBLOCK_SIZE, // FIXME: make dependent on type?
+ 0,
+ &target_reservation_cb,
+ pr);
}
}
if (GNUNET_SCHEDULER_NO_TASK != pr->task)
GNUNET_SCHEDULER_cancel (sched, pr->task);
- if (NULL != pr->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
if (NULL != pr->bf)
GNUNET_CONTAINER_bloomfilter_free (pr->bf);
if (NULL != pr->th)
pr->replies_pending = reply->next;
GNUNET_free (reply);
}
+ if (NULL != pr->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
GNUNET_PEER_change_rc (pr->source_pid, -1);
GNUNET_PEER_change_rc (pr->target_pid, -1);
GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
GNUNET_PeerIdentity * peer)
{
struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&peer->hashPubKey);
- GNUNET_PEER_change_rc (cp->pid, -1);
- GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
- GNUNET_free (cp);
+ if (cp != NULL)
+ {
+ GNUNET_PEER_change_rc (cp->pid, -1);
+ GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+ if (NULL != cp->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ while (NULL != (pm = cp->pending_messages))
+ {
+ cp->pending_messages = pm->next;
+ GNUNET_free (pm);
+ }
+ GNUNET_free (cp);
+ }
GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
&peer->hashPubKey,
&destroy_request,
/**
- * We're processing a GET request from
- * another peer and have decided to forward
- * it to other peers.
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers.
*
* @param cls our "struct ProcessGetContext *"
* @param tc unused
/**
- * Iterator over pending requests.
+ * We have received a reply; handle it!
*
* @param cls response (struct ProcessReplyClosure)
* @param key our query
struct PendingMessage *reply;
struct PutMessage *pm;
struct ContentMessage *cm;
+ struct ConnectedPeer *cp;
GNUNET_HashCode chash;
GNUNET_HashCode mhash;
struct GNUNET_PeerIdentity target;
}
if (pr->client == NULL)
{
+ GNUNET_PEER_resolve (pr->source_pid,
+ &target);
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &target.hashPubKey);
msize = sizeof (struct ContentMessage) + prq->size;
reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
reply->msize = msize;
+ reply->priority = (uint32_t) -1; /* send replies first! */
cm = (struct ContentMessage*) &reply[1];
cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
cm->header.size = htons (msize);
cm->type = htonl (prq->type);
cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
- reply->next = pr->replies_pending;
- pr->replies_pending = reply;
memcpy (&reply[1], prq->data, prq->size);
- if (pr->cth != NULL)
- return GNUNET_YES;
max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
{
max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
eer->start_time);
}
- GNUNET_PEER_resolve (pr->source_pid,
- &target);
- pr->cth = GNUNET_CORE_notify_transmit_ready (core,
- prio,
- max_delay,
- &target,
- msize,
- &transmit_result,
- pr);
- if (NULL == pr->cth)
+
+ if (cp == NULL)
+ {
+ /* FIXME: bound queue size! */
+ reply->next = pr->replies_pending;
+ pr->replies_pending = reply;
+ if (pr->cth == NULL)
+ {
+ /* implicitly tries to connect */
+ pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+ prio,
+ max_delay,
+ &target,
+ msize,
+ &transmit_result,
+ pr);
+ }
+ }
+ else
{
- // FIXME: now what? discard?
+ /* insert replies always at the head */
+ reply->next = cp->pending_messages;
+ cp->pending_messages = reply;
+ if (cp->cth == NULL)
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ reply->priority,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &target,
+ msize,
+ &transmit_request_cb,
+ cp);
}
}
else
sched = s;
cfg = c;
-
requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
connected_peers = GNUNET_CONTAINER_multihashmap_create (64);