-fix the fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan
35  * participates with.
36  */
37 struct PendingRequestList;
38
39 /**
40  * Transmission plan for a peer.
41  */
42 struct PeerPlan;
43
44
45 /**
46  * DLL of request plans a particular pending request is
47  * involved with.
48  */
49 struct GSF_RequestPlanReference
50 {
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *next;
56
57   /**
58    * This is a doubly-linked list.
59    */
60   struct GSF_RequestPlanReference *prev;
61
62   /**
63    * Associated request plan.
64    */
65   struct GSF_RequestPlan *rp;
66
67   /**
68    * Corresponding PendingRequestList.
69    */
70   struct PendingRequestList *prl;
71 };
72
73
74 /**
75  * List of GSF_PendingRequests this request plan
76  * participates with.
77  */
78 struct PendingRequestList
79 {
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *next;
85
86   /**
87    * This is a doubly-linked list.
88    */
89   struct PendingRequestList *prev;
90
91   /**
92    * Associated pending request.
93    */
94   struct GSF_PendingRequest *pr;
95
96   /**
97    * Corresponding GSF_RequestPlanReference.
98    */
99   struct GSF_RequestPlanReference *rpr;
100
101 };
102
103
104 /**
105  * Information we keep per request per peer.  This is a doubly-linked
106  * list (with head and tail in the 'struct GSF_PendingRequestData')
107  * with one entry in each heap of each 'struct PeerPlan'.  Each
108  * entry tracks information relevant for this request and this peer.
109  */
110 struct GSF_RequestPlan
111 {
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *next;
117
118   /**
119    * This is a doubly-linked list.
120    */
121   struct GSF_RequestPlan *prev;
122
123   /**
124    * Heap node associated with this request and this peer.
125    */
126   struct GNUNET_CONTAINER_HeapNode *hn;
127
128   /**
129    * The transmission plan for a peer that this request is associated with.
130    */
131   struct PeerPlan *pp;
132
133   /**
134    * Head of list of associated pending requests.
135    */
136   struct PendingRequestList *prl_head;
137
138   /**
139    * Tail of list of associated pending requests.
140    */
141   struct PendingRequestList *prl_tail;
142
143   /**
144    * Earliest time we'd be happy to (re)transmit this request.
145    */
146   struct GNUNET_TIME_Absolute earliest_transmission;
147
148   /**
149    * When was the last time we transmitted this request to this peer? 0 for never.
150    */
151   struct GNUNET_TIME_Absolute last_transmission;
152
153   /**
154    * Current priority for this request for this target.
155    */
156   uint64_t priority;
157
158   /**
159    * How often did we transmit this request to this peer?
160    */
161   unsigned int transmission_counter;
162
163 };
164
165
166 /**
167  * Transmission plan for a peer.
168  */
169 struct PeerPlan
170 {
171   /**
172    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
173    */
174   struct GNUNET_CONTAINER_Heap *priority_heap;
175
176   /**
177    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
178    */
179   struct GNUNET_CONTAINER_Heap *delay_heap;
180
181   /**
182    * Map of queries to plan entries.  All entries in the priority_heap or delay_heap
183    * should be in the plan map.  Note that it IS possible for the plan map to have
184    * multiple entries for the same query.
185    */
186   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
187
188   /**
189    * Current transmission request handle.
190    */
191   struct GSF_PeerTransmitHandle *pth;
192
193   /**
194    * Peer for which this is the plan.
195    */
196   struct GSF_ConnectedPeer *cp;
197
198   /**
199    * Current task for executing the plan.
200    */
201   GNUNET_SCHEDULER_TaskIdentifier task;
202 };
203
204
205 /**
206  * Hash map from peer identities to PeerPlans.
207  */
208 static struct GNUNET_CONTAINER_MultiHashMap *plans;
209
210 /**
211  * Sum of all transmission counters (equals total delay for all plan entries).
212  */
213 static unsigned long long total_delay;
214
215 /**
216  * Number of plan entries.
217  */
218 static unsigned long long plan_count;
219
220
221 /**
222  * Return the query (key in the plan_map) for the given request plan.
223  * Note that this key may change as there can be multiple pending
224  * requests for the same key and we just return _one_ of them; this
225  * particular one might complete while another one might still be
226  * active, hence the lifetime of the returned hash code is NOT
227  * necessarily identical to that of the 'struct GSF_RequestPlan'
228  * given.
229  *
230  * @param rp a request plan
231  * @return the associated query
232  */
233 static const struct GNUNET_HashCode *
234 get_rp_key (struct GSF_RequestPlan *rp)
235 {
236   return &GSF_pending_request_get_data_ (rp->prl_head->pr)->query;
237 }
238
239
240 /**
241  * Figure out when and how to transmit to the given peer.
242  *
243  * @param cls the 'struct GSF_ConnectedPeer' for transmission
244  * @param tc scheduler context
245  */
246 static void
247 schedule_peer_transmission (void *cls,
248                             const struct GNUNET_SCHEDULER_TaskContext *tc);
249
250
251 /**
252  * Insert the given request plan into the heap with the appropriate weight.
253  *
254  * @param pp associated peer's plan
255  * @param rp request to plan
256  */
257 static void
258 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
259 {
260 #define N ((double)128.0)
261   /**
262    * Running average delay we currently impose.
263    */
264   static double avg_delay;
265
266   struct GSF_PendingRequestData *prd;
267   struct GNUNET_TIME_Relative delay;
268
269   GNUNET_assert (rp->pp == pp);
270   GNUNET_STATISTICS_set (GSF_stats,
271                          gettext_noop ("# average retransmission delay (ms)"),
272                          total_delay * 1000LL / plan_count, GNUNET_NO);
273   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
274
275   if (rp->transmission_counter < 8)
276     delay =
277         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
278                                        rp->transmission_counter);
279   else if (rp->transmission_counter < 32)
280     delay =
281         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
282                                        8 +
283                                        (1LL << (rp->transmission_counter - 8)));
284   else
285     delay =
286         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
287                                        8 + (1LL << 24));
288   delay.rel_value =
289       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
290                                 delay.rel_value + 1);
291   /* Add 0.01 to avg_delay to avoid division-by-zero later */
292   avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value) / N) + 0.01;
293
294   /*
295    * For the priority, we need to consider a few basic rules:
296    * 1) if we just started requesting (delay is small), we should
297    * virtually always have a priority of zero.
298    * 2) for requests with average latency, our priority should match
299    * the average priority observed on the network
300    * 3) even the longest-running requests should not be WAY out of
301    * the observed average (thus we bound by a factor of 2)
302    * 4) we add +1 to the observed average priority to avoid everyone
303    * staying put at zero (2 * 0 = 0...).
304    *
305    * Using the specific calculation below, we get:
306    *
307    * delay = 0 => priority = 0;
308    * delay = avg delay => priority = running-average-observed-priority;
309    * delay >> avg_delay => priority = 2 * running-average-observed-priority;
310    *
311    * which satisfies all of the rules above.
312    *
313    * Note: M_PI_4 = PI/4 = arctan(1)
314    */
315   rp->priority =
316       round ((GSF_current_priorities +
317               1.0) * atan (delay.rel_value / avg_delay)) / M_PI_4;
318   /* Note: usage of 'round' and 'atan' requires -lm */
319
320   if (rp->transmission_counter != 0)
321     delay.rel_value += TTL_DECREMENT;
322   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
323               "Considering (re)transmission number %u in %llu ms\n",
324               (unsigned int) rp->transmission_counter,
325               (unsigned long long) delay.rel_value);
326   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328               "Earliest (re)transmission for `%s' in %us\n",
329               GNUNET_h2s (&prd->query), rp->transmission_counter);
330   GNUNET_assert (rp->hn == NULL);
331   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
332       == 0)
333     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
334   else
335     rp->hn =
336         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
337                                       rp->earliest_transmission.abs_value);
338   GNUNET_assert (GNUNET_YES ==
339                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
340                                                                get_rp_key (rp),
341                                                                rp));
342   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
343     GNUNET_SCHEDULER_cancel (pp->task);
344   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
345 #undef N
346 }
347
348
349 /**
350  * Get the pending request with the highest TTL from the given plan.
351  *
352  * @param rp plan to investigate
353  * @return pending request with highest TTL
354  */
355 struct GSF_PendingRequest *
356 get_latest (const struct GSF_RequestPlan *rp)
357 {
358   struct GSF_PendingRequest *ret;
359   struct PendingRequestList *prl;
360
361   prl = rp->prl_head;
362   ret = prl->pr;
363   prl = prl->next;
364   while (NULL != prl)
365   {
366     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
367         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
368       ret = prl->pr;
369     prl = prl->next;
370   }
371   return ret;
372 }
373
374
375 /**
376  * Function called to get a message for transmission.
377  *
378  * @param cls closure
379  * @param buf_size number of bytes available in buf
380  * @param buf where to copy the message, NULL on error (peer disconnect)
381  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
382  */
383 static size_t
384 transmit_message_callback (void *cls, size_t buf_size, void *buf)
385 {
386   struct PeerPlan *pp = cls;
387   struct GSF_RequestPlan *rp;
388   size_t msize;
389
390   pp->pth = NULL;
391   if (NULL == buf)
392   {
393     /* failed, try again... */
394     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
395     GNUNET_STATISTICS_update (GSF_stats,
396                               gettext_noop
397                               ("# transmission failed (core has no bandwidth)"),
398                               1, GNUNET_NO);
399     return 0;
400   }
401   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
402   if (NULL == rp)
403   {
404     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
405     return 0;
406   }
407   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
408   if (msize > buf_size)
409   {
410     /* buffer to small (message changed), try again */
411     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
412     return 0;
413   }
414   /* remove from root, add again elsewhere... */
415   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
416   rp->hn = NULL;
417   rp->last_transmission = GNUNET_TIME_absolute_get ();
418   rp->transmission_counter++;
419   total_delay++;
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421               "Executing plan %p executed %u times, planning retransmission\n",
422               rp, rp->transmission_counter);
423   plan (pp, rp);
424   GNUNET_STATISTICS_update (GSF_stats,
425                             gettext_noop
426                             ("# query messages sent to other peers"), 1,
427                             GNUNET_NO);
428   return msize;
429 }
430
431
432 /**
433  * Figure out when and how to transmit to the given peer.
434  *
435  * @param cls the 'struct PeerPlan'
436  * @param tc scheduler context
437  */
438 static void
439 schedule_peer_transmission (void *cls,
440                             const struct GNUNET_SCHEDULER_TaskContext *tc)
441 {
442   struct PeerPlan *pp = cls;
443   struct GSF_RequestPlan *rp;
444   size_t msize;
445   struct GNUNET_TIME_Relative delay;
446
447   pp->task = GNUNET_SCHEDULER_NO_TASK;
448   if (NULL != pp->pth)
449   {
450     GSF_peer_transmit_cancel_ (pp->pth);
451     pp->pth = NULL;
452   }
453   /* move ready requests to priority queue */
454   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
455          (GNUNET_TIME_absolute_get_remaining
456           (rp->earliest_transmission).rel_value == 0))
457   {
458     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
459     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
460   }
461   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
462   {
463     /* priority heap (still) empty, check for delay... */
464     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
465     if (NULL == rp)
466     {
467       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
468                   pp);
469       return;                   /* both queues empty */
470     }
471     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
472     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
473                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
474                 (unsigned long long) delay.rel_value, pp);
475     GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# delay heap timeout"),
476                            delay.rel_value, GNUNET_NO);
477
478     pp->task =
479         GNUNET_SCHEDULER_add_delayed (delay, &schedule_peer_transmission, pp);
480     return;
481   }
482   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
483                             1, GNUNET_NO);
484   /* process from priority heap */
485   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
487   GNUNET_assert (NULL != rp);
488   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
489   pp->pth =
490       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
491                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
492                           &transmit_message_callback, pp);
493   GNUNET_assert (NULL != pp->pth);
494 }
495
496
497 /**
498  * Closure for 'merge_pr'.
499  */
500 struct MergeContext
501 {
502
503   struct GSF_PendingRequest *pr;
504
505   int merged;
506
507 };
508
509
510 /**
511  * Iterator that checks if an equivalent request is already
512  * present for this peer.
513  *
514  * @param cls closure
515  * @param query the query
516  * @param element request plan stored at the node
517  * @return GNUNET_YES if we should continue to iterate,
518  *         GNUNET_NO if not (merge success)
519  */
520 static int
521 merge_pr (void *cls, const struct GNUNET_HashCode * query, void *element)
522 {
523   struct MergeContext *mpr = cls;
524   struct GSF_RequestPlan *rp = element;
525   struct GSF_PendingRequestData *prd;
526   struct GSF_RequestPlanReference *rpr;
527   struct PendingRequestList *prl;
528   struct GSF_PendingRequest *latest;
529
530   if (GNUNET_OK !=
531       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
532     return GNUNET_YES;
533   /* merge new request with existing request plan */
534   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
535   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
536   rpr->rp = rp;
537   rpr->prl = prl;
538   prl->rpr = rpr;
539   prl->pr = mpr->pr;
540   prd = GSF_pending_request_get_data_ (mpr->pr);
541   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
542   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
543   mpr->merged = GNUNET_YES;
544   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
545                             GNUNET_NO);
546   latest = get_latest (rp);
547   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
548       prd->ttl.abs_value)
549   {
550     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
551                               1, GNUNET_NO);
552     rp->transmission_counter = 0;       /* reset */
553   }
554   return GNUNET_NO;
555 }
556
557
558 /**
559  * Create a new query plan entry.
560  *
561  * @param cp peer with the entry
562  * @param pr request with the entry
563  */
564 void
565 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
566 {
567   const struct GNUNET_PeerIdentity *id;
568   struct PeerPlan *pp;
569   struct GSF_PendingRequestData *prd;
570   struct GSF_RequestPlan *rp;
571   struct GSF_RequestPlanReference *rpr;
572   struct PendingRequestList *prl;
573   struct MergeContext mpc;
574
575   GNUNET_assert (NULL != cp);
576   id = GSF_connected_peer_get_identity2_ (cp);
577   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id->hashPubKey);
578   if (NULL == pp)
579   {
580     pp = GNUNET_malloc (sizeof (struct PeerPlan));
581     pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
582     pp->priority_heap =
583         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
584     pp->delay_heap =
585         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
586     pp->cp = cp;
587     GNUNET_CONTAINER_multihashmap_put (plans, 
588                                        &id->hashPubKey, pp,
589                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
590   }
591   mpc.merged = GNUNET_NO;
592   mpc.pr = pr;
593   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
594                                               &GSF_pending_request_get_data_
595                                               (pr)->query, &merge_pr, &mpc); // 8 MB in 'merge_pr'
596   if (mpc.merged != GNUNET_NO)
597     return;
598   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
599                                               &GSF_pending_request_get_data_
600                                               (pr)->query, &merge_pr, &mpc);
601   if (mpc.merged != GNUNET_NO)
602     return;
603   plan_count++;
604   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
605                             GNUNET_NO);
606   prd = GSF_pending_request_get_data_ (pr);
607   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
608               "Planning transmission of query `%s' to peer `%s'\n",
609               GNUNET_h2s (&prd->query), GNUNET_i2s (id));
610   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); // 8 MB
611   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
612   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
613   rpr->rp = rp;
614   rpr->prl = prl;
615   prl->rpr = rpr;
616   prl->pr = pr;
617   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
618   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
619   rp->pp = pp;
620   GNUNET_assert (GNUNET_YES ==
621                  GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
622                                                     get_rp_key (rp), rp,
623                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); // 8 MB
624   plan (pp, rp); // +5 MB (plan/heap-insert)
625 }
626
627
628 /**
629  * Notify the plan about a peer being no longer available;
630  * destroy all entries associated with this peer.
631  *
632  * @param cp connected peer
633  */
634 void
635 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
636 {
637   const struct GNUNET_PeerIdentity *id;
638   struct PeerPlan *pp;
639   struct GSF_RequestPlan *rp;
640   struct GSF_PendingRequestData *prd;
641   struct PendingRequestList *prl;
642
643   id = GSF_connected_peer_get_identity2_ (cp);
644   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id->hashPubKey);
645   if (NULL == pp)
646     return;                     /* nothing was ever planned for this peer */
647   GNUNET_assert (GNUNET_YES ==
648                  GNUNET_CONTAINER_multihashmap_remove (plans, &id->hashPubKey,
649                                                        pp));
650   if (NULL != pp->pth)
651   {
652     GSF_peer_transmit_cancel_ (pp->pth);
653     pp->pth = NULL;
654   }
655   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
656   {
657     GNUNET_SCHEDULER_cancel (pp->task);
658     pp->task = GNUNET_SCHEDULER_NO_TASK;
659   }
660   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
661   {
662     GNUNET_break (GNUNET_YES ==
663                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
664                                                         get_rp_key (rp), rp));
665     while (NULL != (prl = rp->prl_head))
666     {
667       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
668       prd = GSF_pending_request_get_data_ (prl->pr);
669       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
670       GNUNET_free (prl->rpr);
671       GNUNET_free (prl);
672     }
673     plan_count--;
674     GNUNET_free (rp);
675   }
676   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
677   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
678   {
679     GNUNET_break (GNUNET_YES ==
680                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
681                                                         get_rp_key (rp), rp));
682     while (NULL != (prl = rp->prl_head))
683     {
684       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
685       prd = GSF_pending_request_get_data_ (prl->pr);
686       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
687       GNUNET_free (prl->rpr);
688       GNUNET_free (prl);
689     }
690     plan_count--;
691     GNUNET_free (rp);
692   }
693   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
694                          plan_count, GNUNET_NO);
695   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
696   GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
697   GNUNET_free (pp);
698 }
699
700 /**
701  * Get the last transmission attempt time for the request plan list
702  * referenced by 'rpr_head', that was sent to 'sender'
703  *
704  * @param rpr_head request plan reference list to check.
705  * @param sender the peer that we've sent the request to.
706  * @param result the timestamp to fill.
707  * @return GNUNET_YES if 'result' was changed, GNUNET_NO otherwise.
708  */
709 int
710 GSF_request_plan_reference_get_last_transmission_ (
711     struct GSF_RequestPlanReference *rpr_head, struct GSF_ConnectedPeer *sender,
712     struct GNUNET_TIME_Absolute *result)
713 {
714   struct GSF_RequestPlanReference *rpr;
715   for (rpr = rpr_head; rpr; rpr = rpr->next)
716   {
717     if (rpr->rp->pp->cp == sender)
718     {
719       *result = rpr->rp->last_transmission;
720       return GNUNET_YES;
721     }
722   }
723   return GNUNET_NO;
724 }
725
726 /**
727  * Notify the plan about a request being done; destroy all entries
728  * associated with this request.
729  *
730  * @param pr request that is done
731  */
732 void
733 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
734 {
735   struct GSF_RequestPlan *rp;
736   struct GSF_PendingRequestData *prd;
737   struct GSF_RequestPlanReference *rpr;
738
739   prd = GSF_pending_request_get_data_ (pr);
740   while (NULL != (rpr = prd->rpr_head))
741   {
742     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
743     rp = rpr->rp;
744     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
745     if (NULL == rp->prl_head)
746     {
747       GNUNET_CONTAINER_heap_remove_node (rp->hn);
748       plan_count--;
749       GNUNET_break (GNUNET_YES ==
750                     GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
751                                                           &GSF_pending_request_get_data_
752                                                           (rpr->prl->pr)->query,
753                                                           rp));
754       GNUNET_free (rp);
755     }
756     GNUNET_free (rpr->prl);
757     GNUNET_free (rpr);
758   }
759   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
760                          plan_count, GNUNET_NO);
761 }
762
763
764 /**
765  * Initialize plan subsystem.
766  */
767 void
768 GSF_plan_init ()
769 {
770   plans = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_YES);
771 }
772
773
774 /**
775  * Shutdown plan subsystem.
776  */
777 void
778 GSF_plan_done ()
779 {
780   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
781   GNUNET_CONTAINER_multihashmap_destroy (plans);
782 }
783
784
785
786 /* end of gnunet-service-fs_pe.h */