arg
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * Information we keep per request per peer.  This is a doubly-linked
35  * list (with head and tail in the 'struct GSF_PendingRequestData')
36  * with one entry in each heap of each 'struct PeerPlan'.  Each
37  * entry tracks information relevant for this request and this peer.
38  */
39 struct GSF_RequestPlan
40 {
41
42   /**
43    * This is a doubly-linked list.
44    */
45   struct GSF_RequestPlan *next;
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlan *prev;
51
52   /**
53    * Heap node associated with this request and this peer.
54    */
55   struct GNUNET_CONTAINER_HeapNode *hn;
56
57   /**
58    * Associated pending request.
59    */
60   struct GSF_PendingRequest *pr;
61
62   /**
63    * Earliest time we'd be happy to (re)transmit this request.
64    */
65   struct GNUNET_TIME_Absolute earliest_transmission;
66
67   /**
68    * When was the last time we transmitted this request to this peer? 0 for never.
69    */
70   struct GNUNET_TIME_Absolute last_transmission;
71
72   /**
73    * Current priority for this request for this target.
74    */
75   uint64_t priority;
76
77   /**
78    * How often did we transmit this request to this peer?
79    */
80   unsigned int transmission_counter;
81
82 };
83
84
85 /**
86  * Transmission plan for a peer.
87  */
88 struct PeerPlan
89 {
90   /**
91    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
92    */
93   struct GNUNET_CONTAINER_Heap *priority_heap;
94
95   /**
96    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
97    */
98   struct GNUNET_CONTAINER_Heap *delay_heap;
99
100   /**
101    * Current transmission request handle.
102    */
103   struct GSF_PeerTransmitHandle *pth;
104
105   /**
106    * Peer for which this is the plan.
107    */
108   struct GSF_ConnectedPeer *cp;
109
110   /**
111    * Current task for executing the plan.
112    */
113   GNUNET_SCHEDULER_TaskIdentifier task;
114 };
115
116
117 /**
118  * Hash map from peer identities to PeerPlans.
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *plans;
121
122 /**
123  * Sum of all transmission counters (equals total delay for all plan entries).
124  */
125 static unsigned long long total_delay;
126
127 /**
128  * Number of plan entries.
129  */
130 static unsigned long long plan_count;
131
132
133 /**
134  * Figure out when and how to transmit to the given peer.
135  *
136  * @param cls the 'struct GSF_ConnectedPeer' for transmission
137  * @param tc scheduler context
138  */
139 static void
140 schedule_peer_transmission (void *cls,
141                             const struct GNUNET_SCHEDULER_TaskContext *tc);
142
143
144 /**
145  * Insert the given request plan into the heap with the appropriate weight.
146  *
147  * @param pp associated peer's plan
148  * @param rp request to plan
149  */
150 static void
151 plan (struct PeerPlan *pp,
152       struct GSF_RequestPlan *rp)
153 {
154   struct GSF_PendingRequestData *prd;
155   struct GNUNET_TIME_Relative delay;
156
157   GNUNET_STATISTICS_set (GSF_stats,
158                          gettext_noop ("# average retransmission delay (ms)"),
159                          total_delay * 1000LL / plan_count,
160                          GNUNET_NO);
161   prd = GSF_pending_request_get_data_ (rp->pr);
162   // FIXME: calculate 'rp->earliest_transmission'!
163   // FIXME: claculate 'rp->priority'!  
164   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
165                                          rp->transmission_counter);
166   rp->earliest_transmission 
167     = GNUNET_TIME_relative_to_absolute (delay);
168 #if DEBUG_FS
169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
170               "Earliest (re)transmission for `%s' in %us\n",
171               GNUNET_h2s (&prd->query),
172               rp->transmission_counter);
173 #endif 
174
175   GNUNET_assert (rp->hn == NULL);
176   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
177     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
178                                            rp,
179                                            rp->priority);
180   else
181     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
182                                            rp,
183                                            rp->earliest_transmission.abs_value);
184   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
185     GNUNET_SCHEDULER_cancel (pp->task);
186   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
187 }
188
189
190 /**
191  * Function called to get a message for transmission.
192  *
193  * @param cls closure
194  * @param buf_size number of bytes available in buf
195  * @param buf where to copy the message, NULL on error (peer disconnect)
196  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
197  */
198 static size_t 
199 transmit_message_callback (void *cls,
200                            size_t buf_size,
201                            void *buf)
202 {
203   struct PeerPlan *pp = cls;
204   struct GSF_RequestPlan *rp;
205   size_t msize;
206
207   pp->pth = NULL;
208   if (NULL == buf)
209     {
210       /* failed, try again... */
211       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
212       return 0;
213     }
214   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
215   if (NULL == rp)
216     {
217       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
218       return 0;
219     }
220   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
221   if (msize > buf_size)
222     {
223       /* buffer to small (message changed), try again */
224       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
225       return 0;
226     }
227   /* remove from root, add again elsewhere... */
228   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
229   rp->hn = NULL;
230   rp->last_transmission = GNUNET_TIME_absolute_get ();
231   rp->transmission_counter++;
232   total_delay++;
233 #if DEBUG_FS
234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235               "Executing plan %p executed %u times, planning retransmission\n",
236               rp,
237               rp->transmission_counter);
238 #endif    
239   plan (pp, rp);
240   GNUNET_STATISTICS_update (GSF_stats,
241                             gettext_noop ("# queries messages sent to other peers"),
242                             1,
243                             GNUNET_NO);
244   return msize;
245 }
246
247
248 /**
249  * Figure out when and how to transmit to the given peer.
250  *
251  * @param cls the 'struct PeerPlan'
252  * @param tc scheduler context
253  */
254 static void
255 schedule_peer_transmission (void *cls,
256                             const struct GNUNET_SCHEDULER_TaskContext *tc)
257 {
258   struct PeerPlan *pp = cls;
259   struct GSF_RequestPlan *rp;
260   size_t msize;
261
262   pp->task = GNUNET_SCHEDULER_NO_TASK;
263   if (pp->pth != NULL)
264     {
265       GSF_peer_transmit_cancel_ (pp->pth);
266       pp->pth = NULL;
267     }
268   /* move ready requests to priority queue */
269   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
270           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
271     {
272       GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
273       rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
274                                              rp, 
275                                              rp->priority);                                     
276     }   
277   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
278     {
279       /* priority heap (still) empty, check for delay... */
280       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
281       if (NULL == rp)
282         {
283 #if DEBUG_FS
284           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285                       "No active requests for plan %p.\n",
286                       pp);
287 #endif
288           return; /* both queues empty */
289         }
290 #if DEBUG_FS
291       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292                   "Sleeping for %llu ms before retrying requests on plan %p.\n",
293                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
294                   pp);
295 #endif
296       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
297                                                &schedule_peer_transmission,
298                                                pp);
299       return;
300     }
301   /* process from priority heap */
302   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
303 #if DEBUG_FS > 1
304   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
305               "Executing query plan %p\n",
306               rp);
307 #endif    
308   GNUNET_assert (NULL != rp);
309   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
310   pp->pth = GSF_peer_transmit_ (pp->cp,
311                                 GNUNET_YES,
312                                 rp->priority,
313                                 GNUNET_TIME_UNIT_FOREVER_REL,
314                                 msize,
315                                 &transmit_message_callback,
316                                 pp);
317   GNUNET_assert (NULL != pp->pth);
318 }
319
320
321 /**
322  * Create a new query plan entry.
323  *
324  * @param cp peer with the entry
325  * @param pr request with the entry
326  */
327 void
328 GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
329                struct GSF_PendingRequest *pr)
330 {
331   struct GNUNET_PeerIdentity id;
332   struct PeerPlan *pp;
333   struct GSF_PendingRequestData *prd;
334   struct GSF_RequestPlan *rp;
335
336   GNUNET_assert (NULL != cp);
337   GSF_connected_peer_get_identity_ (cp, &id);
338   pp = GNUNET_CONTAINER_multihashmap_get (plans,
339                                           &id.hashPubKey);
340   if (NULL == pp)
341     {
342       pp = GNUNET_malloc (sizeof (struct PeerPlan));
343       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
344       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
345       pp->cp = cp;
346       GNUNET_CONTAINER_multihashmap_put (plans,
347                                          &id.hashPubKey,
348                                          pp,
349                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
350     }
351   prd = GSF_pending_request_get_data_ (pr);
352   plan_count++;
353   GNUNET_STATISTICS_update (GSF_stats,
354                             gettext_noop ("# query plan entries"),
355                             1,
356                             GNUNET_NO);
357   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
358 #if DEBUG_FS
359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360               "Planning transmission of query `%s' to peer `%s' (%p)\n",
361               GNUNET_h2s (&prd->query),
362               GNUNET_i2s (&id), 
363               rp);
364 #endif    
365   rp->pr = pr;
366   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
367                                prd->rp_tail,
368                                rp);
369   plan (pp, rp);
370 }
371
372
373 /**
374  * Notify the plan about a peer being no longer available;
375  * destroy all entries associated with this peer.
376  *
377  * @param cp connected peer 
378  */
379 void
380 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
381 {
382   struct GNUNET_PeerIdentity id;
383   struct PeerPlan *pp;
384   struct GSF_RequestPlan *rp;
385   struct GSF_PendingRequestData *prd;
386
387   GSF_connected_peer_get_identity_ (cp, &id);
388   pp = GNUNET_CONTAINER_multihashmap_get (plans,
389                                           &id.hashPubKey);
390   if (NULL == pp)
391     return; /* nothing was ever planned for this peer */
392   GNUNET_CONTAINER_multihashmap_remove (plans,
393                                         &id.hashPubKey,
394                                         pp);
395   if (NULL != pp->pth)
396     GSF_peer_transmit_cancel_ (pp->pth);
397   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
398     {
399       GNUNET_SCHEDULER_cancel (pp->task);
400       pp->task = GNUNET_SCHEDULER_NO_TASK;
401     }
402   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
403     {
404       prd = GSF_pending_request_get_data_ (rp->pr);
405       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
406                                    prd->rp_tail,
407                                    rp);
408       plan_count--;
409       GNUNET_free (rp);
410     }
411   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
412   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
413     {
414       prd = GSF_pending_request_get_data_ (rp->pr);
415       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
416                                    prd->rp_tail,
417                                    rp);
418       plan_count--;
419       GNUNET_free (rp);
420     }
421   GNUNET_STATISTICS_set (GSF_stats,
422                          gettext_noop ("# query plan entries"),
423                          plan_count,
424                          GNUNET_NO);
425
426   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
427   GNUNET_free (pp);
428 }
429
430
431 /**
432  * Notify the plan about a request being done; destroy all entries
433  * associated with this request.
434  *
435  * @param pr request that is done
436  */
437 void
438 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
439 {
440   struct GSF_RequestPlan *rp;
441   struct GSF_PendingRequestData *prd;
442
443   prd = GSF_pending_request_get_data_ (pr);
444   while (NULL != (rp = prd->rp_head))
445     {
446       GNUNET_CONTAINER_heap_remove_node (rp->hn);
447       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
448                                    prd->rp_tail,
449                                    rp);
450       plan_count--;
451       GNUNET_free (rp);
452     }
453   GNUNET_STATISTICS_set (GSF_stats,
454                          gettext_noop ("# query plan entries"),
455                          plan_count,
456                          GNUNET_NO);  
457 }
458
459
460 /**
461  * Initialize plan subsystem.
462  */
463 void
464 GSF_plan_init ()
465 {
466   plans = GNUNET_CONTAINER_multihashmap_create (256);
467 }
468
469
470 /**
471  * Shutdown plan subsystem.
472  */
473 void
474 GSF_plan_done ()
475 {
476   GNUNET_assert (0 == 
477                  GNUNET_CONTAINER_multihashmap_size (plans));
478   GNUNET_CONTAINER_multihashmap_destroy (plans);
479 }
480
481
482
483 /* end of gnunet-service-fs_pe.h */