#include "gnunet-service-fs_pr.h"
+/**
+ * List of GSF_PendingRequests this request plan
+ * participates with.
+ */
+struct PendingRequestList;
+
+
+/**
+ * DLL of request plans a particular pending request is
+ * involved with.
+ */
+struct GSF_RequestPlanReference
+{
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct GSF_RequestPlanReference *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct GSF_RequestPlanReference *prev;
+
+ /**
+ * Associated request plan.
+ */
+ struct GSF_RequestPlan *rp;
+
+ /**
+ * Corresponding PendingRequestList.
+ */
+ struct PendingRequestList *prl;
+};
+
+
+/**
+ * List of GSF_PendingRequests this request plan
+ * participates with.
+ */
+struct PendingRequestList
+{
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct PendingRequestList *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct PendingRequestList *prev;
+
+ /**
+ * Array of associated pending requests.
+ */
+ struct GSF_PendingRequest *pr;
+
+ /**
+ * Corresponding GSF_RequestPlanReference.
+ */
+ struct GSF_RequestPlanReference *rpr;
+
+};
+
+
/**
* Information we keep per request per peer. This is a doubly-linked
* list (with head and tail in the 'struct GSF_PendingRequestData')
struct GNUNET_CONTAINER_HeapNode *hn;
/**
- * Array of associated pending requests.
+ * Head of list of associated pending requests.
*/
- struct GSF_PendingRequest **prs;
+ struct PendingRequestList *prl_head;
+
+ /**
+ * Tail of list of associated pending requests.
+ */
+ struct PendingRequestList *prl_tail;
/**
* Earliest time we'd be happy to (re)transmit this request.
*/
struct GNUNET_TIME_Absolute last_transmission;
- /**
- * Number of entries in 'prs'.
- */
- unsigned int prs_length;
-
/**
* Current priority for this request for this target.
*/
gettext_noop ("# average retransmission delay (ms)"),
total_delay * 1000LL / plan_count,
GNUNET_NO);
- prd = GSF_pending_request_get_data_ (rp->prs[0]);
+ prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
// FIXME: calculate 'rp->priority'!
if (rp->transmission_counter < 32)
delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
}
+/**
+ * Get the pending request with the highest TTL from the given plan.
+ *
+ * @param rp plan to investigate
+ * @return pending request with highest TTL
+ */
+struct GSF_PendingRequest *
+get_latest (const struct GSF_RequestPlan *rp)
+{
+ struct GSF_PendingRequest *ret;
+ struct PendingRequestList *prl;
+
+ prl = rp->prl_head;
+ ret = prl->pr;
+ prl = prl->next;
+ while (NULL != prl)
+ {
+ if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
+ GSF_pending_request_get_data_ (ret)->ttl.abs_value)
+ ret = prl->pr;
+ prl = prl->next;
+ }
+ return ret;
+}
+
+
/**
* Function called to get a message for transmission.
*
pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
return 0;
}
- msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf);
+ msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
if (msize > buf_size)
{
/* buffer to small (message changed), try again */
rp);
#endif
GNUNET_assert (NULL != rp);
- msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL);
+ msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
pp->pth = GSF_peer_transmit_ (pp->cp,
GNUNET_YES,
rp->priority,
}
+/**
+ * Closure for 'merge_pr'.
+ */
+struct MergeContext
+{
+
+ struct GSF_PendingRequest *pr;
+
+ int merged;
+
+};
+
+
+/**
+ * Iterator that checks if an equivalent request is already
+ * present for this peer.
+ *
+ * @param cls closure
+ * @param node internal node of the heap (ignored)
+ * @param element request plan stored at the node
+ * @param cost cost associated with the node (ignored)
+ * @return GNUNET_YES if we should continue to iterate,
+ * GNUNET_NO if not (merge success)
+ */
+static int
+merge_pr (void *cls,
+ struct GNUNET_CONTAINER_HeapNode *node,
+ void *element,
+ GNUNET_CONTAINER_HeapCostType cost)
+{
+ struct MergeContext *mpr = cls;
+ struct GSF_RequestPlan *rp = element;
+ struct GSF_PendingRequestData *prd;
+ struct GSF_RequestPlanReference *rpr;
+ struct PendingRequestList *prl;
+ struct GSF_PendingRequest *latest;
+
+ if (GNUNET_OK !=
+ GSF_pending_request_is_compatible_ (mpr->pr,
+ rp->prl_head->pr))
+ return GNUNET_YES;
+ /* merge new request with existing request plan */
+ rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
+ prl = GNUNET_malloc (sizeof (struct PendingRequestList));
+ rpr->rp = rp;
+ rpr->prl = prl;
+ prl->rpr = rpr;
+ prl->pr = mpr->pr;
+ prd = GSF_pending_request_get_data_ (mpr->pr);
+ GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
+ prd->rpr_tail,
+ rpr);
+ GNUNET_CONTAINER_DLL_insert (rp->prl_head,
+ rp->prl_tail,
+ prl);
+ mpr->merged = GNUNET_YES;
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests merged"),
+ 1,
+ GNUNET_NO);
+ latest = get_latest (rp);
+ if (GSF_pending_request_get_data_ (latest)->ttl.abs_value < prd->ttl.abs_value)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests refreshed"),
+ 1,
+ GNUNET_NO);
+ rp->transmission_counter = 0; /* reset */
+ }
+ return GNUNET_NO;
+}
+
+
/**
* Create a new query plan entry.
*
struct PeerPlan *pp;
struct GSF_PendingRequestData *prd;
struct GSF_RequestPlan *rp;
- unsigned int i;
+ struct GSF_RequestPlanReference *rpr;
+ struct PendingRequestList *prl;
+ struct MergeContext mpc;
size_t msize;
GNUNET_assert (NULL != cp);
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
msize = GSF_pending_request_get_message_ (pr, 0, NULL);
- prd = GSF_pending_request_get_data_ (pr);
- for (rp = prd->rp_head; NULL != rp; rp = rp->next)
- {
- char mbuf[msize];
- char xbuf[msize];
-
- GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf));
- if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) &&
- (0 == memcmp (xbuf, mbuf, msize)) )
- {
- /* add request to existing plan */
- GNUNET_array_append (rp->prs,
- rp->prs_length,
- pr);
- for (i=0;i<rp->prs_length;i++)
- if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# requests refreshed"),
- 1,
- GNUNET_NO);
- rp->transmission_counter = 0; /* reset */
- break;
- }
- return;
- }
- }
+ mpc.merged = GNUNET_NO;
+ mpc.pr = pr;
+ GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
+ if (mpc.merged != GNUNET_NO)
+ return;
+ GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
+ if (mpc.merged != GNUNET_NO)
+ return;
plan_count++;
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# query plan entries"),
1,
GNUNET_NO);
- rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
+ prd = GSF_pending_request_get_data_ (pr);
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Planning transmission of query `%s' to peer `%s' (%p)\n",
+ "Planning transmission of query `%s' to peer `%s'\n",
GNUNET_h2s (&prd->query),
- GNUNET_i2s (&id),
- rp);
+ GNUNET_i2s (&id));
#endif
- GNUNET_array_append (rp->prs,
- rp->prs_length,
- pr);
- GNUNET_CONTAINER_DLL_insert (prd->rp_head,
- prd->rp_tail,
- rp);
+ rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
+ rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
+ prl = GNUNET_malloc (sizeof (struct PendingRequestList));
+ rpr->rp = rp;
+ rpr->prl = prl;
+ prl->rpr = rpr;
+ prl->pr = pr;
+ GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
+ prd->rpr_tail,
+ rpr);
+ GNUNET_CONTAINER_DLL_insert (rp->prl_head,
+ rp->prl_tail,
+ prl);
plan (pp, rp);
}
struct PeerPlan *pp;
struct GSF_RequestPlan *rp;
struct GSF_PendingRequestData *prd;
- unsigned int i;
+ struct PendingRequestList *prl;
GSF_connected_peer_get_identity_ (cp, &id);
pp = GNUNET_CONTAINER_multihashmap_get (plans,
}
while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
{
- for (i=0;i<rp->prs_length;i++)
+ while (NULL != (prl = rp->prl_head))
{
- prd = GSF_pending_request_get_data_ (rp->prs[i]);
- GNUNET_CONTAINER_DLL_remove (prd->rp_head,
- prd->rp_tail,
- rp);
+ GNUNET_CONTAINER_DLL_remove (rp->prl_head,
+ rp->prl_tail,
+ prl);
+ prd = GSF_pending_request_get_data_ (prl->pr);
+ GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
+ prd->rpr_tail,
+ prl->rpr);
+ GNUNET_free (prl->rpr);
+ GNUNET_free (prl);
}
- plan_count--;
- GNUNET_array_grow (rp->prs, rp->prs_length, 0);
GNUNET_free (rp);
}
GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
{
- for (i=0;i<rp->prs_length;i++)
+ while (NULL != (prl = rp->prl_head))
{
- prd = GSF_pending_request_get_data_ (rp->prs[i]);
- GNUNET_CONTAINER_DLL_remove (prd->rp_head,
- prd->rp_tail,
- rp);
+ GNUNET_CONTAINER_DLL_remove (rp->prl_head,
+ rp->prl_tail,
+ prl);
+ prd = GSF_pending_request_get_data_ (prl->pr);
+ GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
+ prd->rpr_tail,
+ prl->rpr);
+ GNUNET_free (prl->rpr);
+ GNUNET_free (prl);
}
- plan_count--;
- GNUNET_array_grow (rp->prs, rp->prs_length, 0);
GNUNET_free (rp);
}
GNUNET_STATISTICS_set (GSF_stats,
{
struct GSF_RequestPlan *rp;
struct GSF_PendingRequestData *prd;
- unsigned int i;
+ struct GSF_RequestPlanReference *rpr;
prd = GSF_pending_request_get_data_ (pr);
- while (NULL != (rp = prd->rp_head))
+ while (NULL != (rpr = prd->rpr_head))
{
- for (i=0;i<rp->prs_length;i++)
+ GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
+ prd->rpr_tail,
+ rpr);
+ rp = rpr->rp;
+ GNUNET_CONTAINER_DLL_remove (rp->prl_head,
+ rp->prl_tail,
+ rpr->prl);
+ GNUNET_free (rpr->prl);
+ GNUNET_free (rpr);
+ if (rp->prl_head == 0)
{
- if (rp->prs[i] == pr)
- {
- rp->prs[i] = rp->prs[rp->prs_length - 1];
- GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1);
- if (rp->prs_length == 0)
- {
- GNUNET_CONTAINER_heap_remove_node (rp->hn);
- GNUNET_CONTAINER_DLL_remove (prd->rp_head,
- prd->rp_tail,
- rp);
- plan_count--;
- GNUNET_free (rp);
- break;
- }
- }
+ GNUNET_CONTAINER_heap_remove_node (rp->hn);
+ plan_count--;
+ GNUNET_free (rp);
}
}
GNUNET_STATISTICS_set (GSF_stats,
GNUNET_TIME_UNIT_FOREVER_ABS,
GNUNET_BLOCK_TYPE_ANY,
NULL, 0);
- GSF_pending_request_cancel_ (dpr);
+ GSF_pending_request_cancel_ (dpr, GNUNET_YES);
}
}
GNUNET_STATISTICS_update (GSF_stats,
}
+/**
+ * Test if two pending requests are compatible (would generate
+ * the same query modulo filters and should thus be processed
+ * jointly).
+ *
+ * @param pra a pending request
+ * @param pra another pending request
+ * @return GNUNET_OK if the requests are compatible
+ */
+int
+GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
+ struct GSF_PendingRequest *prb)
+{
+ if ( (pra->public_data.type != prb->public_data.type) ||
+ (0 != memcmp (&pra->public_data.query,
+ &prb->public_data.query,
+ sizeof (GNUNET_HashCode))) ||
+ ( (pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
+ (0 != memcmp (&pra->public_data.namespace,
+ &prb->public_data.namespace,
+ sizeof (GNUNET_HashCode))) ) )
+ return GNUNET_NO;
+ return GNUNET_OK;
+}
+
+
+
/**
* Update a given pending request with additional replies
* that have been seen.
* Explicitly cancel a pending request.
*
* @param pr request to cancel
+ * @param full_cleanup fully purge the request
*/
void
-GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
+GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr,
+ int full_cleanup)
{
+ GSF_LocalLookupContinuation cont;
+
if (NULL == pr_map)
return; /* already cleaned up! */
+ if (GNUNET_YES != full_cleanup)
+ {
+ /* make request inactive (we're no longer interested in more results),
+ but do NOT remove from our data-structures, we still need it there
+ to prevent the request from looping */
+ pr->rh = NULL;
+ if (NULL != (cont = pr->llc_cont))
+ {
+ pr->llc_cont = NULL;
+ cont (pr->llc_cont_cls,
+ pr,
+ pr->local_result);
+ }
+ GSF_plan_notify_request_done_ (pr);
+ if (NULL != pr->qe)
+ {
+ GNUNET_DATASTORE_cancel (pr->qe);
+ pr->qe = NULL;
+ }
+ if (NULL != pr->gh)
+ {
+ GNUNET_DHT_get_stop (pr->gh);
+ pr->gh = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
+ {
+ GNUNET_SCHEDULER_cancel (pr->warn_task);
+ pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ return;
+ }
GNUNET_assert (GNUNET_YES ==
clean_request (NULL, &pr->public_data.query, pr));
}
struct GSF_PendingRequest *pr = value;
GNUNET_HashCode chash;
+ if (NULL == pr->rh)
+ return GNUNET_YES;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Matched result (type %u) for query `%s' with pending request\n",