even nicer indentation, thanks to LRN's indent patch
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan 
35  * participates with.
36  */
37 struct PendingRequestList;
38
39
40 /**
41  * DLL of request plans a particular pending request is
42  * involved with.
43  */
44 struct GSF_RequestPlanReference
45 {
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlanReference *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *prev;
56
57   /**
58    * Associated request plan.
59    */
60   struct GSF_RequestPlan *rp;
61
62   /**
63    * Corresponding PendingRequestList.
64    */
65   struct PendingRequestList *prl;
66 };
67
68
69 /**
70  * List of GSF_PendingRequests this request plan 
71  * participates with.
72  */
73 struct PendingRequestList
74 {
75
76   /**
77    * This is a doubly-linked list.
78    */
79   struct PendingRequestList *next;
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *prev;
85
86   /**
87    * Array of associated pending requests.
88    */
89   struct GSF_PendingRequest *pr;
90
91   /**
92    * Corresponding GSF_RequestPlanReference.
93    */
94   struct GSF_RequestPlanReference *rpr;
95
96 };
97
98
99 /**
100  * Information we keep per request per peer.  This is a doubly-linked
101  * list (with head and tail in the 'struct GSF_PendingRequestData')
102  * with one entry in each heap of each 'struct PeerPlan'.  Each
103  * entry tracks information relevant for this request and this peer.
104  */
105 struct GSF_RequestPlan
106 {
107
108   /**
109    * This is a doubly-linked list.
110    */
111   struct GSF_RequestPlan *next;
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *prev;
117
118   /**
119    * Heap node associated with this request and this peer.
120    */
121   struct GNUNET_CONTAINER_HeapNode *hn;
122
123   /**
124    * Head of list of associated pending requests.
125    */
126   struct PendingRequestList *prl_head;
127
128   /**
129    * Tail of list of associated pending requests.
130    */
131   struct PendingRequestList *prl_tail;
132
133   /**
134    * Earliest time we'd be happy to (re)transmit this request.
135    */
136   struct GNUNET_TIME_Absolute earliest_transmission;
137
138   /**
139    * When was the last time we transmitted this request to this peer? 0 for never.
140    */
141   struct GNUNET_TIME_Absolute last_transmission;
142
143   /**
144    * Current priority for this request for this target.
145    */
146   uint64_t priority;
147
148   /**
149    * How often did we transmit this request to this peer?
150    */
151   unsigned int transmission_counter;
152
153 };
154
155
156 /**
157  * Transmission plan for a peer.
158  */
159 struct PeerPlan
160 {
161   /**
162    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
163    */
164   struct GNUNET_CONTAINER_Heap *priority_heap;
165
166   /**
167    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
168    */
169   struct GNUNET_CONTAINER_Heap *delay_heap;
170
171   /**
172    * Current transmission request handle.
173    */
174   struct GSF_PeerTransmitHandle *pth;
175
176   /**
177    * Peer for which this is the plan.
178    */
179   struct GSF_ConnectedPeer *cp;
180
181   /**
182    * Current task for executing the plan.
183    */
184   GNUNET_SCHEDULER_TaskIdentifier task;
185 };
186
187
188 /**
189  * Hash map from peer identities to PeerPlans.
190  */
191 static struct GNUNET_CONTAINER_MultiHashMap *plans;
192
193 /**
194  * Sum of all transmission counters (equals total delay for all plan entries).
195  */
196 static unsigned long long total_delay;
197
198 /**
199  * Number of plan entries.
200  */
201 static unsigned long long plan_count;
202
203
204 /**
205  * Figure out when and how to transmit to the given peer.
206  *
207  * @param cls the 'struct GSF_ConnectedPeer' for transmission
208  * @param tc scheduler context
209  */
210 static void
211 schedule_peer_transmission (void *cls,
212                             const struct GNUNET_SCHEDULER_TaskContext *tc);
213
214
215 /**
216  * Insert the given request plan into the heap with the appropriate weight.
217  *
218  * @param pp associated peer's plan
219  * @param rp request to plan
220  */
221 static void
222 plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
223 {
224   struct GSF_PendingRequestData *prd;
225   struct GNUNET_TIME_Relative delay;
226
227   GNUNET_STATISTICS_set (GSF_stats,
228                          gettext_noop ("# average retransmission delay (ms)"),
229                          total_delay * 1000LL / plan_count, GNUNET_NO);
230   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
231   // FIXME: calculate 'rp->priority'!  
232   if (rp->transmission_counter < 32)
233     delay =
234         GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
235                                        1LL << rp->transmission_counter);
236   else
237     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, UINT_MAX);
238   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
239 #if DEBUG_FS
240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
241               "Earliest (re)transmission for `%s' in %us\n",
242               GNUNET_h2s (&prd->query), rp->transmission_counter);
243 #endif
244
245   GNUNET_assert (rp->hn == NULL);
246   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value
247       == 0)
248     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
249   else
250     rp->hn =
251         GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
252                                       rp->earliest_transmission.abs_value);
253   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
254     GNUNET_SCHEDULER_cancel (pp->task);
255   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
256 }
257
258
259 /**
260  * Get the pending request with the highest TTL from the given plan.
261  *
262  * @param rp plan to investigate
263  * @return pending request with highest TTL
264  */
265 struct GSF_PendingRequest *
266 get_latest (const struct GSF_RequestPlan *rp)
267 {
268   struct GSF_PendingRequest *ret;
269   struct PendingRequestList *prl;
270
271   prl = rp->prl_head;
272   ret = prl->pr;
273   prl = prl->next;
274   while (NULL != prl)
275   {
276     if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
277         GSF_pending_request_get_data_ (ret)->ttl.abs_value)
278       ret = prl->pr;
279     prl = prl->next;
280   }
281   return ret;
282 }
283
284
285 /**
286  * Function called to get a message for transmission.
287  *
288  * @param cls closure
289  * @param buf_size number of bytes available in buf
290  * @param buf where to copy the message, NULL on error (peer disconnect)
291  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
292  */
293 static size_t
294 transmit_message_callback (void *cls, size_t buf_size, void *buf)
295 {
296   struct PeerPlan *pp = cls;
297   struct GSF_RequestPlan *rp;
298   size_t msize;
299
300   pp->pth = NULL;
301   if (NULL == buf)
302   {
303     /* failed, try again... */
304     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
305     return 0;
306   }
307   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
308   if (NULL == rp)
309   {
310     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
311     return 0;
312   }
313   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
314   if (msize > buf_size)
315   {
316     /* buffer to small (message changed), try again */
317     pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
318     return 0;
319   }
320   /* remove from root, add again elsewhere... */
321   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
322   rp->hn = NULL;
323   rp->last_transmission = GNUNET_TIME_absolute_get ();
324   rp->transmission_counter++;
325   total_delay++;
326 #if DEBUG_FS
327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328               "Executing plan %p executed %u times, planning retransmission\n",
329               rp, rp->transmission_counter);
330 #endif
331   plan (pp, rp);
332   GNUNET_STATISTICS_update (GSF_stats,
333                             gettext_noop
334                             ("# queries messages sent to other peers"), 1,
335                             GNUNET_NO);
336   return msize;
337 }
338
339
340 /**
341  * Figure out when and how to transmit to the given peer.
342  *
343  * @param cls the 'struct PeerPlan'
344  * @param tc scheduler context
345  */
346 static void
347 schedule_peer_transmission (void *cls,
348                             const struct GNUNET_SCHEDULER_TaskContext *tc)
349 {
350   struct PeerPlan *pp = cls;
351   struct GSF_RequestPlan *rp;
352   size_t msize;
353
354   pp->task = GNUNET_SCHEDULER_NO_TASK;
355   if (pp->pth != NULL)
356   {
357     GSF_peer_transmit_cancel_ (pp->pth);
358     pp->pth = NULL;
359   }
360   /* move ready requests to priority queue */
361   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
362          (GNUNET_TIME_absolute_get_remaining
363           (rp->earliest_transmission).rel_value == 0))
364   {
365     GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
366     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
367   }
368   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
369   {
370     /* priority heap (still) empty, check for delay... */
371     rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
372     if (NULL == rp)
373     {
374 #if DEBUG_FS
375       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
376                   pp);
377 #endif
378       return;                   /* both queues empty */
379     }
380 #if DEBUG_FS
381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382                 "Sleeping for %llu ms before retrying requests on plan %p.\n",
383                 (unsigned long long)
384                 GNUNET_TIME_absolute_get_remaining
385                 (rp->earliest_transmission).rel_value, pp);
386 #endif
387     pp->task =
388         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
389                                       (rp->earliest_transmission),
390                                       &schedule_peer_transmission, pp);
391     return;
392   }
393   /* process from priority heap */
394   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
395 #if DEBUG_FS > 1
396   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
397 #endif
398   GNUNET_assert (NULL != rp);
399   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
400   pp->pth =
401       GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
402                           GNUNET_TIME_UNIT_FOREVER_REL, msize,
403                           &transmit_message_callback, pp);
404   GNUNET_assert (NULL != pp->pth);
405 }
406
407
408 /**
409  * Closure for 'merge_pr'.
410  */
411 struct MergeContext
412 {
413
414   struct GSF_PendingRequest *pr;
415
416   int merged;
417
418 };
419
420
421 /**
422  * Iterator that checks if an equivalent request is already
423  * present for this peer.
424  *
425  * @param cls closure
426  * @param node internal node of the heap (ignored)
427  * @param element request plan stored at the node
428  * @param cost cost associated with the node (ignored)
429  * @return GNUNET_YES if we should continue to iterate,
430  *         GNUNET_NO if not (merge success)
431  */
432 static int
433 merge_pr (void *cls, struct GNUNET_CONTAINER_HeapNode *node, void *element,
434           GNUNET_CONTAINER_HeapCostType cost)
435 {
436   struct MergeContext *mpr = cls;
437   struct GSF_RequestPlan *rp = element;
438   struct GSF_PendingRequestData *prd;
439   struct GSF_RequestPlanReference *rpr;
440   struct PendingRequestList *prl;
441   struct GSF_PendingRequest *latest;
442
443   if (GNUNET_OK !=
444       GSF_pending_request_is_compatible_ (mpr->pr, rp->prl_head->pr))
445     return GNUNET_YES;
446   /* merge new request with existing request plan */
447   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
448   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
449   rpr->rp = rp;
450   rpr->prl = prl;
451   prl->rpr = rpr;
452   prl->pr = mpr->pr;
453   prd = GSF_pending_request_get_data_ (mpr->pr);
454   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
455   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
456   mpr->merged = GNUNET_YES;
457   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
458                             GNUNET_NO);
459   latest = get_latest (rp);
460   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value <
461       prd->ttl.abs_value)
462   {
463     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
464                               1, GNUNET_NO);
465     rp->transmission_counter = 0;       /* reset */
466   }
467   return GNUNET_NO;
468 }
469
470
471 /**
472  * Create a new query plan entry.
473  *
474  * @param cp peer with the entry
475  * @param pr request with the entry
476  */
477 void
478 GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequest *pr)
479 {
480   struct GNUNET_PeerIdentity id;
481   struct PeerPlan *pp;
482   struct GSF_PendingRequestData *prd;
483   struct GSF_RequestPlan *rp;
484   struct GSF_RequestPlanReference *rpr;
485   struct PendingRequestList *prl;
486   struct MergeContext mpc;
487
488   GNUNET_assert (NULL != cp);
489   GSF_connected_peer_get_identity_ (cp, &id);
490   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
491   if (NULL == pp)
492   {
493     pp = GNUNET_malloc (sizeof (struct PeerPlan));
494     pp->priority_heap =
495         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
496     pp->delay_heap =
497         GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
498     pp->cp = cp;
499     GNUNET_CONTAINER_multihashmap_put (plans, &id.hashPubKey, pp,
500                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
501   }
502   mpc.merged = GNUNET_NO;
503   mpc.pr = pr;
504   GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
505   if (mpc.merged != GNUNET_NO)
506     return;
507   GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
508   if (mpc.merged != GNUNET_NO)
509     return;
510   plan_count++;
511   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
512                             GNUNET_NO);
513   prd = GSF_pending_request_get_data_ (pr);
514 #if DEBUG_FS
515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
516               "Planning transmission of query `%s' to peer `%s'\n",
517               GNUNET_h2s (&prd->query), GNUNET_i2s (&id));
518 #endif
519   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
520   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));
521   prl = GNUNET_malloc (sizeof (struct PendingRequestList));
522   rpr->rp = rp;
523   rpr->prl = prl;
524   prl->rpr = rpr;
525   prl->pr = pr;
526   GNUNET_CONTAINER_DLL_insert (prd->rpr_head, prd->rpr_tail, rpr);
527   GNUNET_CONTAINER_DLL_insert (rp->prl_head, rp->prl_tail, prl);
528   plan (pp, rp);
529 }
530
531
532 /**
533  * Notify the plan about a peer being no longer available;
534  * destroy all entries associated with this peer.
535  *
536  * @param cp connected peer 
537  */
538 void
539 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
540 {
541   struct GNUNET_PeerIdentity id;
542   struct PeerPlan *pp;
543   struct GSF_RequestPlan *rp;
544   struct GSF_PendingRequestData *prd;
545   struct PendingRequestList *prl;
546
547   GSF_connected_peer_get_identity_ (cp, &id);
548   pp = GNUNET_CONTAINER_multihashmap_get (plans, &id.hashPubKey);
549   if (NULL == pp)
550     return;                     /* nothing was ever planned for this peer */
551   GNUNET_assert (GNUNET_YES ==
552                  GNUNET_CONTAINER_multihashmap_remove (plans, &id.hashPubKey,
553                                                        pp));
554   if (NULL != pp->pth)
555     GSF_peer_transmit_cancel_ (pp->pth);
556   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
557   {
558     GNUNET_SCHEDULER_cancel (pp->task);
559     pp->task = GNUNET_SCHEDULER_NO_TASK;
560   }
561   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
562   {
563     while (NULL != (prl = rp->prl_head))
564     {
565       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
566       prd = GSF_pending_request_get_data_ (prl->pr);
567       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
568       GNUNET_free (prl->rpr);
569       GNUNET_free (prl);
570     }
571     GNUNET_free (rp);
572   }
573   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
574   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
575   {
576     while (NULL != (prl = rp->prl_head))
577     {
578       GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, prl);
579       prd = GSF_pending_request_get_data_ (prl->pr);
580       GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, prl->rpr);
581       GNUNET_free (prl->rpr);
582       GNUNET_free (prl);
583     }
584     GNUNET_free (rp);
585   }
586   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
587                          plan_count, GNUNET_NO);
588
589   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
590   GNUNET_free (pp);
591 }
592
593
594 /**
595  * Notify the plan about a request being done; destroy all entries
596  * associated with this request.
597  *
598  * @param pr request that is done
599  */
600 void
601 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
602 {
603   struct GSF_RequestPlan *rp;
604   struct GSF_PendingRequestData *prd;
605   struct GSF_RequestPlanReference *rpr;
606
607   prd = GSF_pending_request_get_data_ (pr);
608   while (NULL != (rpr = prd->rpr_head))
609   {
610     GNUNET_CONTAINER_DLL_remove (prd->rpr_head, prd->rpr_tail, rpr);
611     rp = rpr->rp;
612     GNUNET_CONTAINER_DLL_remove (rp->prl_head, rp->prl_tail, rpr->prl);
613     GNUNET_free (rpr->prl);
614     GNUNET_free (rpr);
615     if (rp->prl_head == 0)
616     {
617       GNUNET_CONTAINER_heap_remove_node (rp->hn);
618       plan_count--;
619       GNUNET_free (rp);
620     }
621   }
622   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
623                          plan_count, GNUNET_NO);
624 }
625
626
627 /**
628  * Initialize plan subsystem.
629  */
630 void
631 GSF_plan_init ()
632 {
633   plans = GNUNET_CONTAINER_multihashmap_create (256);
634 }
635
636
637 /**
638  * Shutdown plan subsystem.
639  */
640 void
641 GSF_plan_done ()
642 {
643   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (plans));
644   GNUNET_CONTAINER_multihashmap_destroy (plans);
645 }
646
647
648
649 /* end of gnunet-service-fs_pe.h */