226137d7342ef326f240896a6269541a27d42bb3
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32 /**
33  * Collect an instane number of statistics?  May cause excessive IPC.
34  */
35 #define INSANE_STATISTICS GNUNET_NO
36
37 /**
38  * List of GSF_PendingRequests this request plan
39  * participates with.
40  */
41 struct PendingRequestList;
42
43 /**
44  * Transmission plan for a peer.
45  */
46 struct PeerPlan;
47
48
49 /**
50  * DLL of request plans a particular pending request is
51  * involved with.  The corresponding head and tail are
52  * stored in a 'struct GSF_PendingRequest'.  (We need
53  * to be able to lookup all 'plan' entries for a given
54  * request easily).
55  *
56  * There is always one 'struct PendingRequestList'
57  * for each 'struct GSF_RequestPlanReference'.
58  */
59 struct GSF_RequestPlanReference
60 {
61
62   /**
63    * This is a doubly-linked list.
64    */
65   struct GSF_RequestPlanReference *next_PR;
66
67   /**
68    * This is a doubly-linked list.
69    */
70   struct GSF_RequestPlanReference *prev_PR;
71
72   /**
73    * Associated request plan.
74    */
75   struct GSF_RequestPlan *rp;
76
77   /**
78    * Corresponding PendingRequestList.
79    */
80   struct PendingRequestList *prl;
81 };
82
83
84 /**
85  * List of GSF_PendingRequests this request plan
86  * participates with.  The corresponding head and tail
87  * are stored in a 'struct GSF_RequestPlan'. (We need
88  * to be able to lookup all pending requests corresponding
89  * to a given plan entry.)
90  *
91  * There is always one 'struct PendingRequestList'
92  * for each 'struct GSF_RequestPlanReference'.
93  */
94 struct PendingRequestList
95 {
96
97   /**
98    * This is a doubly-linked list.
99    */
100   struct PendingRequestList *next_PE;
101
102   /**
103    * This is a doubly-linked list.
104    */
105   struct PendingRequestList *prev_PE;
106
107   /**
108    * Associated pending request.
109    */
110   struct GSF_PendingRequest *pr;
111
112   /**
113    * Corresponding GSF_RequestPlanReference.
114    */
115   struct GSF_RequestPlanReference *rpr;
116
117 };
118
119
120 /**
121  * Information we keep per request per peer.  This is a doubly-linked
122  * list (with head and tail in the 'struct GSF_PendingRequestData')
123  * with one entry in each heap of each 'struct PeerPlan'.  Each
124  * entry tracks information relevant for this request and this peer.
125  */
126 struct GSF_RequestPlan
127 {
128
129   /**
130    * This is a doubly-linked list.
131    */
132   struct GSF_RequestPlan *next;
133
134   /**
135    * This is a doubly-linked list.
136    */
137   struct GSF_RequestPlan *prev;
138
139   /**
140    * Heap node associated with this request and this peer.
141    */
142   struct GNUNET_CONTAINER_HeapNode *hn;
143
144   /**
145    * The transmission plan for a peer that this request is associated with.
146    */
147   struct PeerPlan *pp;
148
149   /**
150    * Head of list of associated pending requests.
151    */
152   struct PendingRequestList *prl_head;
153
154   /**
155    * Tail of list of associated pending requests.
156    */
157   struct PendingRequestList *prl_tail;
158
159   /**
160    * Earliest time we'd be happy to (re)transmit this request.
161    */
162   struct GNUNET_TIME_Absolute earliest_transmission;
163
164   /**
165    * When was the last time we transmitted this request to this peer? 0 for never.
166    */
167   struct GNUNET_TIME_Absolute last_transmission;
168
169   /**
170    * Current priority for this request for this target.
171    */
172   uint64_t priority;
173
174   /**
175    * How often did we transmit this request to this peer?
176    */
177   unsigned int transmission_counter;
178
179 };
180
181
182 /**
183  * Transmission plan for a peer.
184  */
185 struct PeerPlan
186 {
187   /**
188    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
189    */
190   struct GNUNET_CONTAINER_Heap *priority_heap;
191
192   /**
193    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
194    */
195   struct GNUNET_CONTAINER_Heap *delay_heap;
196
197   /**
198    * Map of queries to plan entries.  All entries in the priority_heap or delay_heap
199    * should be in the plan map.  Note that it IS possible for the plan map to have
200    * multiple entries for the same query.
201    */
202   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
203
204   /**
205    * Current transmission request handle.
206    */
207   struct GSF_PeerTransmitHandle *pth;
208
209   /**
210    * Peer for which this is the plan.
211    */
212   struct GSF_ConnectedPeer *cp;
213
214   /**
215    * Current task for executing the plan.
216    */
217   GNUNET_SCHEDULER_TaskIdentifier task;
218 };
219
220
221 /**
222  * Hash map from peer identities to PeerPlans.
223  */
224 static struct GNUNET_CONTAINER_MultiHashMap *plans;
225
226 /**
227  * Sum of all transmission counters (equals total delay for all plan entries).
228  */
229 static unsigned long long total_delay;
230
231 /**
232  * Number of plan entries.
233  */
234 static unsigned long long plan_count;
235
236
237 /**
238  * Return the query (key in the plan_map) for the given request plan.
239  * Note that this key may change as there can be multiple pending
240  * requests for the same key and we just return _one_ of them; this
241  * particular one might complete while another one might still be
242  * active, hence the lifetime of the returned hash code is NOT
243  * necessarily identical to that of the 'struct GSF_RequestPlan'
244  * given.
245  *
246  * @param rp a request plan
247  * @return the associated query
248  */
249 static const struct GNUNET_HashCode *
250 get_rp_key (struct GSF_RequestPlan *rp)
251 {
252   return &GSF_pending_request_get_data_ (rp->prl_head->pr)->query;
253 }
254
255
256 /**
257  * Figure out when and how to transmit to the given peer.
258  *
259  * @param cls the 'struct GSF_ConnectedPeer' for transmission
260  * @param tc scheduler context
261  */
262 static void
263 schedule_peer_transmission (void *cls,
264                             const struct GNUNET_SCHEDULER_TaskContext *tc);
265
266
267 /**
268  * Insert the given request plan into the heap with the appropriate weight.
269  *
270  * @param pp associated peer's plan
271  * @param rp request to plan
272  */
273 static void
274 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
275 {
276 #define N ((double)128.0)
277   /**
278    * Running average delay we currently impose.
279    */
280   static double avg_delay;
281
282   struct GSF_PendingRequestData *prd;
283   struct GNUNET_TIME_Relative delay;
284
285   GNUNET_assert (rp->pp == pp);
286   GNUNET_STATISTICS_set (GSF_stats,
287                          gettext_noop ("# average retransmission delay (ms)"),
288                          total_delay * 1000LL / plan_count, GNUNET_NO);
289   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
290
291   if (rp->transmission_counter < 8)
292     delay =
293         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
294                                        rp->transmission_counter);
295   else if (rp->transmission_counter < 32)
296     delay =
297         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
298                                        8 +
299                                        (1LL << (rp->transmission_counter - 8)));
300   else
301     delay =
302         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
303                                        8 + (1LL << 24));
304   delay.rel_value =
305       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
306                                 delay.rel_value + 1);
307   /* Add 0.01 to avg_delay to avoid division-by-zero later */
308   avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value) / N) + 0.01;
309
310   /*
311    * For the priority, we need to consider a few basic rules:
312    * 1) if we just started requesting (delay is small), we should
313    * virtually always have a priority of zero.
314    * 2) for requests with average latency, our priority should match
315    * the average priority observed on the network
316    * 3) even the longest-running requests should not be WAY out of
317    * the observed average (thus we bound by a factor of 2)
318    * 4) we add +1 to the observed average priority to avoid everyone
319    * staying put at zero (2 * 0 = 0...).
320    *
321    * Using the specific calculation below, we get:
322    *
323    * delay = 0 => priority = 0;
324    * delay = avg delay => priority = running-average-observed-priority;
325    * delay >> avg_delay => priority = 2 * running-average-observed-priority;
326    *
327    * which satisfies all of the rules above.
328    *
329    * Note: M_PI_4 = PI/4 = arctan(1)
330    */
331   rp->priority =
332       round ((GSF_current_priorities +
333               1.0) * atan (delay.rel_value / avg_delay)) / M_PI_4;
334   /* Note: usage of 'round' and 'atan' requires -lm */
335
336   if (rp->transmission_counter != 0)
337     delay.rel_value += TTL_DECREMENT;
338   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339               "Considering (re)transmission number %u in %llu ms\n",
340               (unsigned int) rp->transmission_counter,
341               (unsigned long long) delay.rel_value);
342   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
344               "Earliest (re)transmission for `%s' in %us\n",
345               GNUNET_h2s (&prd->query), rp->transmission_counter);
346   GNUNET_assert (rp->hn == NULL);
347   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
348       == 0)
349     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
350   else
351     rp->hn =
352         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
353                                       rp->earliest_transmission.abs_value);
354   GNUNET_assert (GNUNET_YES ==
355                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
356                                                                get_rp_key (rp),
357                                                                rp));
358   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
359     GNUNET_SCHEDULER_cancel (pp->task);
360   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
361 #undef N
362 }
363
364
365 /**
366  * Get the pending request with the highest TTL from the given plan.
367  *
368  * @param rp plan to investigate
369  * @return pending request with highest TTL
370  */
371 struct GSF_PendingRequest *
372 get_latest (const struct GSF_RequestPlan *rp)
373 {
374   struct GSF_PendingRequest *ret;
375   struct PendingRequestList *prl;
376
377   prl = rp->prl_head;
378   ret = prl->pr;
379   prl = prl->next_PE; 
380   while (NULL != prl)
381   {
382     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
383         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
384       ret = prl->pr;
385     prl = prl->next_PE;
386   }
387   return ret;
388 }
389
390
391 /**
392  * Function called to get a message for transmission.
393  *
394  * @param cls closure
395  * @param buf_size number of bytes available in buf
396  * @param buf where to copy the message, NULL on error (peer disconnect)
397  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
398  */
399 static size_t
400 transmit_message_callback (void *cls, size_t buf_size, void *buf)
401 {
402   struct PeerPlan *pp = cls;
403   struct GSF_RequestPlan *rp;
404   size_t msize;
405
406   pp->pth = NULL;
407   if (NULL == buf)
408   {
409     /* failed, try again... */
410     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
411     GNUNET_STATISTICS_update (GSF_stats,
412                               gettext_noop
413                               ("# transmission failed (core has no bandwidth)"),
414                               1, GNUNET_NO);
415     return 0;
416   }
417   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
418   if (NULL == rp)
419   {
420     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
421     return 0;
422   }
423   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
424   if (msize > buf_size)
425   {
426     /* buffer to small (message changed), try again */
427     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
428     return 0;
429   }
430   /* remove from root, add again elsewhere... */
431   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
432   rp->hn = NULL;
433   rp->last_transmission = GNUNET_TIME_absolute_get ();
434   rp->transmission_counter++;
435   total_delay++;
436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437               "Executing plan %p executed %u times, planning retransmission\n",
438               rp, rp->transmission_counter);
439   plan (pp, rp);
440   GNUNET_STATISTICS_update (GSF_stats,
441                             gettext_noop
442                             ("# query messages sent to other peers"), 1,
443                             GNUNET_NO);
444   return msize;
445 }
446
447
448 /**
449  * Figure out when and how to transmit to the given peer.
450  *
451  * @param cls the 'struct PeerPlan'
452  * @param tc scheduler context
453  */
454 static void
455 schedule_peer_transmission (void *cls,
456                             const struct GNUNET_SCHEDULER_TaskContext *tc)
457 {
458   struct PeerPlan *pp = cls;
459   struct GSF_RequestPlan *rp;
460   size_t msize;
461   struct GNUNET_TIME_Relative delay;
462
463   pp->task = GNUNET_SCHEDULER_NO_TASK;
464   if (NULL != pp->pth)
465   {
466     GSF_peer_transmit_cancel_ (pp->pth);
467     pp->pth = NULL;
468   }
469   /* move ready requests to priority queue */
470   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
471          (GNUNET_TIME_absolute_get_remaining
472           (rp->earliest_transmission).rel_value == 0))
473   {
474     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
475     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
476   }
477   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
478   {
479     /* priority heap (still) empty, check for delay... */
480     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
481     if (NULL == rp)
482     {
483       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
484                   pp);
485       return;                   /* both queues empty */
486     }
487     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
488     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
490                 (unsigned long long) delay.rel_value, pp);
491     GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# delay heap timeout"),
492                            delay.rel_value, GNUNET_NO);
493
494     pp->task =
495         GNUNET_SCHEDULER_add_delayed (delay, &schedule_peer_transmission, pp);
496     return;
497   }
498 #if INSANE_STATISTICS
499   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
500                             1, GNUNET_NO);
501 #endif
502   /* process from priority heap */
503   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
505   GNUNET_assert (NULL != rp);
506   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
507   pp->pth =
508       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
509                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
510                           &transmit_message_callback, pp);
511   GNUNET_assert (NULL != pp->pth);
512 }
513
514
515 /**
516  * Closure for 'merge_pr'.
517  */
518 struct MergeContext
519 {
520
521   struct GSF_PendingRequest *pr;
522
523   int merged;
524
525 };
526
527
528 /**
529  * Iterator that checks if an equivalent request is already
530  * present for this peer.
531  *
532  * @param cls closure
533  * @param query the query
534  * @param element request plan stored at the node
535  * @return GNUNET_YES if we should continue to iterate,
536  *         GNUNET_NO if not (merge success)
537  */
538 static int
539 merge_pr (void *cls, const struct GNUNET_HashCode * query, void *element)
540 {
541   struct MergeContext *mpr = cls;
542   struct GSF_RequestPlan *rp = element;
543   struct GSF_PendingRequestData *prd;
544   struct GSF_RequestPlanReference *rpr;
545   struct PendingRequestList *prl;
546   struct GSF_PendingRequest *latest;
547
548   if (GNUNET_OK !=
549       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
550     return GNUNET_YES;
551   /* merge new request with existing request plan */
552   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
553   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
554   rpr->rp = rp;
555   rpr->prl = prl;
556   prl->rpr = rpr;
557   prl->pr = mpr->pr;
558   prd = GSF_pending_request_get_data_ (mpr->pr);
559   GNUNET_CONTAINER_MDLL_insert (PR, prd->rpr_head, prd->rpr_tail, rpr);
560   GNUNET_CONTAINER_MDLL_insert (PE, rp->prl_head, rp->prl_tail, prl);
561   mpr->merged = GNUNET_YES;
562 #if INSANE_STATISTICS
563   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
564                             GNUNET_NO);
565 #endif
566   latest = get_latest (rp);
567   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
568       prd->ttl.abs_value)
569   {
570 #if INSANE_STATISTICS
571     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
572                               1, GNUNET_NO);
573 #endif
574     rp->transmission_counter = 0;       /* reset */
575   }
576   return GNUNET_NO;
577 }
578
579
580 /**
581  * Create a new query plan entry.
582  *
583  * @param cp peer with the entry
584  * @param pr request with the entry
585  */
586 void
587 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
588 {
589   const struct GNUNET_PeerIdentity *id;
590   struct PeerPlan *pp;
591   struct GSF_PendingRequestData *prd;
592   struct GSF_RequestPlan *rp;
593   struct GSF_RequestPlanReference *rpr;
594   struct PendingRequestList *prl;
595   struct MergeContext mpc;
596
597   GNUNET_assert (NULL != cp);
598   id = GSF_connected_peer_get_identity2_ (cp);
599   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id->hashPubKey);
600   if (NULL == pp)
601   {
602     pp = GNUNET_malloc (sizeof (struct PeerPlan));
603     pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
604     pp->priority_heap =
605         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
606     pp->delay_heap =
607         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
608     pp->cp = cp;
609     GNUNET_CONTAINER_multihashmap_put (plans, 
610                                        &id->hashPubKey, pp,
611                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
612   }
613   mpc.merged = GNUNET_NO;
614   mpc.pr = pr;
615   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
616                                               &GSF_pending_request_get_data_
617                                               (pr)->query, &merge_pr, &mpc); // 8 MB in 'merge_pr'
618   if (GNUNET_NO != mpc.merged)
619     return;
620   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
621                                               &GSF_pending_request_get_data_
622                                               (pr)->query, &merge_pr, &mpc);
623   if (GNUNET_NO != mpc.merged)
624     return;
625   plan_count++;
626   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
627                             GNUNET_NO);
628   prd = GSF_pending_request_get_data_ (pr);
629   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630               "Planning transmission of query `%s' to peer `%s'\n",
631               GNUNET_h2s (&prd->query), GNUNET_i2s (id));
632   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); // 8 MB
633   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
634   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
635   rpr->rp = rp;
636   rpr->prl = prl;
637   prl->rpr = rpr;
638   prl->pr = pr;
639   GNUNET_CONTAINER_MDLL_insert (PR, prd->rpr_head, prd->rpr_tail, rpr);
640   GNUNET_CONTAINER_MDLL_insert (PE, rp->prl_head, rp->prl_tail, prl);
641   rp->pp = pp;
642   GNUNET_assert (GNUNET_YES ==
643                  GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
644                                                     get_rp_key (rp), rp,
645                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); // 8 MB
646   plan (pp, rp); // +5 MB (plan/heap-insert)
647 }
648
649
650 /**
651  * Notify the plan about a peer being no longer available;
652  * destroy all entries associated with this peer.
653  *
654  * @param cp connected peer
655  */
656 void
657 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
658 {
659   const struct GNUNET_PeerIdentity *id;
660   struct PeerPlan *pp;
661   struct GSF_RequestPlan *rp;
662   struct GSF_PendingRequestData *prd;
663   struct PendingRequestList *prl;
664
665   id = GSF_connected_peer_get_identity2_ (cp);
666   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id->hashPubKey);
667   if (NULL == pp)
668     return;                     /* nothing was ever planned for this peer */
669   GNUNET_assert (GNUNET_YES ==
670                  GNUNET_CONTAINER_multihashmap_remove (plans, &id->hashPubKey,
671                                                        pp));
672   if (NULL != pp->pth)
673   {
674     GSF_peer_transmit_cancel_ (pp->pth);
675     pp->pth = NULL;
676   }
677   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
678   {
679     GNUNET_SCHEDULER_cancel (pp->task);
680     pp->task = GNUNET_SCHEDULER_NO_TASK;
681   }
682   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
683   {
684     GNUNET_break (GNUNET_YES ==
685                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
686                                                         get_rp_key (rp), rp));
687     while (NULL != (prl = rp->prl_head))
688     {
689       GNUNET_CONTAINER_MDLL_remove (PE, rp->prl_head, rp->prl_tail, prl);
690       prd = GSF_pending_request_get_data_ (prl->pr);
691       GNUNET_CONTAINER_MDLL_remove (PR, prd->rpr_head, prd->rpr_tail, prl->rpr);
692       GNUNET_free (prl->rpr);
693       GNUNET_free (prl);
694     }
695     plan_count--;
696     GNUNET_free (rp);
697   }
698   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
699   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
700   {
701     GNUNET_break (GNUNET_YES ==
702                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
703                                                         get_rp_key (rp), rp));
704     while (NULL != (prl = rp->prl_head))
705     {
706       GNUNET_CONTAINER_MDLL_remove (PE, rp->prl_head, rp->prl_tail, prl);
707       prd = GSF_pending_request_get_data_ (prl->pr);
708       GNUNET_CONTAINER_MDLL_remove (PR, prd->rpr_head, prd->rpr_tail, prl->rpr);
709       GNUNET_free (prl->rpr);
710       GNUNET_free (prl);
711     }
712     plan_count--;
713     GNUNET_free (rp);
714   }
715   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
716                          plan_count, GNUNET_NO);
717   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
718   GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
719   GNUNET_free (pp);
720 }
721
722
723 /**
724  * Get the last transmission attempt time for the request plan list
725  * referenced by 'rpr_head', that was sent to 'sender'
726  *
727  * @param rpr_head request plan reference list to check.
728  * @param sender the peer that we've sent the request to.
729  * @param result the timestamp to fill.
730  * @return GNUNET_YES if 'result' was changed, GNUNET_NO otherwise.
731  */
732 int
733 GSF_request_plan_reference_get_last_transmission_ (
734     struct GSF_RequestPlanReference *rpr_head, struct GSF_ConnectedPeer *sender,
735     struct GNUNET_TIME_Absolute *result)
736 {
737   struct GSF_RequestPlanReference *rpr;
738
739   for (rpr = rpr_head; NULL != rpr; rpr = rpr->next_PR)
740   {
741     if (rpr->rp->pp->cp == sender)
742     {
743       *result = rpr->rp->last_transmission;
744       return GNUNET_YES;
745     }
746   }
747   return GNUNET_NO;
748 }
749
750
751 /**
752  * Notify the plan about a request being done; destroy all entries
753  * associated with this request.
754  *
755  * @param pr request that is done
756  */
757 void
758 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
759 {
760   struct GSF_RequestPlan *rp;
761   struct GSF_PendingRequestData *prd;
762   struct GSF_RequestPlanReference *rpr;
763
764   prd = GSF_pending_request_get_data_ (pr);
765   while (NULL != (rpr = prd->rpr_head))
766   {
767     GNUNET_CONTAINER_MDLL_remove (PR, prd->rpr_head, prd->rpr_tail, rpr);
768     rp = rpr->rp;
769     GNUNET_CONTAINER_MDLL_remove (PE, rp->prl_head, rp->prl_tail, rpr->prl);
770     if (NULL == rp->prl_head)
771     {
772       GNUNET_CONTAINER_heap_remove_node (rp->hn);
773       plan_count--;
774       GNUNET_break (GNUNET_YES ==
775                     GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
776                                                           &GSF_pending_request_get_data_
777                                                           (rpr->prl->pr)->query,
778                                                           rp));
779       GNUNET_free (rp);
780     }
781     GNUNET_free (rpr->prl);
782     GNUNET_free (rpr);
783   }
784   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
785                          plan_count, GNUNET_NO);
786 }
787
788
789 /**
790  * Initialize plan subsystem.
791  */
792 void
793 GSF_plan_init ()
794 {
795   plans = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_YES);
796 }
797
798
799 /**
800  * Shutdown plan subsystem.
801  */
802 void
803 GSF_plan_done ()
804 {
805   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
806   GNUNET_CONTAINER_multihashmap_destroy (plans);
807 }
808
809
810
811 /* end of gnunet-service-fs_pe.h */