-do even fewer stats by default
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32 /**
33  * Collect an instane number of statistics?  May cause excessive IPC.
34  */
35 #define INSANE_STATISTICS GNUNET_NO
36
37 /**
38  * List of GSF_PendingRequests this request plan
39  * participates with.
40  */
41 struct PendingRequestList;
42
43 /**
44  * Transmission plan for a peer.
45  */
46 struct PeerPlan;
47
48
49 /**
50  * DLL of request plans a particular pending request is
51  * involved with.
52  */
53 struct GSF_RequestPlanReference
54 {
55
56   /**
57    * This is a doubly-linked list.
58    */
59   struct GSF_RequestPlanReference *next;
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct GSF_RequestPlanReference *prev;
65
66   /**
67    * Associated request plan.
68    */
69   struct GSF_RequestPlan *rp;
70
71   /**
72    * Corresponding PendingRequestList.
73    */
74   struct PendingRequestList *prl;
75 };
76
77
78 /**
79  * List of GSF_PendingRequests this request plan
80  * participates with.
81  */
82 struct PendingRequestList
83 {
84
85   /**
86    * This is a doubly-linked list.
87    */
88   struct PendingRequestList *next;
89
90   /**
91    * This is a doubly-linked list.
92    */
93   struct PendingRequestList *prev;
94
95   /**
96    * Associated pending request.
97    */
98   struct GSF_PendingRequest *pr;
99
100   /**
101    * Corresponding GSF_RequestPlanReference.
102    */
103   struct GSF_RequestPlanReference *rpr;
104
105 };
106
107
108 /**
109  * Information we keep per request per peer.  This is a doubly-linked
110  * list (with head and tail in the 'struct GSF_PendingRequestData')
111  * with one entry in each heap of each 'struct PeerPlan'.  Each
112  * entry tracks information relevant for this request and this peer.
113  */
114 struct GSF_RequestPlan
115 {
116
117   /**
118    * This is a doubly-linked list.
119    */
120   struct GSF_RequestPlan *next;
121
122   /**
123    * This is a doubly-linked list.
124    */
125   struct GSF_RequestPlan *prev;
126
127   /**
128    * Heap node associated with this request and this peer.
129    */
130   struct GNUNET_CONTAINER_HeapNode *hn;
131
132   /**
133    * The transmission plan for a peer that this request is associated with.
134    */
135   struct PeerPlan *pp;
136
137   /**
138    * Head of list of associated pending requests.
139    */
140   struct PendingRequestList *prl_head;
141
142   /**
143    * Tail of list of associated pending requests.
144    */
145   struct PendingRequestList *prl_tail;
146
147   /**
148    * Earliest time we'd be happy to (re)transmit this request.
149    */
150   struct GNUNET_TIME_Absolute earliest_transmission;
151
152   /**
153    * When was the last time we transmitted this request to this peer? 0 for never.
154    */
155   struct GNUNET_TIME_Absolute last_transmission;
156
157   /**
158    * Current priority for this request for this target.
159    */
160   uint64_t priority;
161
162   /**
163    * How often did we transmit this request to this peer?
164    */
165   unsigned int transmission_counter;
166
167 };
168
169
170 /**
171  * Transmission plan for a peer.
172  */
173 struct PeerPlan
174 {
175   /**
176    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
177    */
178   struct GNUNET_CONTAINER_Heap *priority_heap;
179
180   /**
181    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
182    */
183   struct GNUNET_CONTAINER_Heap *delay_heap;
184
185   /**
186    * Map of queries to plan entries.  All entries in the priority_heap or delay_heap
187    * should be in the plan map.  Note that it IS possible for the plan map to have
188    * multiple entries for the same query.
189    */
190   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
191
192   /**
193    * Current transmission request handle.
194    */
195   struct GSF_PeerTransmitHandle *pth;
196
197   /**
198    * Peer for which this is the plan.
199    */
200   struct GSF_ConnectedPeer *cp;
201
202   /**
203    * Current task for executing the plan.
204    */
205   GNUNET_SCHEDULER_TaskIdentifier task;
206 };
207
208
209 /**
210  * Hash map from peer identities to PeerPlans.
211  */
212 static struct GNUNET_CONTAINER_MultiHashMap *plans;
213
214 /**
215  * Sum of all transmission counters (equals total delay for all plan entries).
216  */
217 static unsigned long long total_delay;
218
219 /**
220  * Number of plan entries.
221  */
222 static unsigned long long plan_count;
223
224
225 /**
226  * Return the query (key in the plan_map) for the given request plan.
227  * Note that this key may change as there can be multiple pending
228  * requests for the same key and we just return _one_ of them; this
229  * particular one might complete while another one might still be
230  * active, hence the lifetime of the returned hash code is NOT
231  * necessarily identical to that of the 'struct GSF_RequestPlan'
232  * given.
233  *
234  * @param rp a request plan
235  * @return the associated query
236  */
237 static const struct GNUNET_HashCode *
238 get_rp_key (struct GSF_RequestPlan *rp)
239 {
240   return &GSF_pending_request_get_data_ (rp->prl_head->pr)->query;
241 }
242
243
244 /**
245  * Figure out when and how to transmit to the given peer.
246  *
247  * @param cls the 'struct GSF_ConnectedPeer' for transmission
248  * @param tc scheduler context
249  */
250 static void
251 schedule_peer_transmission (void *cls,
252                             const struct GNUNET_SCHEDULER_TaskContext *tc);
253
254
255 /**
256  * Insert the given request plan into the heap with the appropriate weight.
257  *
258  * @param pp associated peer's plan
259  * @param rp request to plan
260  */
261 static void
262 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
263 {
264 #define N ((double)128.0)
265   /**
266    * Running average delay we currently impose.
267    */
268   static double avg_delay;
269
270   struct GSF_PendingRequestData *prd;
271   struct GNUNET_TIME_Relative delay;
272
273   GNUNET_assert (rp->pp == pp);
274   GNUNET_STATISTICS_set (GSF_stats,
275                          gettext_noop ("# average retransmission delay (ms)"),
276                          total_delay * 1000LL / plan_count, GNUNET_NO);
277   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
278
279   if (rp->transmission_counter < 8)
280     delay =
281         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
282                                        rp->transmission_counter);
283   else if (rp->transmission_counter < 32)
284     delay =
285         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
286                                        8 +
287                                        (1LL << (rp->transmission_counter - 8)));
288   else
289     delay =
290         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
291                                        8 + (1LL << 24));
292   delay.rel_value =
293       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
294                                 delay.rel_value + 1);
295   /* Add 0.01 to avg_delay to avoid division-by-zero later */
296   avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value) / N) + 0.01;
297
298   /*
299    * For the priority, we need to consider a few basic rules:
300    * 1) if we just started requesting (delay is small), we should
301    * virtually always have a priority of zero.
302    * 2) for requests with average latency, our priority should match
303    * the average priority observed on the network
304    * 3) even the longest-running requests should not be WAY out of
305    * the observed average (thus we bound by a factor of 2)
306    * 4) we add +1 to the observed average priority to avoid everyone
307    * staying put at zero (2 * 0 = 0...).
308    *
309    * Using the specific calculation below, we get:
310    *
311    * delay = 0 => priority = 0;
312    * delay = avg delay => priority = running-average-observed-priority;
313    * delay >> avg_delay => priority = 2 * running-average-observed-priority;
314    *
315    * which satisfies all of the rules above.
316    *
317    * Note: M_PI_4 = PI/4 = arctan(1)
318    */
319   rp->priority =
320       round ((GSF_current_priorities +
321               1.0) * atan (delay.rel_value / avg_delay)) / M_PI_4;
322   /* Note: usage of 'round' and 'atan' requires -lm */
323
324   if (rp->transmission_counter != 0)
325     delay.rel_value += TTL_DECREMENT;
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Considering (re)transmission number %u in %llu ms\n",
328               (unsigned int) rp->transmission_counter,
329               (unsigned long long) delay.rel_value);
330   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
332               "Earliest (re)transmission for `%s' in %us\n",
333               GNUNET_h2s (&prd->query), rp->transmission_counter);
334   GNUNET_assert (rp->hn == NULL);
335   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
336       == 0)
337     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
338   else
339     rp->hn =
340         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
341                                       rp->earliest_transmission.abs_value);
342   GNUNET_assert (GNUNET_YES ==
343                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
344                                                                get_rp_key (rp),
345                                                                rp));
346   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
347     GNUNET_SCHEDULER_cancel (pp->task);
348   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
349 #undef N
350 }
351
352
353 /**
354  * Get the pending request with the highest TTL from the given plan.
355  *
356  * @param rp plan to investigate
357  * @return pending request with highest TTL
358  */
359 struct GSF_PendingRequest *
360 get_latest (const struct GSF_RequestPlan *rp)
361 {
362   struct GSF_PendingRequest *ret;
363   struct PendingRequestList *prl;
364
365   prl = rp->prl_head;
366   ret = prl->pr;
367   prl = prl->next;
368   while (NULL != prl)
369   {
370     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
371         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
372       ret = prl->pr;
373     prl = prl->next;
374   }
375   return ret;
376 }
377
378
379 /**
380  * Function called to get a message for transmission.
381  *
382  * @param cls closure
383  * @param buf_size number of bytes available in buf
384  * @param buf where to copy the message, NULL on error (peer disconnect)
385  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
386  */
387 static size_t
388 transmit_message_callback (void *cls, size_t buf_size, void *buf)
389 {
390   struct PeerPlan *pp = cls;
391   struct GSF_RequestPlan *rp;
392   size_t msize;
393
394   pp->pth = NULL;
395   if (NULL == buf)
396   {
397     /* failed, try again... */
398     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
399     GNUNET_STATISTICS_update (GSF_stats,
400                               gettext_noop
401                               ("# transmission failed (core has no bandwidth)"),
402                               1, GNUNET_NO);
403     return 0;
404   }
405   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
406   if (NULL == rp)
407   {
408     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
409     return 0;
410   }
411   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
412   if (msize > buf_size)
413   {
414     /* buffer to small (message changed), try again */
415     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
416     return 0;
417   }
418   /* remove from root, add again elsewhere... */
419   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
420   rp->hn = NULL;
421   rp->last_transmission = GNUNET_TIME_absolute_get ();
422   rp->transmission_counter++;
423   total_delay++;
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Executing plan %p executed %u times, planning retransmission\n",
426               rp, rp->transmission_counter);
427   plan (pp, rp);
428   GNUNET_STATISTICS_update (GSF_stats,
429                             gettext_noop
430                             ("# query messages sent to other peers"), 1,
431                             GNUNET_NO);
432   return msize;
433 }
434
435
436 /**
437  * Figure out when and how to transmit to the given peer.
438  *
439  * @param cls the 'struct PeerPlan'
440  * @param tc scheduler context
441  */
442 static void
443 schedule_peer_transmission (void *cls,
444                             const struct GNUNET_SCHEDULER_TaskContext *tc)
445 {
446   struct PeerPlan *pp = cls;
447   struct GSF_RequestPlan *rp;
448   size_t msize;
449   struct GNUNET_TIME_Relative delay;
450
451   pp->task = GNUNET_SCHEDULER_NO_TASK;
452   if (NULL != pp->pth)
453   {
454     GSF_peer_transmit_cancel_ (pp->pth);
455     pp->pth = NULL;
456   }
457   /* move ready requests to priority queue */
458   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
459          (GNUNET_TIME_absolute_get_remaining
460           (rp->earliest_transmission).rel_value == 0))
461   {
462     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
463     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
464   }
465   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
466   {
467     /* priority heap (still) empty, check for delay... */
468     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
469     if (NULL == rp)
470     {
471       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
472                   pp);
473       return;                   /* both queues empty */
474     }
475     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
476     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
477                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
478                 (unsigned long long) delay.rel_value, pp);
479     GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# delay heap timeout"),
480                            delay.rel_value, GNUNET_NO);
481
482     pp->task =
483         GNUNET_SCHEDULER_add_delayed (delay, &schedule_peer_transmission, pp);
484     return;
485   }
486 #if INSANE_STATISTICS
487   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
488                             1, GNUNET_NO);
489 #endif
490   /* process from priority heap */
491   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
493   GNUNET_assert (NULL != rp);
494   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
495   pp->pth =
496       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
497                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
498                           &transmit_message_callback, pp);
499   GNUNET_assert (NULL != pp->pth);
500 }
501
502
503 /**
504  * Closure for 'merge_pr'.
505  */
506 struct MergeContext
507 {
508
509   struct GSF_PendingRequest *pr;
510
511   int merged;
512
513 };
514
515
516 /**
517  * Iterator that checks if an equivalent request is already
518  * present for this peer.
519  *
520  * @param cls closure
521  * @param query the query
522  * @param element request plan stored at the node
523  * @return GNUNET_YES if we should continue to iterate,
524  *         GNUNET_NO if not (merge success)
525  */
526 static int
527 merge_pr (void *cls, const struct GNUNET_HashCode * query, void *element)
528 {
529   struct MergeContext *mpr = cls;
530   struct GSF_RequestPlan *rp = element;
531   struct GSF_PendingRequestData *prd;
532   struct GSF_RequestPlanReference *rpr;
533   struct PendingRequestList *prl;
534   struct GSF_PendingRequest *latest;
535
536   if (GNUNET_OK !=
537       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
538     return GNUNET_YES;
539   /* merge new request with existing request plan */
540   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
541   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
542   rpr->rp = rp;
543   rpr->prl = prl;
544   prl->rpr = rpr;
545   prl->pr = mpr->pr;
546   prd = GSF_pending_request_get_data_ (mpr->pr);
547   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
548   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
549   mpr->merged = GNUNET_YES;
550 #if INSANE_STATISTICS
551   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
552                             GNUNET_NO);
553 #endif
554   latest = get_latest (rp);
555   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
556       prd->ttl.abs_value)
557   {
558 #if INSANE_STATISTICS
559     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
560                               1, GNUNET_NO);
561 #endif
562     rp->transmission_counter = 0;       /* reset */
563   }
564   return GNUNET_NO;
565 }
566
567
568 /**
569  * Create a new query plan entry.
570  *
571  * @param cp peer with the entry
572  * @param pr request with the entry
573  */
574 void
575 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
576 {
577   const struct GNUNET_PeerIdentity *id;
578   struct PeerPlan *pp;
579   struct GSF_PendingRequestData *prd;
580   struct GSF_RequestPlan *rp;
581   struct GSF_RequestPlanReference *rpr;
582   struct PendingRequestList *prl;
583   struct MergeContext mpc;
584
585   GNUNET_assert (NULL != cp);
586   id = GSF_connected_peer_get_identity2_ (cp);
587   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id->hashPubKey);
588   if (NULL == pp)
589   {
590     pp = GNUNET_malloc (sizeof (struct PeerPlan));
591     pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
592     pp->priority_heap =
593         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
594     pp->delay_heap =
595         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
596     pp->cp = cp;
597     GNUNET_CONTAINER_multihashmap_put (plans, 
598                                        &id->hashPubKey, pp,
599                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
600   }
601   mpc.merged = GNUNET_NO;
602   mpc.pr = pr;
603   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
604                                               &GSF_pending_request_get_data_
605                                               (pr)->query, &merge_pr, &mpc); // 8 MB in 'merge_pr'
606   if (GNUNET_NO != mpc.merged)
607     return;
608   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
609                                               &GSF_pending_request_get_data_
610                                               (pr)->query, &merge_pr, &mpc);
611   if (GNUNET_NO != mpc.merged)
612     return;
613   plan_count++;
614   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
615                             GNUNET_NO);
616   prd = GSF_pending_request_get_data_ (pr);
617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618               "Planning transmission of query `%s' to peer `%s'\n",
619               GNUNET_h2s (&prd->query), GNUNET_i2s (id));
620   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); // 8 MB
621   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
622   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
623   rpr->rp = rp;
624   rpr->prl = prl;
625   prl->rpr = rpr;
626   prl->pr = pr;
627   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
628   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
629   rp->pp = pp;
630   GNUNET_assert (GNUNET_YES ==
631                  GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
632                                                     get_rp_key (rp), rp,
633                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); // 8 MB
634   plan (pp, rp); // +5 MB (plan/heap-insert)
635 }
636
637
638 /**
639  * Notify the plan about a peer being no longer available;
640  * destroy all entries associated with this peer.
641  *
642  * @param cp connected peer
643  */
644 void
645 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
646 {
647   const struct GNUNET_PeerIdentity *id;
648   struct PeerPlan *pp;
649   struct GSF_RequestPlan *rp;
650   struct GSF_PendingRequestData *prd;
651   struct PendingRequestList *prl;
652
653   id = GSF_connected_peer_get_identity2_ (cp);
654   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id->hashPubKey);
655   if (NULL == pp)
656     return;                     /* nothing was ever planned for this peer */
657   GNUNET_assert (GNUNET_YES ==
658                  GNUNET_CONTAINER_multihashmap_remove (plans, &id->hashPubKey,
659                                                        pp));
660   if (NULL != pp->pth)
661   {
662     GSF_peer_transmit_cancel_ (pp->pth);
663     pp->pth = NULL;
664   }
665   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
666   {
667     GNUNET_SCHEDULER_cancel (pp->task);
668     pp->task = GNUNET_SCHEDULER_NO_TASK;
669   }
670   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
671   {
672     GNUNET_break (GNUNET_YES ==
673                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
674                                                         get_rp_key (rp), rp));
675     while (NULL != (prl = rp->prl_head))
676     {
677       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
678       prd = GSF_pending_request_get_data_ (prl->pr);
679       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
680       GNUNET_free (prl->rpr);
681       GNUNET_free (prl);
682     }
683     plan_count--;
684     GNUNET_free (rp);
685   }
686   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
687   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
688   {
689     GNUNET_break (GNUNET_YES ==
690                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
691                                                         get_rp_key (rp), rp));
692     while (NULL != (prl = rp->prl_head))
693     {
694       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
695       prd = GSF_pending_request_get_data_ (prl->pr);
696       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
697       GNUNET_free (prl->rpr);
698       GNUNET_free (prl);
699     }
700     plan_count--;
701     GNUNET_free (rp);
702   }
703   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
704                          plan_count, GNUNET_NO);
705   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
706   GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
707   GNUNET_free (pp);
708 }
709
710 /**
711  * Get the last transmission attempt time for the request plan list
712  * referenced by 'rpr_head', that was sent to 'sender'
713  *
714  * @param rpr_head request plan reference list to check.
715  * @param sender the peer that we've sent the request to.
716  * @param result the timestamp to fill.
717  * @return GNUNET_YES if 'result' was changed, GNUNET_NO otherwise.
718  */
719 int
720 GSF_request_plan_reference_get_last_transmission_ (
721     struct GSF_RequestPlanReference *rpr_head, struct GSF_ConnectedPeer *sender,
722     struct GNUNET_TIME_Absolute *result)
723 {
724   struct GSF_RequestPlanReference *rpr;
725   for (rpr = rpr_head; rpr; rpr = rpr->next)
726   {
727     if (rpr->rp->pp->cp == sender)
728     {
729       *result = rpr->rp->last_transmission;
730       return GNUNET_YES;
731     }
732   }
733   return GNUNET_NO;
734 }
735
736 /**
737  * Notify the plan about a request being done; destroy all entries
738  * associated with this request.
739  *
740  * @param pr request that is done
741  */
742 void
743 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
744 {
745   struct GSF_RequestPlan *rp;
746   struct GSF_PendingRequestData *prd;
747   struct GSF_RequestPlanReference *rpr;
748
749   prd = GSF_pending_request_get_data_ (pr);
750   while (NULL != (rpr = prd->rpr_head))
751   {
752     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
753     rp = rpr->rp;
754     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
755     if (NULL == rp->prl_head)
756     {
757       GNUNET_CONTAINER_heap_remove_node (rp->hn);
758       plan_count--;
759       GNUNET_break (GNUNET_YES ==
760                     GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
761                                                           &GSF_pending_request_get_data_
762                                                           (rpr->prl->pr)->query,
763                                                           rp));
764       GNUNET_free (rp);
765     }
766     GNUNET_free (rpr->prl);
767     GNUNET_free (rpr);
768   }
769   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
770                          plan_count, GNUNET_NO);
771 }
772
773
774 /**
775  * Initialize plan subsystem.
776  */
777 void
778 GSF_plan_init ()
779 {
780   plans = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_YES);
781 }
782
783
784 /**
785  * Shutdown plan subsystem.
786  */
787 void
788 GSF_plan_done ()
789 {
790   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
791   GNUNET_CONTAINER_multihashmap_destroy (plans);
792 }
793
794
795
796 /* end of gnunet-service-fs_pe.h */