comments
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * List of GSF_PendingRequests this request plan 
35  * participates with.
36  */
37 struct PendingRequestList;
38
39
40 /**
41  * DLL of request plans a particular pending request is
42  * involved with.
43  */
44 struct GSF_RequestPlanReference
45 {
46   
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlanReference *next;
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct GSF_RequestPlanReference *prev;
56
57   /**
58    * Associated request plan.
59    */
60   struct GSF_RequestPlan *rp;
61
62   /**
63    * Corresponding PendingRequestList.
64    */
65   struct PendingRequestList *prl;
66 };
67
68
69 /**
70  * List of GSF_PendingRequests this request plan 
71  * participates with.
72  */
73 struct PendingRequestList
74 {
75
76   /**
77    * This is a doubly-linked list.
78    */
79   struct PendingRequestList *next;
80
81   /**
82    * This is a doubly-linked list.
83    */
84   struct PendingRequestList *prev;
85
86   /**
87    * Array of associated pending requests.
88    */
89   struct GSF_PendingRequest *pr;
90
91   /**
92    * Corresponding GSF_RequestPlanReference.
93    */
94   struct GSF_RequestPlanReference *rpr;
95
96 };
97
98
99 /**
100  * Information we keep per request per peer.  This is a doubly-linked
101  * list (with head and tail in the 'struct GSF_PendingRequestData')
102  * with one entry in each heap of each 'struct PeerPlan'.  Each
103  * entry tracks information relevant for this request and this peer.
104  */
105 struct GSF_RequestPlan
106 {
107
108   /**
109    * This is a doubly-linked list.
110    */
111   struct GSF_RequestPlan *next;
112
113   /**
114    * This is a doubly-linked list.
115    */
116   struct GSF_RequestPlan *prev;
117
118   /**
119    * Heap node associated with this request and this peer.
120    */
121   struct GNUNET_CONTAINER_HeapNode *hn;
122
123   /**
124    * Head of list of associated pending requests.
125    */
126   struct PendingRequestList *prl_head;  
127
128   /**
129    * Tail of list of associated pending requests.
130    */
131   struct PendingRequestList *prl_tail;  
132
133   /**
134    * Earliest time we'd be happy to (re)transmit this request.
135    */
136   struct GNUNET_TIME_Absolute earliest_transmission;
137
138   /**
139    * When was the last time we transmitted this request to this peer? 0 for never.
140    */
141   struct GNUNET_TIME_Absolute last_transmission;
142
143   /**
144    * Current priority for this request for this target.
145    */
146   uint64_t priority;
147
148   /**
149    * How often did we transmit this request to this peer?
150    */
151   unsigned int transmission_counter;
152
153 };
154
155
156 /**
157  * Transmission plan for a peer.
158  */
159 struct PeerPlan
160 {
161   /**
162    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
163    */
164   struct GNUNET_CONTAINER_Heap *priority_heap;
165
166   /**
167    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
168    */
169   struct GNUNET_CONTAINER_Heap *delay_heap;
170
171   /**
172    * Current transmission request handle.
173    */
174   struct GSF_PeerTransmitHandle *pth;
175
176   /**
177    * Peer for which this is the plan.
178    */
179   struct GSF_ConnectedPeer *cp;
180
181   /**
182    * Current task for executing the plan.
183    */
184   GNUNET_SCHEDULER_TaskIdentifier task;
185 };
186
187
188 /**
189  * Hash map from peer identities to PeerPlans.
190  */
191 static struct GNUNET_CONTAINER_MultiHashMap *plans;
192
193 /**
194  * Sum of all transmission counters (equals total delay for all plan entries).
195  */
196 static unsigned long long total_delay;
197
198 /**
199  * Number of plan entries.
200  */
201 static unsigned long long plan_count;
202
203
204 /**
205  * Figure out when and how to transmit to the given peer.
206  *
207  * @param cls the 'struct GSF_ConnectedPeer' for transmission
208  * @param tc scheduler context
209  */
210 static void
211 schedule_peer_transmission (void *cls,
212                             const struct GNUNET_SCHEDULER_TaskContext *tc);
213
214
215 /**
216  * Insert the given request plan into the heap with the appropriate weight.
217  *
218  * @param pp associated peer's plan
219  * @param rp request to plan
220  */
221 static void
222 plan (struct PeerPlan *pp,
223       struct GSF_RequestPlan *rp)
224 {
225   struct GSF_PendingRequestData *prd;
226   struct GNUNET_TIME_Relative delay;
227
228   GNUNET_STATISTICS_set (GSF_stats,
229                          gettext_noop ("# average retransmission delay (ms)"),
230                          total_delay * 1000LL / plan_count,
231                          GNUNET_NO);
232   prd = GSF_pending_request_get_data_ (rp->prl_head->pr);
233   // FIXME: calculate 'rp->priority'!  
234   if (rp->transmission_counter < 32)
235     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
236                                            1LL << rp->transmission_counter);
237   else
238     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
239                                            UINT_MAX);
240   rp->earliest_transmission 
241     = GNUNET_TIME_relative_to_absolute (delay);
242 #if DEBUG_FS
243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244               "Earliest (re)transmission for `%s' in %us\n",
245               GNUNET_h2s (&prd->query),
246               rp->transmission_counter);
247 #endif 
248
249   GNUNET_assert (rp->hn == NULL);
250   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
251     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
252                                            rp,
253                                            rp->priority);
254   else
255     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
256                                            rp,
257                                            rp->earliest_transmission.abs_value);
258   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
259     GNUNET_SCHEDULER_cancel (pp->task);
260   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
261 }
262
263
264 /**
265  * Get the pending request with the highest TTL from the given plan.
266  *
267  * @param rp plan to investigate
268  * @return pending request with highest TTL
269  */
270 struct GSF_PendingRequest *
271 get_latest (const struct GSF_RequestPlan *rp)
272 {
273   struct GSF_PendingRequest *ret;
274   struct PendingRequestList *prl;
275
276   prl = rp->prl_head;
277   ret = prl->pr;
278   prl = prl->next;
279   while (NULL != prl)
280     {
281       if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value >
282           GSF_pending_request_get_data_ (ret)->ttl.abs_value)
283         ret = prl->pr;
284       prl = prl->next;
285     }
286   return ret;  
287 }
288
289
290 /**
291  * Function called to get a message for transmission.
292  *
293  * @param cls closure
294  * @param buf_size number of bytes available in buf
295  * @param buf where to copy the message, NULL on error (peer disconnect)
296  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
297  */
298 static size_t 
299 transmit_message_callback (void *cls,
300                            size_t buf_size,
301                            void *buf)
302 {
303   struct PeerPlan *pp = cls;
304   struct GSF_RequestPlan *rp;
305   size_t msize;
306
307   pp->pth = NULL;
308   if (NULL == buf)
309     {
310       /* failed, try again... */
311       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
312       return 0;
313     }
314   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
315   if (NULL == rp)
316     {
317       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
318       return 0;
319     }
320   msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
321   if (msize > buf_size)
322     {
323       /* buffer to small (message changed), try again */
324       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
325       return 0;
326     }
327   /* remove from root, add again elsewhere... */
328   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
329   rp->hn = NULL;
330   rp->last_transmission = GNUNET_TIME_absolute_get ();
331   rp->transmission_counter++;
332   total_delay++;
333 #if DEBUG_FS
334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335               "Executing plan %p executed %u times, planning retransmission\n",
336               rp,
337               rp->transmission_counter);
338 #endif    
339   plan (pp, rp);
340   GNUNET_STATISTICS_update (GSF_stats,
341                             gettext_noop ("# queries messages sent to other peers"),
342                             1,
343                             GNUNET_NO);
344   return msize;
345 }
346
347
348 /**
349  * Figure out when and how to transmit to the given peer.
350  *
351  * @param cls the 'struct PeerPlan'
352  * @param tc scheduler context
353  */
354 static void
355 schedule_peer_transmission (void *cls,
356                             const struct GNUNET_SCHEDULER_TaskContext *tc)
357 {
358   struct PeerPlan *pp = cls;
359   struct GSF_RequestPlan *rp;
360   size_t msize;
361
362   pp->task = GNUNET_SCHEDULER_NO_TASK;
363   if (pp->pth != NULL)
364     {
365       GSF_peer_transmit_cancel_ (pp->pth);
366       pp->pth = NULL;
367     }
368   /* move ready requests to priority queue */
369   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
370           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
371     {
372       GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
373       rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
374                                              rp, 
375                                              rp->priority);                                     
376     }   
377   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
378     {
379       /* priority heap (still) empty, check for delay... */
380       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
381       if (NULL == rp)
382         {
383 #if DEBUG_FS
384           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385                       "No active requests for plan %p.\n",
386                       pp);
387 #endif
388           return; /* both queues empty */
389         }
390 #if DEBUG_FS
391       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                   "Sleeping for %llu ms before retrying requests on plan %p.\n",
393                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
394                   pp);
395 #endif
396       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
397                                                &schedule_peer_transmission,
398                                                pp);
399       return;
400     }
401   /* process from priority heap */
402   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
403 #if DEBUG_FS > 1
404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
405               "Executing query plan %p\n",
406               rp);
407 #endif    
408   GNUNET_assert (NULL != rp);
409   msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
410   pp->pth = GSF_peer_transmit_ (pp->cp,
411                                 GNUNET_YES,
412                                 rp->priority,
413                                 GNUNET_TIME_UNIT_FOREVER_REL,
414                                 msize,
415                                 &transmit_message_callback,
416                                 pp);
417   GNUNET_assert (NULL != pp->pth);
418 }
419
420
421 /**
422  * Closure for 'merge_pr'.
423  */
424 struct MergeContext
425 {
426
427   struct GSF_PendingRequest *pr;
428
429   int merged;
430
431 };
432
433
434 /**
435  * Iterator that checks if an equivalent request is already
436  * present for this peer.
437  *
438  * @param cls closure
439  * @param node internal node of the heap (ignored)
440  * @param element request plan stored at the node
441  * @param cost cost associated with the node (ignored)
442  * @return GNUNET_YES if we should continue to iterate,
443  *         GNUNET_NO if not (merge success)
444  */
445 static int
446 merge_pr (void *cls,
447           struct GNUNET_CONTAINER_HeapNode *node,
448           void *element,
449           GNUNET_CONTAINER_HeapCostType cost)
450 {
451   struct MergeContext *mpr = cls;
452   struct GSF_RequestPlan *rp = element;
453   struct GSF_PendingRequestData *prd;
454   struct GSF_RequestPlanReference *rpr;
455   struct PendingRequestList *prl;
456   struct GSF_PendingRequest *latest;
457
458   if (GNUNET_OK !=
459       GSF_pending_request_is_compatible_ (mpr->pr, 
460                                           rp->prl_head->pr))
461     return GNUNET_YES;
462   /* merge new request with existing request plan */
463   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));  
464   prl = GNUNET_malloc (sizeof (struct PendingRequestList));  
465   rpr->rp = rp;
466   rpr->prl = prl;
467   prl->rpr = rpr;
468   prl->pr = mpr->pr;
469   prd = GSF_pending_request_get_data_ (mpr->pr);
470   GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
471                                prd->rpr_tail,
472                                rpr);
473   GNUNET_CONTAINER_DLL_insert (rp->prl_head,
474                                rp->prl_tail,
475                                prl);
476   mpr->merged = GNUNET_YES;
477   GNUNET_STATISTICS_update (GSF_stats,
478                             gettext_noop ("# requests merged"),
479                             1,
480                             GNUNET_NO);  
481   latest = get_latest (rp);
482   if (GSF_pending_request_get_data_ (latest)->ttl.abs_value < prd->ttl.abs_value)
483     {
484       GNUNET_STATISTICS_update (GSF_stats,
485                                 gettext_noop ("# requests refreshed"),
486                                 1,
487                                 GNUNET_NO);  
488       rp->transmission_counter = 0; /* reset */         
489     }
490   return GNUNET_NO;
491 }
492
493
494 /**
495  * Create a new query plan entry.
496  *
497  * @param cp peer with the entry
498  * @param pr request with the entry
499  */
500 void
501 GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
502                struct GSF_PendingRequest *pr)
503 {
504   struct GNUNET_PeerIdentity id;
505   struct PeerPlan *pp;
506   struct GSF_PendingRequestData *prd;
507   struct GSF_RequestPlan *rp;
508   struct GSF_RequestPlanReference *rpr;
509   struct PendingRequestList *prl;
510   struct MergeContext mpc;
511   size_t msize;
512
513   GNUNET_assert (NULL != cp);
514   GSF_connected_peer_get_identity_ (cp, &id);
515   pp = GNUNET_CONTAINER_multihashmap_get (plans,
516                                           &id.hashPubKey);
517   if (NULL == pp)
518     {
519       pp = GNUNET_malloc (sizeof (struct PeerPlan));
520       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
521       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
522       pp->cp = cp;
523       GNUNET_CONTAINER_multihashmap_put (plans,
524                                          &id.hashPubKey,
525                                          pp,
526                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
527     }
528   msize = GSF_pending_request_get_message_ (pr, 0, NULL);
529   mpc.merged = GNUNET_NO;
530   mpc.pr = pr;
531   GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc);
532   if (mpc.merged != GNUNET_NO)
533     return;
534   GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc);
535   if (mpc.merged != GNUNET_NO)
536     return;
537   plan_count++;
538   GNUNET_STATISTICS_update (GSF_stats,
539                             gettext_noop ("# query plan entries"),
540                             1,
541                             GNUNET_NO);  
542   prd = GSF_pending_request_get_data_ (pr);
543 #if DEBUG_FS
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545               "Planning transmission of query `%s' to peer `%s'\n",
546               GNUNET_h2s (&prd->query),
547               GNUNET_i2s (&id));
548 #endif    
549   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
550   rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference));  
551   prl = GNUNET_malloc (sizeof (struct PendingRequestList));  
552   rpr->rp = rp;
553   rpr->prl = prl;
554   prl->rpr = rpr;
555   prl->pr = pr;
556   GNUNET_CONTAINER_DLL_insert (prd->rpr_head,
557                                prd->rpr_tail,
558                                rpr);
559   GNUNET_CONTAINER_DLL_insert (rp->prl_head,
560                                rp->prl_tail,
561                                prl);
562   plan (pp, rp);
563 }
564
565
566 /**
567  * Notify the plan about a peer being no longer available;
568  * destroy all entries associated with this peer.
569  *
570  * @param cp connected peer 
571  */
572 void
573 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
574 {
575   struct GNUNET_PeerIdentity id;
576   struct PeerPlan *pp;
577   struct GSF_RequestPlan *rp;
578   struct GSF_PendingRequestData *prd;
579   struct PendingRequestList *prl;
580
581   GSF_connected_peer_get_identity_ (cp, &id);
582   pp = GNUNET_CONTAINER_multihashmap_get (plans,
583                                           &id.hashPubKey);
584   if (NULL == pp)
585     return; /* nothing was ever planned for this peer */
586   GNUNET_assert (GNUNET_YES ==
587                  GNUNET_CONTAINER_multihashmap_remove (plans,
588                                                        &id.hashPubKey,
589                                                        pp));
590   if (NULL != pp->pth)
591     GSF_peer_transmit_cancel_ (pp->pth);
592   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
593     {
594       GNUNET_SCHEDULER_cancel (pp->task);
595       pp->task = GNUNET_SCHEDULER_NO_TASK;
596     }
597   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
598     {
599       while (NULL != (prl = rp->prl_head))
600         {
601           GNUNET_CONTAINER_DLL_remove (rp->prl_head,
602                                        rp->prl_tail,
603                                        prl);
604           prd = GSF_pending_request_get_data_ (prl->pr);
605           GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
606                                        prd->rpr_tail,
607                                        prl->rpr);
608           GNUNET_free (prl->rpr);
609           GNUNET_free (prl);
610         }
611       GNUNET_free (rp);
612     }
613   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
614   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
615     {
616       while (NULL != (prl = rp->prl_head))
617         {
618           GNUNET_CONTAINER_DLL_remove (rp->prl_head,
619                                        rp->prl_tail,
620                                        prl);
621           prd = GSF_pending_request_get_data_ (prl->pr);
622           GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
623                                        prd->rpr_tail,
624                                        prl->rpr);
625           GNUNET_free (prl->rpr);
626           GNUNET_free (prl);
627         }
628       GNUNET_free (rp);
629     }
630   GNUNET_STATISTICS_set (GSF_stats,
631                          gettext_noop ("# query plan entries"),
632                          plan_count,
633                          GNUNET_NO);
634
635   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
636   GNUNET_free (pp);
637 }
638
639
640 /**
641  * Notify the plan about a request being done; destroy all entries
642  * associated with this request.
643  *
644  * @param pr request that is done
645  */
646 void
647 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
648 {
649   struct GSF_RequestPlan *rp;
650   struct GSF_PendingRequestData *prd;
651   struct GSF_RequestPlanReference *rpr;
652
653   prd = GSF_pending_request_get_data_ (pr);
654   while (NULL != (rpr = prd->rpr_head))
655     {
656       GNUNET_CONTAINER_DLL_remove (prd->rpr_head,
657                                    prd->rpr_tail,
658                                    rpr);
659       rp = rpr->rp;
660       GNUNET_CONTAINER_DLL_remove (rp->prl_head,
661                                    rp->prl_tail,
662                                    rpr->prl);
663       GNUNET_free (rpr->prl);
664       GNUNET_free (rpr);
665       if (rp->prl_head == 0)
666         {
667           GNUNET_CONTAINER_heap_remove_node (rp->hn);
668           plan_count--;
669           GNUNET_free (rp);
670         }
671     }
672   GNUNET_STATISTICS_set (GSF_stats,
673                          gettext_noop ("# query plan entries"),
674                          plan_count,
675                          GNUNET_NO);  
676 }
677
678
679 /**
680  * Initialize plan subsystem.
681  */
682 void
683 GSF_plan_init ()
684 {
685   plans = GNUNET_CONTAINER_multihashmap_create (256);
686 }
687
688
689 /**
690  * Shutdown plan subsystem.
691  */
692 void
693 GSF_plan_done ()
694 {
695   GNUNET_assert (0 == 
696                  GNUNET_CONTAINER_multihashmap_size (plans));
697   GNUNET_CONTAINER_multihashmap_destroy (plans);
698 }
699
700
701
702 /* end of gnunet-service-fs_pe.h */