-preparations for replacement of try_connect call
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32 /**
33  * Collect an instane number of statistics?  May cause excessive IPC.
34  */
35 #define INSANE_STATISTICS GNUNET_NO
36
37 /**
38  * List of GSF_PendingRequests this request plan
39  * participates with.
40  */
41 struct PendingRequestList;
42
43 /**
44  * Transmission plan for a peer.
45  */
46 struct PeerPlan;
47
48
49 /**
50  * M:N binding of plans to pending requests.
51  * Each pending request can be in a number of plans,
52  * and each plan can have a number of pending requests.
53  * Objects of this type indicate a mapping of a plan to
54  * a particular pending request.
55  *
56  * The corresponding head and tail of the "PE" MDLL
57  * are stored in a `struct GSF_RequestPlan`. (We need
58  * to be able to lookup all pending requests corresponding
59  * to a given plan entry.)
60  *
61  * Similarly head and tail of the "PR" MDLL are stored
62  * with the `struct GSF_PendingRequest`.  (We need
63  * to be able to lookup all plan entries corresponding
64  * to a given pending request.)
65  */
66 struct GSF_PendingRequestPlanBijection
67 {
68
69   /**
70    * This is a doubly-linked list.
71    */
72   struct GSF_PendingRequestPlanBijection *next_PR;
73
74   /**
75    * This is a doubly-linked list.
76    */
77   struct GSF_PendingRequestPlanBijection *prev_PR;
78
79   /**
80    * This is a doubly-linked list.
81    */
82   struct GSF_PendingRequestPlanBijection *next_PE;
83
84   /**
85    * This is a doubly-linked list.
86    */
87   struct GSF_PendingRequestPlanBijection *prev_PE;
88
89   /**
90    * Associated request plan (tells us one of the peers that
91    * we plan to forward the request to).
92    */
93   struct GSF_RequestPlan *rp;
94
95   /**
96    * Associated pending request (identifies request details
97    * and one of the origins of the request).
98    */
99   struct GSF_PendingRequest *pr;
100
101 };
102
103
104 /**
105  * Information we keep per request per peer.  This is a doubly-linked
106  * list (with head and tail in the `struct GSF_PendingRequestData`)
107  * with one entry in each heap of each `struct PeerPlan`.  Each
108  * entry tracks information relevant for this request and this peer.
109  */
110 struct GSF_RequestPlan
111 {
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *next;
117
118   /**
119    * This is a doubly-linked list.
120    */
121   struct GSF_RequestPlan *prev;
122
123   /**
124    * Heap node associated with this request and this peer.
125    */
126   struct GNUNET_CONTAINER_HeapNode *hn;
127
128   /**
129    * The transmission plan for a peer that this request is associated with.
130    */
131   struct PeerPlan *pp;
132
133   /**
134    * Head of list of associated pending requests.  This tells us
135    * which incoming requests from other peers this plan entry
136    * corresponds to.
137    */
138   struct GSF_PendingRequestPlanBijection *pe_head;
139
140   /**
141    * Tail of list of associated pending requests.
142    */
143   struct GSF_PendingRequestPlanBijection *pe_tail;
144
145   /**
146    * Earliest time we'd be happy to (re)transmit this request.
147    */
148   struct GNUNET_TIME_Absolute earliest_transmission;
149
150   /**
151    * When was the last time we transmitted this request to this peer? 0 for never.
152    */
153   struct GNUNET_TIME_Absolute last_transmission;
154
155   /**
156    * Current priority for this request for this target.
157    */
158   uint64_t priority;
159
160   /**
161    * How often did we transmit this request to this peer?
162    */
163   unsigned int transmission_counter;
164
165 };
166
167
168 /**
169  * Transmission plan for a peer.
170  */
171 struct PeerPlan
172 {
173   /**
174    * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority.
175    */
176   struct GNUNET_CONTAINER_Heap *priority_heap;
177
178   /**
179    * Heap with pending queries (`struct GSF_RequestPlan`), by transmission time, lowest first.
180    */
181   struct GNUNET_CONTAINER_Heap *delay_heap;
182
183   /**
184    * Map of queries to plan entries.  All entries in the @e priority_heap
185    * or @e delay_heap should be in the @e plan_map.  Note that it is
186    * possible for the @e plan_map to have multiple entries for the same
187    * query.
188    */
189   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
190
191   /**
192    * Current transmission request handle.
193    */
194   struct GSF_PeerTransmitHandle *pth;
195
196   /**
197    * Peer for which this is the plan.
198    */
199   struct GSF_ConnectedPeer *cp;
200
201   /**
202    * Current task for executing the plan.
203    */
204   struct GNUNET_SCHEDULER_Task *task;
205 };
206
207
208 /**
209  * Hash map from peer identities to PeerPlans.
210  */
211 static struct GNUNET_CONTAINER_MultiPeerMap *plans;
212
213 /**
214  * Sum of all transmission counters (equals total delay for all plan entries).
215  */
216 static unsigned long long total_delay;
217
218 /**
219  * Number of plan entries.
220  */
221 static unsigned long long plan_count;
222
223
224 /**
225  * Return the query (key in the plan_map) for the given request plan.
226  * Note that this key may change as there can be multiple pending
227  * requests for the same key and we just return _one_ of them; this
228  * particular one might complete while another one might still be
229  * active, hence the lifetime of the returned hash code is NOT
230  * necessarily identical to that of the `struct GSF_RequestPlan`
231  * given.
232  *
233  * @param rp a request plan
234  * @return the associated query
235  */
236 static const struct GNUNET_HashCode *
237 get_rp_key (struct GSF_RequestPlan *rp)
238 {
239   return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query;
240 }
241
242
243 /**
244  * Figure out when and how to transmit to the given peer.
245  *
246  * @param cls the `struct GSF_ConnectedPeer` for transmission
247  * @param tc scheduler context
248  */
249 static void
250 schedule_peer_transmission (void *cls,
251                             const struct GNUNET_SCHEDULER_TaskContext *tc);
252
253
254 /**
255  * Insert the given request plan into the heap with the appropriate weight.
256  *
257  * @param pp associated peer's plan
258  * @param rp request to plan
259  */
260 static void
261 plan (struct PeerPlan *pp,
262       struct GSF_RequestPlan *rp)
263 {
264 #define N ((double)128.0)
265   /**
266    * Running average delay we currently impose.
267    */
268   static double avg_delay;
269
270   struct GSF_PendingRequestData *prd;
271   struct GNUNET_TIME_Relative delay;
272
273   GNUNET_assert (rp->pp == pp);
274   GNUNET_STATISTICS_set (GSF_stats,
275                          gettext_noop ("# average retransmission delay (ms)"),
276                          total_delay * 1000LL / plan_count, GNUNET_NO);
277   prd = GSF_pending_request_get_data_ (rp->pe_head->pr);
278
279   if (rp->transmission_counter < 8)
280     delay =
281         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
282                                        rp->transmission_counter);
283   else if (rp->transmission_counter < 32)
284     delay =
285         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
286                                        8 +
287                                        (1LL << (rp->transmission_counter - 8)));
288   else
289     delay =
290         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
291                                        8 + (1LL << 24));
292   delay.rel_value_us =
293     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
294                               delay.rel_value_us + 1);
295   /* Add 0.01 to avg_delay to avoid division-by-zero later */
296   avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01;
297
298   /*
299    * For the priority, we need to consider a few basic rules:
300    * 1) if we just started requesting (delay is small), we should
301    * virtually always have a priority of zero.
302    * 2) for requests with average latency, our priority should match
303    * the average priority observed on the network
304    * 3) even the longest-running requests should not be WAY out of
305    * the observed average (thus we bound by a factor of 2)
306    * 4) we add +1 to the observed average priority to avoid everyone
307    * staying put at zero (2 * 0 = 0...).
308    *
309    * Using the specific calculation below, we get:
310    *
311    * delay = 0 => priority = 0;
312    * delay = avg delay => priority = running-average-observed-priority;
313    * delay >> avg_delay => priority = 2 * running-average-observed-priority;
314    *
315    * which satisfies all of the rules above.
316    *
317    * Note: M_PI_4 = PI/4 = arctan(1)
318    */
319   rp->priority =
320       round ((GSF_current_priorities +
321               1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4;
322   /* Note: usage of 'round' and 'atan' requires -lm */
323
324   if (rp->transmission_counter != 0)
325     delay.rel_value_us += TTL_DECREMENT * 1000;
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Considering (re)transmission number %u in %s\n",
328               (unsigned int) rp->transmission_counter,
329               GNUNET_STRINGS_relative_time_to_string (delay,
330                                                       GNUNET_YES));
331   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
333               "Earliest (re)transmission for `%s' in %us\n",
334               GNUNET_h2s (&prd->query), rp->transmission_counter);
335   GNUNET_assert (rp->hn == NULL);
336   if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
337     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
338   else
339     rp->hn =
340         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
341                                       rp->earliest_transmission.abs_value_us);
342   GNUNET_assert (GNUNET_YES ==
343                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
344                                                                get_rp_key (rp),
345                                                                rp));
346   if (NULL != pp->task)
347     GNUNET_SCHEDULER_cancel (pp->task);
348   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
349 #undef N
350 }
351
352
353 /**
354  * Get the pending request with the highest TTL from the given plan.
355  *
356  * @param rp plan to investigate
357  * @return pending request with highest TTL
358  */
359 struct GSF_PendingRequest *
360 get_latest (const struct GSF_RequestPlan *rp)
361 {
362   struct GSF_PendingRequest *ret;
363   struct GSF_PendingRequestPlanBijection *bi;
364   const struct GSF_PendingRequestData *rprd;
365   const struct GSF_PendingRequestData *prd;
366
367   bi = rp->pe_head;
368   if (NULL == bi)
369     return NULL; /* should never happen */
370   ret = bi->pr;
371   rprd = GSF_pending_request_get_data_ (ret);
372   for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE)
373   {
374     GNUNET_break (GNUNET_YES ==
375                   GSF_pending_request_test_active_ (bi->pr));
376     prd = GSF_pending_request_get_data_ (bi->pr);
377     if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
378     {
379       ret = bi->pr;
380       rprd = prd;
381     }
382   }
383   return ret;
384 }
385
386
387 /**
388  * Function called to get a message for transmission.
389  *
390  * @param cls closure
391  * @param buf_size number of bytes available in @a buf
392  * @param buf where to copy the message, NULL on error (peer disconnect)
393  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
394  */
395 static size_t
396 transmit_message_callback (void *cls,
397                            size_t buf_size,
398                            void *buf)
399 {
400   struct PeerPlan *pp = cls;
401   struct GSF_RequestPlan *rp;
402   size_t msize;
403
404   pp->pth = NULL;
405   if (NULL == buf)
406   {
407     /* failed, try again... */
408     if (NULL != pp->task)
409       GNUNET_SCHEDULER_cancel (pp->task);
410
411     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
412     GNUNET_STATISTICS_update (GSF_stats,
413                               gettext_noop
414                               ("# transmission failed (core has no bandwidth)"),
415                               1, GNUNET_NO);
416     return 0;
417   }
418   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
419   if (NULL == rp)
420   {
421     if (NULL != pp->task)
422       GNUNET_SCHEDULER_cancel (pp->task);
423     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
424     return 0;
425   }
426   msize = GSF_pending_request_get_message_ (get_latest (rp),
427                                             buf_size,
428                                             buf);
429   if (msize > buf_size)
430   {
431     if (NULL != pp->task)
432       GNUNET_SCHEDULER_cancel (pp->task);
433     /* buffer to small (message changed), try again */
434     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
435     return 0;
436   }
437   /* remove from root, add again elsewhere... */
438   GNUNET_assert (rp ==
439                  GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
440   rp->hn = NULL;
441   rp->last_transmission = GNUNET_TIME_absolute_get ();
442   rp->transmission_counter++;
443   total_delay++;
444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445               "Executing plan %p executed %u times, planning retransmission\n",
446               rp, rp->transmission_counter);
447   plan (pp, rp);
448   GNUNET_STATISTICS_update (GSF_stats,
449                             gettext_noop ("# query messages sent to other peers"),
450                             1,
451                             GNUNET_NO);
452   return msize;
453 }
454
455
456 /**
457  * Figure out when and how to transmit to the given peer.
458  *
459  * @param cls the `struct PeerPlan`
460  * @param tc scheduler context
461  */
462 static void
463 schedule_peer_transmission (void *cls,
464                             const struct GNUNET_SCHEDULER_TaskContext *tc)
465 {
466   struct PeerPlan *pp = cls;
467   struct GSF_RequestPlan *rp;
468   size_t msize;
469   struct GNUNET_TIME_Relative delay;
470
471   pp->task = NULL;
472   if (NULL != pp->pth)
473   {
474     GSF_peer_transmit_cancel_ (pp->pth);
475     pp->pth = NULL;
476   }
477   /* move ready requests to priority queue */
478   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
479          (0 == GNUNET_TIME_absolute_get_remaining
480           (rp->earliest_transmission).rel_value_us))
481   {
482     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
483     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
484                                            rp,
485                                            rp->priority);
486   }
487   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
488   {
489     /* priority heap (still) empty, check for delay... */
490     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
491     if (NULL == rp)
492     {
493       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494                   "No active requests for plan %p.\n",
495                   pp);
496       return;                   /* both queues empty */
497     }
498     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
499     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500                 "Sleeping for %s before retrying requests on plan %p.\n",
501                 GNUNET_STRINGS_relative_time_to_string (delay,
502                                                         GNUNET_YES),
503                 pp);
504     GNUNET_STATISTICS_set (GSF_stats,
505                            gettext_noop ("# delay heap timeout (ms)"),
506                            delay.rel_value_us / 1000LL, GNUNET_NO);
507
508     pp->task =
509         GNUNET_SCHEDULER_add_delayed (delay,
510                                       &schedule_peer_transmission,
511                                       pp);
512     return;
513   }
514 #if INSANE_STATISTICS
515   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
516                             1, GNUNET_NO);
517 #endif
518   /* process from priority heap */
519   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "Executing query plan %p\n",
522               rp);
523   GNUNET_assert (NULL != rp);
524   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
525   pp->pth =
526       GSF_peer_transmit_ (pp->cp, GNUNET_YES,
527                           rp->priority,
528                           GNUNET_TIME_UNIT_FOREVER_REL,
529                           msize,
530                           &transmit_message_callback, pp);
531   GNUNET_assert (NULL != pp->pth);
532 }
533
534
535 /**
536  * Closure for merge_pr().
537  */
538 struct MergeContext
539 {
540
541   /**
542    * Request we are trying to merge.
543    */
544   struct GSF_PendingRequest *pr;
545
546   /**
547    * Set to #GNUNET_YES if we succeeded to merge.
548    */
549   int merged;
550
551 };
552
553
554 /**
555  * Iterator that checks if an equivalent request is already
556  * present for this peer.
557  *
558  * @param cls closure
559  * @param query the query
560  * @param element request plan stored at the node
561  * @return #GNUNET_YES if we should continue to iterate,
562  *         #GNUNET_NO if not (merge success)
563  */
564 static int
565 merge_pr (void *cls,
566           const struct GNUNET_HashCode *query,
567           void *element)
568 {
569   struct MergeContext *mpr = cls;
570   struct GSF_RequestPlan *rp = element;
571   struct GSF_PendingRequestData *prd;
572   struct GSF_PendingRequestPlanBijection *bi;
573   struct GSF_PendingRequest *latest;
574
575   GNUNET_break (GNUNET_YES ==
576                 GSF_pending_request_test_active_ (mpr->pr));
577   if (GNUNET_OK !=
578       GSF_pending_request_is_compatible_ (mpr->pr,
579                                           rp->pe_head->pr))
580     return GNUNET_YES;
581   /* merge new request with existing request plan */
582   bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
583   bi->rp = rp;
584   bi->pr = mpr->pr;
585   prd = GSF_pending_request_get_data_ (mpr->pr);
586   GNUNET_CONTAINER_MDLL_insert (PR,
587                                 prd->pr_head,
588                                 prd->pr_tail,
589                                 bi);
590   GNUNET_CONTAINER_MDLL_insert (PE,
591                                 rp->pe_head,
592                                 rp->pe_tail,
593                                 bi);
594   mpr->merged = GNUNET_YES;
595 #if INSANE_STATISTICS
596   GNUNET_STATISTICS_update (GSF_stats,
597                             gettext_noop ("# requests merged"),
598                             1,
599                             GNUNET_NO);
600 #endif
601   latest = get_latest (rp);
602   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us <
603       prd->ttl.abs_value_us)
604   {
605 #if INSANE_STATISTICS
606     GNUNET_STATISTICS_update (GSF_stats,
607                               gettext_noop ("# requests refreshed"),
608                               1,
609                               GNUNET_NO);
610 #endif
611     rp->transmission_counter = 0;       /* reset */
612   }
613   return GNUNET_NO;
614 }
615
616
617 /**
618  * Create a new query plan entry.
619  *
620  * @param cp peer with the entry
621  * @param pr request with the entry
622  */
623 void
624 GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
625                struct GSF_PendingRequest *pr)
626 {
627   const struct GNUNET_PeerIdentity *id;
628   struct PeerPlan *pp;
629   struct GSF_PendingRequestData *prd;
630   struct GSF_RequestPlan *rp;
631   struct GSF_PendingRequestPlanBijection *bi;
632   struct MergeContext mpc;
633
634   GNUNET_assert (GNUNET_YES ==
635                  GSF_pending_request_test_active_ (pr));
636   GNUNET_assert (NULL != cp);
637   id = GSF_connected_peer_get_identity2_ (cp);
638   pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
639   if (NULL == pp)
640   {
641     pp = GNUNET_new (struct PeerPlan);
642     pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
643     pp->priority_heap =
644         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
645     pp->delay_heap =
646         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
647     pp->cp = cp;
648     GNUNET_assert (GNUNET_OK ==
649                    GNUNET_CONTAINER_multipeermap_put (plans,
650                                                       id,
651                                                       pp,
652                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
653   }
654   mpc.merged = GNUNET_NO;
655   mpc.pr = pr;
656   prd = GSF_pending_request_get_data_ (pr);
657   GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
658                                               &prd->query,
659                                               &merge_pr,
660                                               &mpc);
661   if (GNUNET_NO != mpc.merged)
662     return;
663   plan_count++;
664   GNUNET_STATISTICS_update (GSF_stats,
665                             gettext_noop ("# query plan entries"),
666                             1,
667                             GNUNET_NO);
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "Planning transmission of query `%s' to peer `%s'\n",
670               GNUNET_h2s (&prd->query),
671               GNUNET_i2s (id));
672   rp = GNUNET_new (struct GSF_RequestPlan);
673   bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
674   bi->rp = rp;
675   bi->pr = pr;
676   GNUNET_CONTAINER_MDLL_insert (PR,
677                                 prd->pr_head,
678                                 prd->pr_tail,
679                                 bi);
680   GNUNET_CONTAINER_MDLL_insert (PE,
681                                 rp->pe_head,
682                                 rp->pe_tail,
683                                 bi);
684   rp->pp = pp;
685   GNUNET_assert (GNUNET_YES ==
686                  GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
687                                                     get_rp_key (rp),
688                                                     rp,
689                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
690   plan (pp,
691         rp);
692 }
693
694
695 /**
696  * Notify the plan about a peer being no longer available;
697  * destroy all entries associated with this peer.
698  *
699  * @param cp connected peer
700  */
701 void
702 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
703 {
704   const struct GNUNET_PeerIdentity *id;
705   struct PeerPlan *pp;
706   struct GSF_RequestPlan *rp;
707   struct GSF_PendingRequestData *prd;
708   struct GSF_PendingRequestPlanBijection *bi;
709
710   id = GSF_connected_peer_get_identity2_ (cp);
711   pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
712   if (NULL == pp)
713     return;                     /* nothing was ever planned for this peer */
714   GNUNET_assert (GNUNET_YES ==
715                  GNUNET_CONTAINER_multipeermap_remove (plans, id,
716                                                        pp));
717   if (NULL != pp->pth)
718   {
719     GSF_peer_transmit_cancel_ (pp->pth);
720     pp->pth = NULL;
721   }
722   if (NULL != pp->task)
723   {
724     GNUNET_SCHEDULER_cancel (pp->task);
725     pp->task = NULL;
726   }
727   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
728   {
729     GNUNET_break (GNUNET_YES ==
730                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
731                                                         get_rp_key (rp),
732                                                         rp));
733     while (NULL != (bi = rp->pe_head))
734     {
735       GNUNET_CONTAINER_MDLL_remove (PE,
736                                     rp->pe_head,
737                                     rp->pe_tail,
738                                     bi);
739       prd = GSF_pending_request_get_data_ (bi->pr);
740       GNUNET_CONTAINER_MDLL_remove (PR,
741                                     prd->pr_head,
742                                     prd->pr_tail,
743                                     bi);
744       GNUNET_free (bi);
745     }
746     plan_count--;
747     GNUNET_free (rp);
748   }
749   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
750   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
751   {
752     GNUNET_break (GNUNET_YES ==
753                   GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
754                                                         get_rp_key (rp),
755                                                         rp));
756     while (NULL != (bi = rp->pe_head))
757     {
758       prd = GSF_pending_request_get_data_ (bi->pr);
759       GNUNET_CONTAINER_MDLL_remove (PE,
760                                     rp->pe_head,
761                                     rp->pe_tail,
762                                     bi);
763       GNUNET_CONTAINER_MDLL_remove (PR,
764                                     prd->pr_head,
765                                     prd->pr_tail,
766                                     bi);
767       GNUNET_free (bi);
768     }
769     plan_count--;
770     GNUNET_free (rp);
771   }
772   GNUNET_STATISTICS_set (GSF_stats,
773                          gettext_noop ("# query plan entries"),
774                          plan_count,
775                          GNUNET_NO);
776   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
777   GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
778   GNUNET_free (pp);
779 }
780
781
782 /**
783  * Get the last transmission attempt time for the request plan list
784  * referenced by @a pr_head, that was sent to @a sender
785  *
786  * @param pr_head request plan reference list to check.
787  * @param sender the peer that we've sent the request to.
788  * @param result the timestamp to fill, set to #GNUNET_TIME_UNIT_FOREVER_ABS if never transmitted
789  * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise.
790  */
791 int
792 GSF_request_plan_reference_get_last_transmission_ (struct GSF_PendingRequestPlanBijection *pr_head,
793                                                    struct GSF_ConnectedPeer *sender,
794                                                    struct GNUNET_TIME_Absolute *result)
795 {
796   struct GSF_PendingRequestPlanBijection *bi;
797
798   for (bi = pr_head; NULL != bi; bi = bi->next_PR)
799   {
800     if (bi->rp->pp->cp == sender)
801     {
802       if (0 == bi->rp->last_transmission.abs_value_us)
803         *result = GNUNET_TIME_UNIT_FOREVER_ABS;
804       else
805         *result = bi->rp->last_transmission;
806       return GNUNET_YES;
807     }
808   }
809   return GNUNET_NO;
810 }
811
812
813 /**
814  * Notify the plan about a request being done; destroy all entries
815  * associated with this request.
816  *
817  * @param pr request that is done
818  */
819 void
820 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
821 {
822   struct GSF_RequestPlan *rp;
823   struct GSF_PendingRequestData *prd;
824   struct GSF_PendingRequestPlanBijection *bi;
825
826   prd = GSF_pending_request_get_data_ (pr);
827   while (NULL != (bi = prd->pr_head))
828   {
829     rp = bi->rp;
830     GNUNET_CONTAINER_MDLL_remove (PR,
831                                   prd->pr_head,
832                                   prd->pr_tail,
833                                   bi);
834     GNUNET_CONTAINER_MDLL_remove (PE,
835                                   rp->pe_head,
836                                   rp->pe_tail,
837                                   bi);
838     GNUNET_assert (bi->pr == pr);
839     if (NULL == rp->pe_head)
840     {
841       GNUNET_CONTAINER_heap_remove_node (rp->hn);
842       plan_count--;
843       GNUNET_break (GNUNET_YES ==
844                     GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
845                                                           &prd->query,
846                                                           rp));
847       GNUNET_free (rp);
848     }
849     GNUNET_free (bi);
850   }
851   GNUNET_STATISTICS_set (GSF_stats,
852                          gettext_noop ("# query plan entries"),
853                          plan_count,
854                          GNUNET_NO);
855 }
856
857
858 /**
859  * Initialize plan subsystem.
860  */
861 void
862 GSF_plan_init ()
863 {
864   plans = GNUNET_CONTAINER_multipeermap_create (256,
865                                                 GNUNET_YES);
866 }
867
868
869 /**
870  * Shutdown plan subsystem.
871  */
872 void
873 GSF_plan_done ()
874 {
875   GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans));
876   GNUNET_CONTAINER_multipeermap_destroy (plans);
877 }
878
879
880
881 /* end of gnunet-service-fs_pe.h */