even more stats, more logging
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan
35  * participates with.
36  */
37 struct PendingRequestList;
38
39
40 /**
41  * DLL of request plans a particular pending request is
42  * involved with.
43  */
44 struct GSF_RequestPlanReference
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlanReference *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *prev;
56
57   /**
58    * Associated request plan.
59    */
60   struct GSF_RequestPlan *rp;
61
62   /**
63    * Corresponding PendingRequestList.
64    */
65   struct PendingRequestList *prl;
66 };
67
68
69 /**
70  * List of GSF_PendingRequests this request plan
71  * participates with.
72  */
73 struct PendingRequestList
74 {
75
76   /**
77    * This is a doubly-linked list.
78    */
79   struct PendingRequestList *next;
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *prev;
85
86   /**
87    * Array of associated pending requests.
88    */
89   struct GSF_PendingRequest *pr;
90
91   /**
92    * Corresponding GSF_RequestPlanReference.
93    */
94   struct GSF_RequestPlanReference *rpr;
95
96 };
97
98
99 /**
100  * Information we keep per request per peer.  This is a doubly-linked
101  * list (with head and tail in the 'struct GSF_PendingRequestData')
102  * with one entry in each heap of each 'struct PeerPlan'.  Each
103  * entry tracks information relevant for this request and this peer.
104  */
105 struct GSF_RequestPlan
106 {
107
108   /**
109    * This is a doubly-linked list.
110    */
111   struct GSF_RequestPlan *next;
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *prev;
117
118   /**
119    * Heap node associated with this request and this peer.
120    */
121   struct GNUNET_CONTAINER_HeapNode *hn;
122
123   /**
124    * Head of list of associated pending requests.
125    */
126   struct PendingRequestList *prl_head;
127
128   /**
129    * Tail of list of associated pending requests.
130    */
131   struct PendingRequestList *prl_tail;
132
133   /**
134    * Earliest time we'd be happy to (re)transmit this request.
135    */
136   struct GNUNET_TIME_Absolute earliest_transmission;
137
138   /**
139    * When was the last time we transmitted this request to this peer? 0 for never.
140    */
141   struct GNUNET_TIME_Absolute last_transmission;
142
143   /**
144    * Current priority for this request for this target.
145    */
146   uint64_t priority;
147
148   /**
149    * How often did we transmit this request to this peer?
150    */
151   unsigned int transmission_counter;
152
153 };
154
155
156 /**
157  * Transmission plan for a peer.
158  */
159 struct PeerPlan
160 {
161   /**
162    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
163    */
164   struct GNUNET_CONTAINER_Heap *priority_heap;
165
166   /**
167    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
168    */
169   struct GNUNET_CONTAINER_Heap *delay_heap;
170
171   /**
172    * Current transmission request handle.
173    */
174   struct GSF_PeerTransmitHandle *pth;
175
176   /**
177    * Peer for which this is the plan.
178    */
179   struct GSF_ConnectedPeer *cp;
180
181   /**
182    * Current task for executing the plan.
183    */
184   GNUNET_SCHEDULER_TaskIdentifier task;
185 };
186
187
188 /**
189  * Hash map from peer identities to PeerPlans.
190  */
191 static struct GNUNET_CONTAINER_MultiHashMap *plans;
192
193 /**
194  * Sum of all transmission counters (equals total delay for all plan entries).
195  */
196 static unsigned long long total_delay;
197
198 /**
199  * Number of plan entries.
200  */
201 static unsigned long long plan_count;
202
203
204 /**
205  * Figure out when and how to transmit to the given peer.
206  *
207  * @param cls the 'struct GSF_ConnectedPeer' for transmission
208  * @param tc scheduler context
209  */
210 static void
211 schedule_peer_transmission (void *cls,
212                             const struct GNUNET_SCHEDULER_TaskContext *tc);
213
214
215 /**
216  * Insert the given request plan into the heap with the appropriate weight.
217  *
218  * @param pp associated peer's plan
219  * @param rp request to plan
220  */
221 static void
222 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
223 {
224   struct GSF_PendingRequestData *prd;
225   struct GNUNET_TIME_Relative delay;
226
227   GNUNET_STATISTICS_set (GSF_stats,
228                          gettext_noop ("# average retransmission delay (ms)"),
229                          total_delay * 1000LL / plan_count, GNUNET_NO);
230   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
231   // FIXME: calculate 'rp->priority'!
232   if (rp->transmission_counter < 32)
233     delay =
234         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
235                                        1LL << rp->transmission_counter);
236   else
237     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, UINT_MAX);
238   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
239 #if DEBUG_FS
240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
241               "Earliest (re)transmission for `%s' in %us\n",
242               GNUNET_h2s (&prd->query), rp->transmission_counter);
243 #endif
244
245   GNUNET_assert (rp->hn == NULL);
246   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
247       == 0)
248     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
249   else
250     rp->hn =
251         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
252                                       rp->earliest_transmission.abs_value);
253   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
254     GNUNET_SCHEDULER_cancel (pp->task);
255   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
256 }
257
258
259 /**
260  * Get the pending request with the highest TTL from the given plan.
261  *
262  * @param rp plan to investigate
263  * @return pending request with highest TTL
264  */
265 struct GSF_PendingRequest *
266 get_latest (const struct GSF_RequestPlan *rp)
267 {
268   struct GSF_PendingRequest *ret;
269   struct PendingRequestList *prl;
270
271   prl = rp->prl_head;
272   ret = prl->pr;
273   prl = prl->next;
274   while (NULL != prl)
275   {
276     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
277         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
278       ret = prl->pr;
279     prl = prl->next;
280   }
281   return ret;
282 }
283
284
285 /**
286  * Function called to get a message for transmission.
287  *
288  * @param cls closure
289  * @param buf_size number of bytes available in buf
290  * @param buf where to copy the message, NULL on error (peer disconnect)
291  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
292  */
293 static size_t
294 transmit_message_callback (void *cls, size_t buf_size, void *buf)
295 {
296   struct PeerPlan *pp = cls;
297   struct GSF_RequestPlan *rp;
298   size_t msize;
299
300   pp->pth = NULL;
301   if (NULL == buf)
302   {
303     /* failed, try again... */
304     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
305     GNUNET_STATISTICS_update (GSF_stats,
306                               gettext_noop ("# transmission failed (core has no bandwidth)"), 1,
307                               GNUNET_NO);
308     return 0;
309   }
310   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
311   if (NULL == rp)
312   {
313     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
314     return 0;
315   }
316   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
317   if (msize > buf_size)
318   {
319     /* buffer to small (message changed), try again */
320     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
321     return 0;
322   }
323   /* remove from root, add again elsewhere... */
324   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
325   rp->hn = NULL;
326   rp->last_transmission = GNUNET_TIME_absolute_get ();
327   rp->transmission_counter++;
328   total_delay++;
329 #if DEBUG_FS
330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
331               "Executing plan %p executed %u times, planning retransmission\n",
332               rp, rp->transmission_counter);
333 #endif
334   plan (pp, rp);
335   GNUNET_STATISTICS_update (GSF_stats,
336                             gettext_noop
337                             ("# queries messages sent to other peers"), 1,
338                             GNUNET_NO);
339   return msize;
340 }
341
342
343 /**
344  * Figure out when and how to transmit to the given peer.
345  *
346  * @param cls the 'struct PeerPlan'
347  * @param tc scheduler context
348  */
349 static void
350 schedule_peer_transmission (void *cls,
351                             const struct GNUNET_SCHEDULER_TaskContext *tc)
352 {
353   struct PeerPlan *pp = cls;
354   struct GSF_RequestPlan *rp;
355   size_t msize;
356   struct GNUNET_TIME_Relative delay;
357
358   pp->task = GNUNET_SCHEDULER_NO_TASK;
359   if (pp->pth != NULL)
360   {
361     GSF_peer_transmit_cancel_ (pp->pth);
362     pp->pth = NULL;
363   }
364   GNUNET_STATISTICS_set (GSF_stats,
365                          gettext_noop ("# query delay heap size"), 
366                          GNUNET_CONTAINER_heap_get_size (pp->delay_heap),
367                          GNUNET_NO);
368   GNUNET_STATISTICS_set (GSF_stats,
369                          gettext_noop ("# query priority heap size"), 
370                          GNUNET_CONTAINER_heap_get_size (pp->priority_heap),
371                          GNUNET_NO);
372   /* move ready requests to priority queue */
373   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
374          (GNUNET_TIME_absolute_get_remaining
375           (rp->earliest_transmission).rel_value == 0))
376   {
377     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
378     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
379   }
380   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
381   {
382     /* priority heap (still) empty, check for delay... */
383     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
384     if (NULL == rp)
385     {
386 #if DEBUG_FS
387       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
388                   pp);
389 #endif
390       return;                   /* both queues empty */
391     }
392     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
393 #if DEBUG_FS
394     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
396                 (unsigned long long)
397                 delay.rel_value, pp);
398 #endif
399     GNUNET_STATISTICS_set (GSF_stats,
400                            gettext_noop ("# delay heap timeout"), 
401                            delay.rel_value,
402                            GNUNET_NO);
403     
404     pp->task =
405       GNUNET_SCHEDULER_add_delayed (delay,
406                                       &schedule_peer_transmission, pp);
407     return;
408   }
409   GNUNET_STATISTICS_update (GSF_stats,
410                             gettext_noop ("# query plans executed"), 1,
411                             GNUNET_NO);
412   /* process from priority heap */
413   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
414 #if DEBUG_FS > 1
415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
416 #endif
417   GNUNET_assert (NULL != rp);
418   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
419   pp->pth =
420       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
421                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
422                           &transmit_message_callback, pp);
423   GNUNET_assert (NULL != pp->pth);
424 }
425
426
427 /**
428  * Closure for 'merge_pr'.
429  */
430 struct MergeContext
431 {
432
433   struct GSF_PendingRequest *pr;
434
435   int merged;
436
437 };
438
439
440 /**
441  * Iterator that checks if an equivalent request is already
442  * present for this peer.
443  *
444  * @param cls closure
445  * @param node internal node of the heap (ignored)
446  * @param element request plan stored at the node
447  * @param cost cost associated with the node (ignored)
448  * @return GNUNET_YES if we should continue to iterate,
449  *         GNUNET_NO if not (merge success)
450  */
451 static int
452 merge_pr (void *cls, struct GNUNET_CONTAINER_HeapNode *node, void *element,
453           GNUNET_CONTAINER_HeapCostType cost)
454 {
455   struct MergeContext *mpr = cls;
456   struct GSF_RequestPlan *rp = element;
457   struct GSF_PendingRequestData *prd;
458   struct GSF_RequestPlanReference *rpr;
459   struct PendingRequestList *prl;
460   struct GSF_PendingRequest *latest;
461
462   if (GNUNET_OK !=
463       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
464     return GNUNET_YES;
465   /* merge new request with existing request plan */
466   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
467   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
468   rpr->rp = rp;
469   rpr->prl = prl;
470   prl->rpr = rpr;
471   prl->pr = mpr->pr;
472   prd = GSF_pending_request_get_data_ (mpr->pr);
473   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
474   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
475   mpr->merged = GNUNET_YES;
476   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
477                             GNUNET_NO);
478   latest = get_latest (rp);
479   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
480       prd->ttl.abs_value)
481   {
482     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
483                               1, GNUNET_NO);
484     rp->transmission_counter = 0;       /* reset */
485   }
486   return GNUNET_NO;
487 }
488
489
490 /**
491  * Create a new query plan entry.
492  *
493  * @param cp peer with the entry
494  * @param pr request with the entry
495  */
496 void
497 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
498 {
499   struct GNUNET_PeerIdentity id;
500   struct PeerPlan *pp;
501   struct GSF_PendingRequestData *prd;
502   struct GSF_RequestPlan *rp;
503   struct GSF_RequestPlanReference *rpr;
504   struct PendingRequestList *prl;
505   struct MergeContext mpc;
506
507   GNUNET_assert (NULL != cp);
508   GSF_connected_peer_get_identity_ (cp, &id);
509   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
510   if (NULL == pp)
511   {
512     pp = GNUNET_malloc (sizeof (struct PeerPlan));
513     pp->priority_heap =
514         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
515     pp->delay_heap =
516         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
517     pp->cp = cp;
518     GNUNET_CONTAINER_multihashmap_put (plans, &id.hashPubKey, pp,
519                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
520   }
521   mpc.merged = GNUNET_NO;
522   mpc.pr = pr;
523   /* FIXME: O(n) call here, LRN reports this is a performance
524      problem.  Try using hash map!? */
525   GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
526   if (mpc.merged != GNUNET_NO)
527     return;
528   GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
529   if (mpc.merged != GNUNET_NO)
530     return;
531   plan_count++;
532   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
533                             GNUNET_NO);
534   prd = GSF_pending_request_get_data_ (pr);
535 #if DEBUG_FS
536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537               "Planning transmission of query `%s' to peer `%s'\n",
538               GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
539 #endif
540   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
541   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
542   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
543   rpr->rp = rp;
544   rpr->prl = prl;
545   prl->rpr = rpr;
546   prl->pr = pr;
547   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
548   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
549   plan (pp, rp);
550 }
551
552
553 /**
554  * Notify the plan about a peer being no longer available;
555  * destroy all entries associated with this peer.
556  *
557  * @param cp connected peer
558  */
559 void
560 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
561 {
562   struct GNUNET_PeerIdentity id;
563   struct PeerPlan *pp;
564   struct GSF_RequestPlan *rp;
565   struct GSF_PendingRequestData *prd;
566   struct PendingRequestList *prl;
567
568   GSF_connected_peer_get_identity_ (cp, &id);
569   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
570   if (NULL == pp)
571     return;                     /* nothing was ever planned for this peer */
572   GNUNET_assert (GNUNET_YES ==
573                  GNUNET_CONTAINER_multihashmap_remove (plans, &id.hashPubKey,
574                                                        pp));
575   if (NULL != pp->pth)
576     GSF_peer_transmit_cancel_ (pp->pth);
577   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
578   {
579     GNUNET_SCHEDULER_cancel (pp->task);
580     pp->task = GNUNET_SCHEDULER_NO_TASK;
581   }
582   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
583   {
584     while (NULL != (prl = rp->prl_head))
585     {
586       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
587       prd = GSF_pending_request_get_data_ (prl->pr);
588       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
589       GNUNET_free (prl->rpr);
590       GNUNET_free (prl);
591     }
592     GNUNET_free (rp);
593   }
594   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
595   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
596   {
597     while (NULL != (prl = rp->prl_head))
598     {
599       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
600       prd = GSF_pending_request_get_data_ (prl->pr);
601       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
602       GNUNET_free (prl->rpr);
603       GNUNET_free (prl);
604     }
605     GNUNET_free (rp);
606   }
607   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
608                          plan_count, GNUNET_NO);
609
610   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
611   GNUNET_free (pp);
612 }
613
614
615 /**
616  * Notify the plan about a request being done; destroy all entries
617  * associated with this request.
618  *
619  * @param pr request that is done
620  */
621 void
622 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
623 {
624   struct GSF_RequestPlan *rp;
625   struct GSF_PendingRequestData *prd;
626   struct GSF_RequestPlanReference *rpr;
627
628   prd = GSF_pending_request_get_data_ (pr);
629   while (NULL != (rpr = prd->rpr_head))
630   {
631     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
632     rp = rpr->rp;
633     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
634     GNUNET_free (rpr->prl);
635     GNUNET_free (rpr);
636     if (rp->prl_head == 0)
637     {
638       GNUNET_CONTAINER_heap_remove_node (rp->hn);
639       plan_count--;
640       GNUNET_free (rp);
641     }
642   }
643   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
644                          plan_count, GNUNET_NO);
645 }
646
647
648 /**
649  * Initialize plan subsystem.
650  */
651 void
652 GSF_plan_init ()
653 {
654   plans = GNUNET_CONTAINER_multihashmap_create (256);
655 }
656
657
658 /**
659  * Shutdown plan subsystem.
660  */
661 void
662 GSF_plan_done ()
663 {
664   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
665   GNUNET_CONTAINER_multihashmap_destroy (plans);
666 }
667
668
669
670 /* end of gnunet-service-fs_pe.h */