indentation
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan 
35  * participates with.
36  */
37 struct PendingRequestList;
38
39
40 /**
41  * DLL of request plans a particular pending request is
42  * involved with.
43  */
44 struct GSF_RequestPlanReference
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlanReference *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *prev;
56
57   /**
58    * Associated request plan.
59    */
60   struct GSF_RequestPlan *rp;
61
62   /**
63    * Corresponding PendingRequestList.
64    */
65   struct PendingRequestList *prl;
66 };
67
68
69 /**
70  * List of GSF_PendingRequests this request plan 
71  * participates with.
72  */
73 struct PendingRequestList
74 {
75
76   /**
77    * This is a doubly-linked list.
78    */
79   struct PendingRequestList *next;
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *prev;
85
86   /**
87    * Array of associated pending requests.
88    */
89   struct GSF_PendingRequest *pr;
90
91   /**
92    * Corresponding GSF_RequestPlanReference.
93    */
94   struct GSF_RequestPlanReference *rpr;
95
96 };
97
98
99 /**
100  * Information we keep per request per peer.  This is a doubly-linked
101  * list (with head and tail in the 'struct GSF_PendingRequestData')
102  * with one entry in each heap of each 'struct PeerPlan'.  Each
103  * entry tracks information relevant for this request and this peer.
104  */
105 struct GSF_RequestPlan
106 {
107
108   /**
109    * This is a doubly-linked list.
110    */
111   struct GSF_RequestPlan *next;
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *prev;
117
118   /**
119    * Heap node associated with this request and this peer.
120    */
121   struct GNUNET_CONTAINER_HeapNode *hn;
122
123   /**
124    * Head of list of associated pending requests.
125    */
126   struct PendingRequestList *prl_head;
127
128   /**
129    * Tail of list of associated pending requests.
130    */
131   struct PendingRequestList *prl_tail;
132
133   /**
134    * Earliest time we'd be happy to (re)transmit this request.
135    */
136   struct GNUNET_TIME_Absolute earliest_transmission;
137
138   /**
139    * When was the last time we transmitted this request to this peer? 0 for never.
140    */
141   struct GNUNET_TIME_Absolute last_transmission;
142
143   /**
144    * Current priority for this request for this target.
145    */
146   uint64_t priority;
147
148   /**
149    * How often did we transmit this request to this peer?
150    */
151   unsigned int transmission_counter;
152
153 };
154
155
156 /**
157  * Transmission plan for a peer.
158  */
159 struct PeerPlan
160 {
161   /**
162    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
163    */
164   struct GNUNET_CONTAINER_Heap *priority_heap;
165
166   /**
167    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
168    */
169   struct GNUNET_CONTAINER_Heap *delay_heap;
170
171   /**
172    * Current transmission request handle.
173    */
174   struct GSF_PeerTransmitHandle *pth;
175
176   /**
177    * Peer for which this is the plan.
178    */
179   struct GSF_ConnectedPeer *cp;
180
181   /**
182    * Current task for executing the plan.
183    */
184   GNUNET_SCHEDULER_TaskIdentifier task;
185 };
186
187
188 /**
189  * Hash map from peer identities to PeerPlans.
190  */
191 static struct GNUNET_CONTAINER_MultiHashMap *plans;
192
193 /**
194  * Sum of all transmission counters (equals total delay for all plan entries).
195  */
196 static unsigned long long total_delay;
197
198 /**
199  * Number of plan entries.
200  */
201 static unsigned long long plan_count;
202
203
204 /**
205  * Figure out when and how to transmit to the given peer.
206  *
207  * @param cls the 'struct GSF_ConnectedPeer' for transmission
208  * @param tc scheduler context
209  */
210 static void
211 schedule_peer_transmission (void *cls,
212                             const struct GNUNET_SCHEDULER_TaskContext *tc);
213
214
215 /**
216  * Insert the given request plan into the heap with the appropriate weight.
217  *
218  * @param pp associated peer's plan
219  * @param rp request to plan
220  */
221 static void
222 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
223 {
224   struct GSF_PendingRequestData *prd;
225   struct GNUNET_TIME_Relative delay;
226
227   GNUNET_STATISTICS_set (GSF_stats,
228                          gettext_noop ("# average retransmission delay (ms)"),
229                          total_delay * 1000LL / plan_count, GNUNET_NO);
230   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
231   // FIXME: calculate 'rp->priority'!  
232   if (rp->transmission_counter < 32)
233     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
234                                            1LL << rp->transmission_counter);
235   else
236     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, UINT_MAX);
237   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
238 #if DEBUG_FS
239   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
240               "Earliest (re)transmission for `%s' in %us\n",
241               GNUNET_h2s (&prd->query), rp->transmission_counter);
242 #endif
243
244   GNUNET_assert (rp->hn == NULL);
245   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
246       == 0)
247     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
248   else
249     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
250                                            rp,
251                                            rp->earliest_transmission.abs_value);
252   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
253     GNUNET_SCHEDULER_cancel (pp->task);
254   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
255 }
256
257
258 /**
259  * Get the pending request with the highest TTL from the given plan.
260  *
261  * @param rp plan to investigate
262  * @return pending request with highest TTL
263  */
264 struct GSF_PendingRequest *
265 get_latest (const struct GSF_RequestPlan *rp)
266 {
267   struct GSF_PendingRequest *ret;
268   struct PendingRequestList *prl;
269
270   prl = rp->prl_head;
271   ret = prl->pr;
272   prl = prl->next;
273   while (NULL != prl)
274   {
275     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
276         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
277       ret = prl->pr;
278     prl = prl->next;
279   }
280   return ret;
281 }
282
283
284 /**
285  * Function called to get a message for transmission.
286  *
287  * @param cls closure
288  * @param buf_size number of bytes available in buf
289  * @param buf where to copy the message, NULL on error (peer disconnect)
290  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
291  */
292 static size_t
293 transmit_message_callback (void *cls, size_t buf_size, void *buf)
294 {
295   struct PeerPlan *pp = cls;
296   struct GSF_RequestPlan *rp;
297   size_t msize;
298
299   pp->pth = NULL;
300   if (NULL == buf)
301   {
302     /* failed, try again... */
303     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
304     return 0;
305   }
306   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
307   if (NULL == rp)
308   {
309     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
310     return 0;
311   }
312   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
313   if (msize > buf_size)
314   {
315     /* buffer to small (message changed), try again */
316     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
317     return 0;
318   }
319   /* remove from root, add again elsewhere... */
320   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
321   rp->hn = NULL;
322   rp->last_transmission = GNUNET_TIME_absolute_get ();
323   rp->transmission_counter++;
324   total_delay++;
325 #if DEBUG_FS
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Executing plan %p executed %u times, planning retransmission\n",
328               rp, rp->transmission_counter);
329 #endif
330   plan (pp, rp);
331   GNUNET_STATISTICS_update (GSF_stats,
332                             gettext_noop
333                             ("# queries messages sent to other peers"), 1,
334                             GNUNET_NO);
335   return msize;
336 }
337
338
339 /**
340  * Figure out when and how to transmit to the given peer.
341  *
342  * @param cls the 'struct PeerPlan'
343  * @param tc scheduler context
344  */
345 static void
346 schedule_peer_transmission (void *cls,
347                             const struct GNUNET_SCHEDULER_TaskContext *tc)
348 {
349   struct PeerPlan *pp = cls;
350   struct GSF_RequestPlan *rp;
351   size_t msize;
352
353   pp->task = GNUNET_SCHEDULER_NO_TASK;
354   if (pp->pth != NULL)
355   {
356     GSF_peer_transmit_cancel_ (pp->pth);
357     pp->pth = NULL;
358   }
359   /* move ready requests to priority queue */
360   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
361          (GNUNET_TIME_absolute_get_remaining
362           (rp->earliest_transmission).rel_value == 0))
363   {
364     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
365     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
366   }
367   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
368   {
369     /* priority heap (still) empty, check for delay... */
370     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
371     if (NULL == rp)
372     {
373 #if DEBUG_FS
374       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375                   "No active requests for plan %p.\n", pp);
376 #endif
377       return;                   /* both queues empty */
378     }
379 #if DEBUG_FS
380     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
382                 (unsigned long long)
383                 GNUNET_TIME_absolute_get_remaining
384                 (rp->earliest_transmission).rel_value, pp);
385 #endif
386     pp->task =
387         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
388                                       (rp->earliest_transmission),
389                                       &schedule_peer_transmission, pp);
390     return;
391   }
392   /* process from priority heap */
393   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
394 #if DEBUG_FS > 1
395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
396 #endif
397   GNUNET_assert (NULL != rp);
398   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
399   pp->pth = GSF_peer_transmit_ (pp->cp,
400                                 GNUNET_YES,
401                                 rp->priority,
402                                 GNUNET_TIME_UNIT_FOREVER_REL,
403                                 msize, &transmit_message_callback, pp);
404   GNUNET_assert (NULL != pp->pth);
405 }
406
407
408 /**
409  * Closure for 'merge_pr'.
410  */
411 struct MergeContext
412 {
413
414   struct GSF_PendingRequest *pr;
415
416   int merged;
417
418 };
419
420
421 /**
422  * Iterator that checks if an equivalent request is already
423  * present for this peer.
424  *
425  * @param cls closure
426  * @param node internal node of the heap (ignored)
427  * @param element request plan stored at the node
428  * @param cost cost associated with the node (ignored)
429  * @return GNUNET_YES if we should continue to iterate,
430  *         GNUNET_NO if not (merge success)
431  */
432 static int
433 merge_pr (void *cls,
434           struct GNUNET_CONTAINER_HeapNode *node,
435           void *element, GNUNET_CONTAINER_HeapCostType cost)
436 {
437   struct MergeContext *mpr = cls;
438   struct GSF_RequestPlan *rp = element;
439   struct GSF_PendingRequestData *prd;
440   struct GSF_RequestPlanReference *rpr;
441   struct PendingRequestList *prl;
442   struct GSF_PendingRequest *latest;
443
444   if (GNUNET_OK !=
445       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
446     return GNUNET_YES;
447   /* merge new request with existing request plan */
448   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
449   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
450   rpr->rp = rp;
451   rpr->prl = prl;
452   prl->rpr = rpr;
453   prl->pr = mpr->pr;
454   prd = GSF_pending_request_get_data_ (mpr->pr);
455   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
456   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
457   mpr->merged = GNUNET_YES;
458   GNUNET_STATISTICS_update (GSF_stats,
459                             gettext_noop ("# requests merged"), 1, GNUNET_NO);
460   latest = get_latest (rp);
461   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
462       prd->ttl.abs_value)
463   {
464     GNUNET_STATISTICS_update (GSF_stats,
465                               gettext_noop ("# requests refreshed"),
466                               1, GNUNET_NO);
467     rp->transmission_counter = 0;       /* reset */
468   }
469   return GNUNET_NO;
470 }
471
472
473 /**
474  * Create a new query plan entry.
475  *
476  * @param cp peer with the entry
477  * @param pr request with the entry
478  */
479 void
480 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
481 {
482   struct GNUNET_PeerIdentity id;
483   struct PeerPlan *pp;
484   struct GSF_PendingRequestData *prd;
485   struct GSF_RequestPlan *rp;
486   struct GSF_RequestPlanReference *rpr;
487   struct PendingRequestList *prl;
488   struct MergeContext mpc;
489
490   GNUNET_assert (NULL != cp);
491   GSF_connected_peer_get_identity_ (cp, &id);
492   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
493   if (NULL == pp)
494   {
495     pp = GNUNET_malloc (sizeof (struct PeerPlan));
496     pp->priority_heap =
497         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
498     pp->delay_heap =
499         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
500     pp->cp = cp;
501     GNUNET_CONTAINER_multihashmap_put (plans,
502                                        &id.hashPubKey,
503                                        pp,
504                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
505   }
506   mpc.merged = GNUNET_NO;
507   mpc.pr = pr;
508   GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
509   if (mpc.merged != GNUNET_NO)
510     return;
511   GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
512   if (mpc.merged != GNUNET_NO)
513     return;
514   plan_count++;
515   GNUNET_STATISTICS_update (GSF_stats,
516                             gettext_noop ("# query plan entries"),
517                             1, GNUNET_NO);
518   prd = GSF_pending_request_get_data_ (pr);
519 #if DEBUG_FS
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "Planning transmission of query `%s' to peer `%s'\n",
522               GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
523 #endif
524   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
525   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
526   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
527   rpr->rp = rp;
528   rpr->prl = prl;
529   prl->rpr = rpr;
530   prl->pr = pr;
531   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
532   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
533   plan (pp, rp);
534 }
535
536
537 /**
538  * Notify the plan about a peer being no longer available;
539  * destroy all entries associated with this peer.
540  *
541  * @param cp connected peer 
542  */
543 void
544 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
545 {
546   struct GNUNET_PeerIdentity id;
547   struct PeerPlan *pp;
548   struct GSF_RequestPlan *rp;
549   struct GSF_PendingRequestData *prd;
550   struct PendingRequestList *prl;
551
552   GSF_connected_peer_get_identity_ (cp, &id);
553   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
554   if (NULL == pp)
555     return;                     /* nothing was ever planned for this peer */
556   GNUNET_assert (GNUNET_YES ==
557                  GNUNET_CONTAINER_multihashmap_remove (plans,
558                                                        &id.hashPubKey, pp));
559   if (NULL != pp->pth)
560     GSF_peer_transmit_cancel_ (pp->pth);
561   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
562   {
563     GNUNET_SCHEDULER_cancel (pp->task);
564     pp->task = GNUNET_SCHEDULER_NO_TASK;
565   }
566   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
567   {
568     while (NULL != (prl = rp->prl_head))
569     {
570       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
571       prd = GSF_pending_request_get_data_ (prl->pr);
572       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
573       GNUNET_free (prl->rpr);
574       GNUNET_free (prl);
575     }
576     GNUNET_free (rp);
577   }
578   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
579   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
580   {
581     while (NULL != (prl = rp->prl_head))
582     {
583       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
584       prd = GSF_pending_request_get_data_ (prl->pr);
585       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
586       GNUNET_free (prl->rpr);
587       GNUNET_free (prl);
588     }
589     GNUNET_free (rp);
590   }
591   GNUNET_STATISTICS_set (GSF_stats,
592                          gettext_noop ("# query plan entries"),
593                          plan_count, GNUNET_NO);
594
595   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
596   GNUNET_free (pp);
597 }
598
599
600 /**
601  * Notify the plan about a request being done; destroy all entries
602  * associated with this request.
603  *
604  * @param pr request that is done
605  */
606 void
607 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
608 {
609   struct GSF_RequestPlan *rp;
610   struct GSF_PendingRequestData *prd;
611   struct GSF_RequestPlanReference *rpr;
612
613   prd = GSF_pending_request_get_data_ (pr);
614   while (NULL != (rpr = prd->rpr_head))
615   {
616     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
617     rp = rpr->rp;
618     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
619     GNUNET_free (rpr->prl);
620     GNUNET_free (rpr);
621     if (rp->prl_head == 0)
622     {
623       GNUNET_CONTAINER_heap_remove_node (rp->hn);
624       plan_count--;
625       GNUNET_free (rp);
626     }
627   }
628   GNUNET_STATISTICS_set (GSF_stats,
629                          gettext_noop ("# query plan entries"),
630                          plan_count, GNUNET_NO);
631 }
632
633
634 /**
635  * Initialize plan subsystem.
636  */
637 void
638 GSF_plan_init ()
639 {
640   plans = GNUNET_CONTAINER_multihashmap_create (256);
641 }
642
643
644 /**
645  * Shutdown plan subsystem.
646  */
647 void
648 GSF_plan_done ()
649 {
650   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
651   GNUNET_CONTAINER_multihashmap_destroy (plans);
652 }
653
654
655
656 /* end of gnunet-service-fs_pe.h */