e85cafe90ca5e49c829874885a52ba9703af9976
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan
35  * participates with.
36  */
37 struct PendingRequestList;
38
39 /**
40  * Transmission plan for a peer.
41  */
42 struct PeerPlan;
43
44
45 /**
46  * DLL of request plans a particular pending request is
47  * involved with.
48  */
49 struct GSF_RequestPlanReference
50 {
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *next;
56
57   /**
58    * This is a doubly-linked list.
59    */
60   struct GSF_RequestPlanReference *prev;
61
62   /**
63    * Associated request plan.
64    */
65   struct GSF_RequestPlan *rp;
66
67   /**
68    * Corresponding PendingRequestList.
69    */
70   struct PendingRequestList *prl;
71 };
72
73
74 /**
75  * List of GSF_PendingRequests this request plan
76  * participates with.
77  */
78 struct PendingRequestList
79 {
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *next;
85
86   /**
87    * This is a doubly-linked list.
88    */
89   struct PendingRequestList *prev;
90
91   /**
92    * Associated pending request.
93    */
94   struct GSF_PendingRequest *pr;
95
96   /**
97    * Corresponding GSF_RequestPlanReference.
98    */
99   struct GSF_RequestPlanReference *rpr;
100
101 };
102
103
104 /**
105  * Information we keep per request per peer.  This is a doubly-linked
106  * list (with head and tail in the 'struct GSF_PendingRequestData')
107  * with one entry in each heap of each 'struct PeerPlan'.  Each
108  * entry tracks information relevant for this request and this peer.
109  */
110 struct GSF_RequestPlan
111 {
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *next;
117
118   /**
119    * This is a doubly-linked list.
120    */
121   struct GSF_RequestPlan *prev;
122
123   /**
124    * Heap node associated with this request and this peer.
125    */
126   struct GNUNET_CONTAINER_HeapNode *hn;
127
128   /**
129    * The transmission plan for a peer that this request is associated with.
130    */
131   struct PeerPlan *pp;
132
133   /**
134    * Head of list of associated pending requests.
135    */
136   struct PendingRequestList *prl_head;
137
138   /**
139    * Tail of list of associated pending requests.
140    */
141   struct PendingRequestList *prl_tail;
142
143   /**
144    * Earliest time we'd be happy to (re)transmit this request.
145    */
146   struct GNUNET_TIME_Absolute earliest_transmission;
147
148   /**
149    * When was the last time we transmitted this request to this peer? 0 for never.
150    */
151   struct GNUNET_TIME_Absolute last_transmission;
152
153   /**
154    * Current priority for this request for this target.
155    */
156   uint64_t priority;
157
158   /**
159    * How often did we transmit this request to this peer?
160    */
161   unsigned int transmission_counter;
162
163 };
164
165
166 /**
167  * Transmission plan for a peer.
168  */
169 struct PeerPlan
170 {
171   /**
172    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
173    */
174   struct GNUNET_CONTAINER_Heap *priority_heap;
175
176   /**
177    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
178    */
179   struct GNUNET_CONTAINER_Heap *delay_heap;
180
181   /**
182    * Map of queries to plan entries.  All entries in the priority_heap or delay_heap
183    * should be in the plan map.  Note that it IS possible for the plan map to have
184    * multiple entries for the same query.
185    */
186   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
187
188   /**
189    * Current transmission request handle.
190    */
191   struct GSF_PeerTransmitHandle *pth;
192
193   /**
194    * Peer for which this is the plan.
195    */
196   struct GSF_ConnectedPeer *cp;
197
198   /**
199    * Current task for executing the plan.
200    */
201   GNUNET_SCHEDULER_TaskIdentifier task;
202 };
203
204
205 /**
206  * Hash map from peer identities to PeerPlans.
207  */
208 static struct GNUNET_CONTAINER_MultiHashMap *plans;
209
210 /**
211  * Sum of all transmission counters (equals total delay for all plan entries).
212  */
213 static unsigned long long total_delay;
214
215 /**
216  * Number of plan entries.
217  */
218 static unsigned long long plan_count;
219
220
221 /**
222  * Return the query (key in the plan_map) for the given request plan.
223  *
224  * @param rp a request plan
225  * @return the associated query
226  */
227 static const GNUNET_HashCode *
228 get_rp_key (struct GSF_RequestPlan *rp)
229 {
230   return &GSF_pending_request_get_data_ (rp->prl_head->pr)->query;
231 }
232
233
234 /**
235  * Figure out when and how to transmit to the given peer.
236  *
237  * @param cls the 'struct GSF_ConnectedPeer' for transmission
238  * @param tc scheduler context
239  */
240 static void
241 schedule_peer_transmission (void *cls,
242                             const struct GNUNET_SCHEDULER_TaskContext *tc);
243
244
245 /**
246  * Insert the given request plan into the heap with the appropriate weight.
247  *
248  * @param pp associated peer's plan
249  * @param rp request to plan
250  */
251 static void
252 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
253 {
254   struct GSF_PendingRequestData *prd;
255   struct GNUNET_TIME_Relative delay;
256
257   GNUNET_assert (rp->pp == pp);
258   GNUNET_STATISTICS_set (GSF_stats,
259                          gettext_noop ("# average retransmission delay (ms)"),
260                          total_delay * 1000LL / plan_count, GNUNET_NO);
261   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
262   // FIXME: calculate 'rp->priority'!
263   if (rp->transmission_counter < 8)
264     delay =
265         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
266                                        rp->transmission_counter);
267   else if (rp->transmission_counter < 32)
268     delay =
269         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
270                                        8 + (1LL << (rp->transmission_counter - 8)));
271   else
272     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 
273                                            8 + (1LL << 24));
274   delay.rel_value = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
275                                               delay.rel_value + 1);
276   if (rp->transmission_counter != 0)
277     delay.rel_value += TTL_DECREMENT;
278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
279               "Considering (re)transmission number %u in %llu ms\n",
280               (unsigned int) rp->transmission_counter,
281               (unsigned long long) delay.rel_value);
282   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
283 #if DEBUG_FS
284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285               "Earliest (re)transmission for `%s' in %us\n",
286               GNUNET_h2s (&prd->query), rp->transmission_counter);
287 #endif
288
289   GNUNET_assert (rp->hn == NULL);
290   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
291       == 0)
292     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
293   else
294     rp->hn =
295         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
296                                       rp->earliest_transmission.abs_value);
297   GNUNET_assert (GNUNET_YES ==
298                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
299                                                                get_rp_key (rp),
300                                                                rp));
301   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
302     GNUNET_SCHEDULER_cancel (pp->task);
303   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
304 }
305
306
307 /**
308  * Get the pending request with the highest TTL from the given plan.
309  *
310  * @param rp plan to investigate
311  * @return pending request with highest TTL
312  */
313 struct GSF_PendingRequest *
314 get_latest (const struct GSF_RequestPlan *rp)
315 {
316   struct GSF_PendingRequest *ret;
317   struct PendingRequestList *prl;
318
319   prl = rp->prl_head;
320   ret = prl->pr;
321   prl = prl->next;
322   while (NULL != prl)
323   {
324     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
325         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
326       ret = prl->pr;
327     prl = prl->next;
328   }
329   return ret;
330 }
331
332
333 /**
334  * Function called to get a message for transmission.
335  *
336  * @param cls closure
337  * @param buf_size number of bytes available in buf
338  * @param buf where to copy the message, NULL on error (peer disconnect)
339  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
340  */
341 static size_t
342 transmit_message_callback (void *cls, size_t buf_size, void *buf)
343 {
344   struct PeerPlan *pp = cls;
345   struct GSF_RequestPlan *rp;
346   size_t msize;
347
348   pp->pth = NULL;
349   if (NULL == buf)
350   {
351     /* failed, try again... */
352     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
353     GNUNET_STATISTICS_update (GSF_stats,
354                               gettext_noop ("# transmission failed (core has no bandwidth)"), 1,
355                               GNUNET_NO);
356     return 0;
357   }
358   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
359   if (NULL == rp)
360   {
361     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
362     return 0;
363   }
364   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
365   if (msize > buf_size)
366   {
367     /* buffer to small (message changed), try again */
368     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
369     return 0;
370   }
371   /* remove from root, add again elsewhere... */
372   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
373   rp->hn = NULL;
374   rp->last_transmission = GNUNET_TIME_absolute_get ();
375   rp->transmission_counter++;
376   total_delay++;
377 #if DEBUG_FS
378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379               "Executing plan %p executed %u times, planning retransmission\n",
380               rp, rp->transmission_counter);
381 #endif
382   plan (pp, rp);
383   GNUNET_STATISTICS_update (GSF_stats,
384                             gettext_noop
385                             ("# query messages sent to other peers"), 1,
386                             GNUNET_NO);
387   return msize;
388 }
389
390
391 /**
392  * Figure out when and how to transmit to the given peer.
393  *
394  * @param cls the 'struct PeerPlan'
395  * @param tc scheduler context
396  */
397 static void
398 schedule_peer_transmission (void *cls,
399                             const struct GNUNET_SCHEDULER_TaskContext *tc)
400 {
401   struct PeerPlan *pp = cls;
402   struct GSF_RequestPlan *rp;
403   size_t msize;
404   struct GNUNET_TIME_Relative delay;
405
406   pp->task = GNUNET_SCHEDULER_NO_TASK;
407   if (pp->pth != NULL)
408   {
409     GSF_peer_transmit_cancel_ (pp->pth);
410     pp->pth = NULL;
411   }
412   /* move ready requests to priority queue */
413   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
414          (GNUNET_TIME_absolute_get_remaining
415           (rp->earliest_transmission).rel_value == 0))
416   {
417     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
418     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
419   }
420   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
421   {
422     /* priority heap (still) empty, check for delay... */
423     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
424     if (NULL == rp)
425     {
426 #if DEBUG_FS
427       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
428                   pp);
429 #endif
430       return;                   /* both queues empty */
431     }
432     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
433 #if DEBUG_FS
434     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
436                 (unsigned long long)
437                 delay.rel_value, pp);
438 #endif
439     GNUNET_STATISTICS_set (GSF_stats,
440                            gettext_noop ("# delay heap timeout"), 
441                            delay.rel_value,
442                            GNUNET_NO);
443     
444     pp->task =
445       GNUNET_SCHEDULER_add_delayed (delay,
446                                       &schedule_peer_transmission, pp);
447     return;
448   }
449   GNUNET_STATISTICS_update (GSF_stats,
450                             gettext_noop ("# query plans executed"), 1,
451                             GNUNET_NO);
452   /* process from priority heap */
453   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
454 #if DEBUG_FS > 1
455   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
456 #endif
457   GNUNET_assert (NULL != rp);
458   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
459   pp->pth =
460       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
461                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
462                           &transmit_message_callback, pp);
463   GNUNET_assert (NULL != pp->pth);
464 }
465
466
467 /**
468  * Closure for 'merge_pr'.
469  */
470 struct MergeContext
471 {
472
473   struct GSF_PendingRequest *pr;
474
475   int merged;
476
477 };
478
479
480 /**
481  * Iterator that checks if an equivalent request is already
482  * present for this peer.
483  *
484  * @param cls closure
485  * @param query the query
486  * @param element request plan stored at the node
487  * @return GNUNET_YES if we should continue to iterate,
488  *         GNUNET_NO if not (merge success)
489  */
490 static int
491 merge_pr (void *cls, 
492           const GNUNET_HashCode *query,
493           void *element)
494 {
495   struct MergeContext *mpr = cls;
496   struct GSF_RequestPlan *rp = element;
497   struct GSF_PendingRequestData *prd;
498   struct GSF_RequestPlanReference *rpr;
499   struct PendingRequestList *prl;
500   struct GSF_PendingRequest *latest;
501
502   if (GNUNET_OK !=
503       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
504     return GNUNET_YES;
505   /* merge new request with existing request plan */
506   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
507   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
508   rpr->rp = rp;
509   rpr->prl = prl;
510   prl->rpr = rpr;
511   prl->pr = mpr->pr;
512   prd = GSF_pending_request_get_data_ (mpr->pr);
513   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
514   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
515   mpr->merged = GNUNET_YES;
516   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
517                             GNUNET_NO);
518   latest = get_latest (rp);
519   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
520       prd->ttl.abs_value)
521   {
522     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
523                               1, GNUNET_NO);
524     rp->transmission_counter = 0;       /* reset */
525   }
526   return GNUNET_NO;
527 }
528
529
530 /**
531  * Create a new query plan entry.
532  *
533  * @param cp peer with the entry
534  * @param pr request with the entry
535  */
536 void
537 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
538 {
539   struct GNUNET_PeerIdentity id;
540   struct PeerPlan *pp;
541   struct GSF_PendingRequestData *prd;
542   struct GSF_RequestPlan *rp;
543   struct GSF_RequestPlanReference *rpr;
544   struct PendingRequestList *prl;
545   struct MergeContext mpc;
546
547   GNUNET_assert (NULL != cp);
548   GSF_connected_peer_get_identity_ (cp, &id);
549   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
550   if (NULL == pp)
551   {
552     pp = GNUNET_malloc (sizeof (struct PeerPlan));
553     pp->plan_map =
554       GNUNET_CONTAINER_multihashmap_create (128);
555     pp->priority_heap =
556         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
557     pp->delay_heap =
558         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
559     pp->cp = cp;
560     GNUNET_CONTAINER_multihashmap_put (plans, &id.hashPubKey, pp,
561                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
562   }
563   mpc.merged = GNUNET_NO;
564   mpc.pr = pr;
565   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
566                                               &GSF_pending_request_get_data_ (pr)->query,
567                                               &merge_pr, &mpc);
568   if (mpc.merged != GNUNET_NO)
569     return;
570   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
571                                               &GSF_pending_request_get_data_ (pr)->query,
572                                               &merge_pr, &mpc);
573   if (mpc.merged != GNUNET_NO)
574     return;
575   plan_count++;
576   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
577                             GNUNET_NO);
578   prd = GSF_pending_request_get_data_ (pr);
579 #if DEBUG_FS
580   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
581               "Planning transmission of query `%s' to peer `%s'\n",
582               GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
583 #endif
584   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
585   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
586   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
587   rpr->rp = rp;
588   rpr->prl = prl;
589   prl->rpr = rpr;
590   prl->pr = pr;
591   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
592   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
593   rp->pp = pp;
594   GNUNET_assert (GNUNET_YES ==
595                  GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
596                                                     get_rp_key (rp),
597                                                     rp,
598                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
599   plan (pp, rp);
600 }
601
602
603 /**
604  * Notify the plan about a peer being no longer available;
605  * destroy all entries associated with this peer.
606  *
607  * @param cp connected peer
608  */
609 void
610 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
611 {
612   struct GNUNET_PeerIdentity id;
613   struct PeerPlan *pp;
614   struct GSF_RequestPlan *rp;
615   struct GSF_PendingRequestData *prd;
616   struct PendingRequestList *prl;
617
618   GSF_connected_peer_get_identity_ (cp, &id);
619   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
620   if (NULL == pp)
621     return;                     /* nothing was ever planned for this peer */
622   GNUNET_assert (GNUNET_YES ==
623                  GNUNET_CONTAINER_multihashmap_remove (plans, &id.hashPubKey,
624                                                        pp));
625   if (NULL != pp->pth)
626     GSF_peer_transmit_cancel_ (pp->pth);
627   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
628   {
629     GNUNET_SCHEDULER_cancel (pp->task);
630     pp->task = GNUNET_SCHEDULER_NO_TASK;
631   }
632   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
633   {
634     GNUNET_assert (GNUNET_YES ==
635                    GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
636                                                          get_rp_key (rp),
637                                                          rp));
638     while (NULL != (prl = rp->prl_head))
639     {
640       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
641       prd = GSF_pending_request_get_data_ (prl->pr);
642       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
643       GNUNET_free (prl->rpr);
644       GNUNET_free (prl);
645     }
646     GNUNET_free (rp);
647   }
648   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
649   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
650   {
651     GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
652                                           get_rp_key (rp),
653                                           rp);
654     while (NULL != (prl = rp->prl_head))
655     {
656       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
657       prd = GSF_pending_request_get_data_ (prl->pr);
658       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
659       GNUNET_free (prl->rpr);
660       GNUNET_free (prl);
661     }
662     GNUNET_free (rp);
663   }
664   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
665                          plan_count, GNUNET_NO);
666
667   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
668   GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
669   GNUNET_free (pp);
670 }
671
672
673 /**
674  * Notify the plan about a request being done; destroy all entries
675  * associated with this request.
676  *
677  * @param pr request that is done
678  */
679 void
680 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
681 {
682   struct GSF_RequestPlan *rp;
683   struct GSF_PendingRequestData *prd;
684   struct GSF_RequestPlanReference *rpr;
685
686   prd = GSF_pending_request_get_data_ (pr);
687   while (NULL != (rpr = prd->rpr_head))
688   {
689     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
690     rp = rpr->rp;
691     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
692     if (NULL == rp->prl_head)
693     {
694       GNUNET_CONTAINER_heap_remove_node (rp->hn);
695       plan_count--;
696       GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
697                                             &GSF_pending_request_get_data_ (rpr->prl->pr)->query,
698                                             rp);
699       GNUNET_free (rp);
700     }
701     GNUNET_free (rpr->prl);
702     GNUNET_free (rpr);
703   }
704   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
705                          plan_count, GNUNET_NO);
706 }
707
708
709 /**
710  * Initialize plan subsystem.
711  */
712 void
713 GSF_plan_init ()
714 {
715   plans = GNUNET_CONTAINER_multihashmap_create (256);
716 }
717
718
719 /**
720  * Shutdown plan subsystem.
721  */
722 void
723 GSF_plan_done ()
724 {
725   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
726   GNUNET_CONTAINER_multihashmap_destroy (plans);
727 }
728
729
730
731 /* end of gnunet-service-fs_pe.h */