prevent loopback routing, add some statistics, faster re-transmit
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan
35  * participates with.
36  */
37 struct PendingRequestList;
38
39
40 /**
41  * DLL of request plans a particular pending request is
42  * involved with.
43  */
44 struct GSF_RequestPlanReference
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlanReference *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *prev;
56
57   /**
58    * Associated request plan.
59    */
60   struct GSF_RequestPlan *rp;
61
62   /**
63    * Corresponding PendingRequestList.
64    */
65   struct PendingRequestList *prl;
66 };
67
68
69 /**
70  * List of GSF_PendingRequests this request plan
71  * participates with.
72  */
73 struct PendingRequestList
74 {
75
76   /**
77    * This is a doubly-linked list.
78    */
79   struct PendingRequestList *next;
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *prev;
85
86   /**
87    * Array of associated pending requests.
88    */
89   struct GSF_PendingRequest *pr;
90
91   /**
92    * Corresponding GSF_RequestPlanReference.
93    */
94   struct GSF_RequestPlanReference *rpr;
95
96 };
97
98
99 /**
100  * Information we keep per request per peer.  This is a doubly-linked
101  * list (with head and tail in the 'struct GSF_PendingRequestData')
102  * with one entry in each heap of each 'struct PeerPlan'.  Each
103  * entry tracks information relevant for this request and this peer.
104  */
105 struct GSF_RequestPlan
106 {
107
108   /**
109    * This is a doubly-linked list.
110    */
111   struct GSF_RequestPlan *next;
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *prev;
117
118   /**
119    * Heap node associated with this request and this peer.
120    */
121   struct GNUNET_CONTAINER_HeapNode *hn;
122
123   /**
124    * Head of list of associated pending requests.
125    */
126   struct PendingRequestList *prl_head;
127
128   /**
129    * Tail of list of associated pending requests.
130    */
131   struct PendingRequestList *prl_tail;
132
133   /**
134    * Earliest time we'd be happy to (re)transmit this request.
135    */
136   struct GNUNET_TIME_Absolute earliest_transmission;
137
138   /**
139    * When was the last time we transmitted this request to this peer? 0 for never.
140    */
141   struct GNUNET_TIME_Absolute last_transmission;
142
143   /**
144    * Current priority for this request for this target.
145    */
146   uint64_t priority;
147
148   /**
149    * How often did we transmit this request to this peer?
150    */
151   unsigned int transmission_counter;
152
153 };
154
155
156 /**
157  * Transmission plan for a peer.
158  */
159 struct PeerPlan
160 {
161   /**
162    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
163    */
164   struct GNUNET_CONTAINER_Heap *priority_heap;
165
166   /**
167    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
168    */
169   struct GNUNET_CONTAINER_Heap *delay_heap;
170
171   /**
172    * Current transmission request handle.
173    */
174   struct GSF_PeerTransmitHandle *pth;
175
176   /**
177    * Peer for which this is the plan.
178    */
179   struct GSF_ConnectedPeer *cp;
180
181   /**
182    * Current task for executing the plan.
183    */
184   GNUNET_SCHEDULER_TaskIdentifier task;
185 };
186
187
188 /**
189  * Hash map from peer identities to PeerPlans.
190  */
191 static struct GNUNET_CONTAINER_MultiHashMap *plans;
192
193 /**
194  * Sum of all transmission counters (equals total delay for all plan entries).
195  */
196 static unsigned long long total_delay;
197
198 /**
199  * Number of plan entries.
200  */
201 static unsigned long long plan_count;
202
203
204 /**
205  * Figure out when and how to transmit to the given peer.
206  *
207  * @param cls the 'struct GSF_ConnectedPeer' for transmission
208  * @param tc scheduler context
209  */
210 static void
211 schedule_peer_transmission (void *cls,
212                             const struct GNUNET_SCHEDULER_TaskContext *tc);
213
214
215 /**
216  * Insert the given request plan into the heap with the appropriate weight.
217  *
218  * @param pp associated peer's plan
219  * @param rp request to plan
220  */
221 static void
222 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
223 {
224   struct GSF_PendingRequestData *prd;
225   struct GNUNET_TIME_Relative delay;
226
227   GNUNET_STATISTICS_set (GSF_stats,
228                          gettext_noop ("# average retransmission delay (ms)"),
229                          total_delay * 1000LL / plan_count, GNUNET_NO);
230   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
231   // FIXME: calculate 'rp->priority'!
232 #if 0
233   if (rp->transmission_counter < 32)
234     delay =
235         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
236                                        1LL << rp->transmission_counter);
237   else
238     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, UINT_MAX);
239 #else
240   if (rp->transmission_counter < 32)
241     delay =
242         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
243                                        rp->transmission_counter);
244   else if (rp->transmission_counter < 32)
245     delay =
246         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
247                                        32 + (1LL << rp->transmission_counter));
248   else
249     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, UINT_MAX);
250 #endif
251   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
252 #if DEBUG_FS
253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254               "Earliest (re)transmission for `%s' in %us\n",
255               GNUNET_h2s (&prd->query), rp->transmission_counter);
256 #endif
257
258   GNUNET_assert (rp->hn == NULL);
259   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
260       == 0)
261     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
262   else
263     rp->hn =
264         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
265                                       rp->earliest_transmission.abs_value);
266   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
267     GNUNET_SCHEDULER_cancel (pp->task);
268   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
269 }
270
271
272 /**
273  * Get the pending request with the highest TTL from the given plan.
274  *
275  * @param rp plan to investigate
276  * @return pending request with highest TTL
277  */
278 struct GSF_PendingRequest *
279 get_latest (const struct GSF_RequestPlan *rp)
280 {
281   struct GSF_PendingRequest *ret;
282   struct PendingRequestList *prl;
283
284   prl = rp->prl_head;
285   ret = prl->pr;
286   prl = prl->next;
287   while (NULL != prl)
288   {
289     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
290         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
291       ret = prl->pr;
292     prl = prl->next;
293   }
294   return ret;
295 }
296
297
298 /**
299  * Function called to get a message for transmission.
300  *
301  * @param cls closure
302  * @param buf_size number of bytes available in buf
303  * @param buf where to copy the message, NULL on error (peer disconnect)
304  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
305  */
306 static size_t
307 transmit_message_callback (void *cls, size_t buf_size, void *buf)
308 {
309   struct PeerPlan *pp = cls;
310   struct GSF_RequestPlan *rp;
311   size_t msize;
312
313   pp->pth = NULL;
314   if (NULL == buf)
315   {
316     /* failed, try again... */
317     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
318     GNUNET_STATISTICS_update (GSF_stats,
319                               gettext_noop ("# transmission failed (core has no bandwidth)"), 1,
320                               GNUNET_NO);
321     return 0;
322   }
323   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
324   if (NULL == rp)
325   {
326     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
327     return 0;
328   }
329   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
330   if (msize > buf_size)
331   {
332     /* buffer to small (message changed), try again */
333     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
334     return 0;
335   }
336   /* remove from root, add again elsewhere... */
337   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
338   rp->hn = NULL;
339   rp->last_transmission = GNUNET_TIME_absolute_get ();
340   rp->transmission_counter++;
341   total_delay++;
342 #if DEBUG_FS
343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
344               "Executing plan %p executed %u times, planning retransmission\n",
345               rp, rp->transmission_counter);
346 #endif
347   plan (pp, rp);
348   GNUNET_STATISTICS_update (GSF_stats,
349                             gettext_noop
350                             ("# query messages sent to other peers"), 1,
351                             GNUNET_NO);
352   return msize;
353 }
354
355
356 /**
357  * Figure out when and how to transmit to the given peer.
358  *
359  * @param cls the 'struct PeerPlan'
360  * @param tc scheduler context
361  */
362 static void
363 schedule_peer_transmission (void *cls,
364                             const struct GNUNET_SCHEDULER_TaskContext *tc)
365 {
366   struct PeerPlan *pp = cls;
367   struct GSF_RequestPlan *rp;
368   size_t msize;
369   struct GNUNET_TIME_Relative delay;
370
371   pp->task = GNUNET_SCHEDULER_NO_TASK;
372   if (pp->pth != NULL)
373   {
374     GSF_peer_transmit_cancel_ (pp->pth);
375     pp->pth = NULL;
376   }
377   /* move ready requests to priority queue */
378   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
379          (GNUNET_TIME_absolute_get_remaining
380           (rp->earliest_transmission).rel_value == 0))
381   {
382     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
383     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
384   }
385   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
386   {
387     /* priority heap (still) empty, check for delay... */
388     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
389     if (NULL == rp)
390     {
391 #if DEBUG_FS
392       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
393                   pp);
394 #endif
395       return;                   /* both queues empty */
396     }
397     delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
398 #if DEBUG_FS
399     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
401                 (unsigned long long)
402                 delay.rel_value, pp);
403 #endif
404     GNUNET_STATISTICS_set (GSF_stats,
405                            gettext_noop ("# delay heap timeout"), 
406                            delay.rel_value,
407                            GNUNET_NO);
408     
409     pp->task =
410       GNUNET_SCHEDULER_add_delayed (delay,
411                                       &schedule_peer_transmission, pp);
412     return;
413   }
414   GNUNET_STATISTICS_update (GSF_stats,
415                             gettext_noop ("# query plans executed"), 1,
416                             GNUNET_NO);
417   /* process from priority heap */
418   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
419 #if DEBUG_FS > 1
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
421 #endif
422   GNUNET_assert (NULL != rp);
423   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
424   pp->pth =
425       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
426                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
427                           &transmit_message_callback, pp);
428   GNUNET_assert (NULL != pp->pth);
429 }
430
431
432 /**
433  * Closure for 'merge_pr'.
434  */
435 struct MergeContext
436 {
437
438   struct GSF_PendingRequest *pr;
439
440   int merged;
441
442 };
443
444
445 /**
446  * Iterator that checks if an equivalent request is already
447  * present for this peer.
448  *
449  * @param cls closure
450  * @param node internal node of the heap (ignored)
451  * @param element request plan stored at the node
452  * @param cost cost associated with the node (ignored)
453  * @return GNUNET_YES if we should continue to iterate,
454  *         GNUNET_NO if not (merge success)
455  */
456 static int
457 merge_pr (void *cls, struct GNUNET_CONTAINER_HeapNode *node, void *element,
458           GNUNET_CONTAINER_HeapCostType cost)
459 {
460   struct MergeContext *mpr = cls;
461   struct GSF_RequestPlan *rp = element;
462   struct GSF_PendingRequestData *prd;
463   struct GSF_RequestPlanReference *rpr;
464   struct PendingRequestList *prl;
465   struct GSF_PendingRequest *latest;
466
467   if (GNUNET_OK !=
468       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
469     return GNUNET_YES;
470   /* merge new request with existing request plan */
471   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
472   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
473   rpr->rp = rp;
474   rpr->prl = prl;
475   prl->rpr = rpr;
476   prl->pr = mpr->pr;
477   prd = GSF_pending_request_get_data_ (mpr->pr);
478   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
479   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
480   mpr->merged = GNUNET_YES;
481   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
482                             GNUNET_NO);
483   latest = get_latest (rp);
484   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
485       prd->ttl.abs_value)
486   {
487     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
488                               1, GNUNET_NO);
489     rp->transmission_counter = 0;       /* reset */
490   }
491   return GNUNET_NO;
492 }
493
494
495 /**
496  * Create a new query plan entry.
497  *
498  * @param cp peer with the entry
499  * @param pr request with the entry
500  */
501 void
502 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
503 {
504   struct GNUNET_PeerIdentity id;
505   struct PeerPlan *pp;
506   struct GSF_PendingRequestData *prd;
507   struct GSF_RequestPlan *rp;
508   struct GSF_RequestPlanReference *rpr;
509   struct PendingRequestList *prl;
510   struct MergeContext mpc;
511
512   GNUNET_assert (NULL != cp);
513   GSF_connected_peer_get_identity_ (cp, &id);
514   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
515   if (NULL == pp)
516   {
517     pp = GNUNET_malloc (sizeof (struct PeerPlan));
518     pp->priority_heap =
519         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
520     pp->delay_heap =
521         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
522     pp->cp = cp;
523     GNUNET_CONTAINER_multihashmap_put (plans, &id.hashPubKey, pp,
524                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
525   }
526   mpc.merged = GNUNET_NO;
527   mpc.pr = pr;
528   /* FIXME: O(n) call here, LRN reports this is a performance
529      problem.  Try using hash map!? */
530   GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
531   if (mpc.merged != GNUNET_NO)
532     return;
533   GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
534   if (mpc.merged != GNUNET_NO)
535     return;
536   plan_count++;
537   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
538                             GNUNET_NO);
539   prd = GSF_pending_request_get_data_ (pr);
540 #if DEBUG_FS
541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542               "Planning transmission of query `%s' to peer `%s'\n",
543               GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
544 #endif
545   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
546   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
547   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
548   rpr->rp = rp;
549   rpr->prl = prl;
550   prl->rpr = rpr;
551   prl->pr = pr;
552   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
553   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
554   plan (pp, rp);
555 }
556
557
558 /**
559  * Notify the plan about a peer being no longer available;
560  * destroy all entries associated with this peer.
561  *
562  * @param cp connected peer
563  */
564 void
565 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
566 {
567   struct GNUNET_PeerIdentity id;
568   struct PeerPlan *pp;
569   struct GSF_RequestPlan *rp;
570   struct GSF_PendingRequestData *prd;
571   struct PendingRequestList *prl;
572
573   GSF_connected_peer_get_identity_ (cp, &id);
574   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
575   if (NULL == pp)
576     return;                     /* nothing was ever planned for this peer */
577   GNUNET_assert (GNUNET_YES ==
578                  GNUNET_CONTAINER_multihashmap_remove (plans, &id.hashPubKey,
579                                                        pp));
580   if (NULL != pp->pth)
581     GSF_peer_transmit_cancel_ (pp->pth);
582   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
583   {
584     GNUNET_SCHEDULER_cancel (pp->task);
585     pp->task = GNUNET_SCHEDULER_NO_TASK;
586   }
587   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
588   {
589     while (NULL != (prl = rp->prl_head))
590     {
591       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
592       prd = GSF_pending_request_get_data_ (prl->pr);
593       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
594       GNUNET_free (prl->rpr);
595       GNUNET_free (prl);
596     }
597     GNUNET_free (rp);
598   }
599   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
600   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
601   {
602     while (NULL != (prl = rp->prl_head))
603     {
604       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
605       prd = GSF_pending_request_get_data_ (prl->pr);
606       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
607       GNUNET_free (prl->rpr);
608       GNUNET_free (prl);
609     }
610     GNUNET_free (rp);
611   }
612   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
613                          plan_count, GNUNET_NO);
614
615   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
616   GNUNET_free (pp);
617 }
618
619
620 /**
621  * Notify the plan about a request being done; destroy all entries
622  * associated with this request.
623  *
624  * @param pr request that is done
625  */
626 void
627 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
628 {
629   struct GSF_RequestPlan *rp;
630   struct GSF_PendingRequestData *prd;
631   struct GSF_RequestPlanReference *rpr;
632
633   prd = GSF_pending_request_get_data_ (pr);
634   while (NULL != (rpr = prd->rpr_head))
635   {
636     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
637     rp = rpr->rp;
638     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
639     GNUNET_free (rpr->prl);
640     GNUNET_free (rpr);
641     if (rp->prl_head == 0)
642     {
643       GNUNET_CONTAINER_heap_remove_node (rp->hn);
644       plan_count--;
645       GNUNET_free (rp);
646     }
647   }
648   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
649                          plan_count, GNUNET_NO);
650 }
651
652
653 /**
654  * Initialize plan subsystem.
655  */
656 void
657 GSF_plan_init ()
658 {
659   plans = GNUNET_CONTAINER_multihashmap_create (256);
660 }
661
662
663 /**
664  * Shutdown plan subsystem.
665  */
666 void
667 GSF_plan_done ()
668 {
669   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
670   GNUNET_CONTAINER_multihashmap_destroy (plans);
671 }
672
673
674
675 /* end of gnunet-service-fs_pe.h */