83733ef8dccffa9fae782eaa5420403df070b2e7
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * Information we keep per request per peer.  This is a doubly-linked
35  * list (with head and tail in the 'struct GSF_PendingRequestData')
36  * with one entry in each heap of each 'struct PeerPlan'.  Each
37  * entry tracks information relevant for this request and this peer.
38  */
39 struct GSF_RequestPlan
40 {
41
42   /**
43    * This is a doubly-linked list.
44    */
45   struct GSF_RequestPlan *next;
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlan *prev;
51
52   /**
53    * Heap node associated with this request and this peer.
54    */
55   struct GNUNET_CONTAINER_HeapNode *hn;
56
57   /**
58    * Array of associated pending requests.
59    */
60   struct GSF_PendingRequest **prs;  
61
62   /**
63    * Earliest time we'd be happy to (re)transmit this request.
64    */
65   struct GNUNET_TIME_Absolute earliest_transmission;
66
67   /**
68    * When was the last time we transmitted this request to this peer? 0 for never.
69    */
70   struct GNUNET_TIME_Absolute last_transmission;
71
72   /**
73    * Number of entries in 'prs'.
74    */
75   unsigned int prs_length;
76
77   /**
78    * Current priority for this request for this target.
79    */
80   uint64_t priority;
81
82   /**
83    * How often did we transmit this request to this peer?
84    */
85   unsigned int transmission_counter;
86
87 };
88
89
90 /**
91  * Transmission plan for a peer.
92  */
93 struct PeerPlan
94 {
95   /**
96    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
97    */
98   struct GNUNET_CONTAINER_Heap *priority_heap;
99
100   /**
101    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
102    */
103   struct GNUNET_CONTAINER_Heap *delay_heap;
104
105   /**
106    * Current transmission request handle.
107    */
108   struct GSF_PeerTransmitHandle *pth;
109
110   /**
111    * Peer for which this is the plan.
112    */
113   struct GSF_ConnectedPeer *cp;
114
115   /**
116    * Current task for executing the plan.
117    */
118   GNUNET_SCHEDULER_TaskIdentifier task;
119 };
120
121
122 /**
123  * Hash map from peer identities to PeerPlans.
124  */
125 static struct GNUNET_CONTAINER_MultiHashMap *plans;
126
127 /**
128  * Sum of all transmission counters (equals total delay for all plan entries).
129  */
130 static unsigned long long total_delay;
131
132 /**
133  * Number of plan entries.
134  */
135 static unsigned long long plan_count;
136
137
138 /**
139  * Figure out when and how to transmit to the given peer.
140  *
141  * @param cls the 'struct GSF_ConnectedPeer' for transmission
142  * @param tc scheduler context
143  */
144 static void
145 schedule_peer_transmission (void *cls,
146                             const struct GNUNET_SCHEDULER_TaskContext *tc);
147
148
149 /**
150  * Insert the given request plan into the heap with the appropriate weight.
151  *
152  * @param pp associated peer's plan
153  * @param rp request to plan
154  */
155 static void
156 plan (struct PeerPlan *pp,
157       struct GSF_RequestPlan *rp)
158 {
159   struct GSF_PendingRequestData *prd;
160   struct GNUNET_TIME_Relative delay;
161
162   GNUNET_STATISTICS_set (GSF_stats,
163                          gettext_noop ("# average retransmission delay (ms)"),
164                          total_delay * 1000LL / plan_count,
165                          GNUNET_NO);
166   prd = GSF_pending_request_get_data_ (rp->prs[0]);
167   // FIXME: calculate 'rp->priority'!  
168   if (rp->transmission_counter < 32)
169     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
170                                            1LL << rp->transmission_counter);
171   else
172     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
173                                            UINT_MAX);
174   rp->earliest_transmission 
175     = GNUNET_TIME_relative_to_absolute (delay);
176 #if DEBUG_FS
177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
178               "Earliest (re)transmission for `%s' in %us\n",
179               GNUNET_h2s (&prd->query),
180               rp->transmission_counter);
181 #endif 
182
183   GNUNET_assert (rp->hn == NULL);
184   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
185     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
186                                            rp,
187                                            rp->priority);
188   else
189     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
190                                            rp,
191                                            rp->earliest_transmission.abs_value);
192   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
193     GNUNET_SCHEDULER_cancel (pp->task);
194   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
195 }
196
197
198 /**
199  * Function called to get a message for transmission.
200  *
201  * @param cls closure
202  * @param buf_size number of bytes available in buf
203  * @param buf where to copy the message, NULL on error (peer disconnect)
204  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
205  */
206 static size_t 
207 transmit_message_callback (void *cls,
208                            size_t buf_size,
209                            void *buf)
210 {
211   struct PeerPlan *pp = cls;
212   struct GSF_RequestPlan *rp;
213   size_t msize;
214
215   pp->pth = NULL;
216   if (NULL == buf)
217     {
218       /* failed, try again... */
219       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
220       return 0;
221     }
222   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
223   if (NULL == rp)
224     {
225       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
226       return 0;
227     }
228   msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf);
229   if (msize > buf_size)
230     {
231       /* buffer to small (message changed), try again */
232       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
233       return 0;
234     }
235   /* remove from root, add again elsewhere... */
236   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
237   rp->hn = NULL;
238   rp->last_transmission = GNUNET_TIME_absolute_get ();
239   rp->transmission_counter++;
240   total_delay++;
241 #if DEBUG_FS
242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
243               "Executing plan %p executed %u times, planning retransmission\n",
244               rp,
245               rp->transmission_counter);
246 #endif    
247   plan (pp, rp);
248   GNUNET_STATISTICS_update (GSF_stats,
249                             gettext_noop ("# queries messages sent to other peers"),
250                             1,
251                             GNUNET_NO);
252   return msize;
253 }
254
255
256 /**
257  * Figure out when and how to transmit to the given peer.
258  *
259  * @param cls the 'struct PeerPlan'
260  * @param tc scheduler context
261  */
262 static void
263 schedule_peer_transmission (void *cls,
264                             const struct GNUNET_SCHEDULER_TaskContext *tc)
265 {
266   struct PeerPlan *pp = cls;
267   struct GSF_RequestPlan *rp;
268   size_t msize;
269
270   pp->task = GNUNET_SCHEDULER_NO_TASK;
271   if (pp->pth != NULL)
272     {
273       GSF_peer_transmit_cancel_ (pp->pth);
274       pp->pth = NULL;
275     }
276   /* move ready requests to priority queue */
277   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
278           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
279     {
280       GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
281       rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
282                                              rp, 
283                                              rp->priority);                                     
284     }   
285   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
286     {
287       /* priority heap (still) empty, check for delay... */
288       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
289       if (NULL == rp)
290         {
291 #if DEBUG_FS
292           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
293                       "No active requests for plan %p.\n",
294                       pp);
295 #endif
296           return; /* both queues empty */
297         }
298 #if DEBUG_FS
299       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300                   "Sleeping for %llu ms before retrying requests on plan %p.\n",
301                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
302                   pp);
303 #endif
304       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
305                                                &schedule_peer_transmission,
306                                                pp);
307       return;
308     }
309   /* process from priority heap */
310   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
311 #if DEBUG_FS > 1
312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313               "Executing query plan %p\n",
314               rp);
315 #endif    
316   GNUNET_assert (NULL != rp);
317   msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL);
318   pp->pth = GSF_peer_transmit_ (pp->cp,
319                                 GNUNET_YES,
320                                 rp->priority,
321                                 GNUNET_TIME_UNIT_FOREVER_REL,
322                                 msize,
323                                 &transmit_message_callback,
324                                 pp);
325   GNUNET_assert (NULL != pp->pth);
326 }
327
328
329 /**
330  * Create a new query plan entry.
331  *
332  * @param cp peer with the entry
333  * @param pr request with the entry
334  */
335 void
336 GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
337                struct GSF_PendingRequest *pr)
338 {
339   struct GNUNET_PeerIdentity id;
340   struct PeerPlan *pp;
341   struct GSF_PendingRequestData *prd;
342   struct GSF_RequestPlan *rp;
343   unsigned int i;
344   size_t msize;
345
346   GNUNET_assert (NULL != cp);
347   GSF_connected_peer_get_identity_ (cp, &id);
348   pp = GNUNET_CONTAINER_multihashmap_get (plans,
349                                           &id.hashPubKey);
350   if (NULL == pp)
351     {
352       pp = GNUNET_malloc (sizeof (struct PeerPlan));
353       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
354       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
355       pp->cp = cp;
356       GNUNET_CONTAINER_multihashmap_put (plans,
357                                          &id.hashPubKey,
358                                          pp,
359                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
360     }
361   msize = GSF_pending_request_get_message_ (pr, 0, NULL);
362   prd = GSF_pending_request_get_data_ (pr);
363   for (rp = prd->rp_head; NULL != rp; rp = rp->next)
364     {
365       char mbuf[msize];
366       char xbuf[msize];
367       
368       GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf));
369       if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) &&
370            (0 == memcmp (xbuf, mbuf, msize)) )
371         {
372           /* add request to existing plan */
373           GNUNET_array_append (rp->prs, 
374                                rp->prs_length,
375                                pr);
376           for (i=0;i<rp->prs_length;i++)
377             if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value)
378               {
379                 GNUNET_STATISTICS_update (GSF_stats,
380                                           gettext_noop ("# requests refreshed"),
381                                           1,
382                                           GNUNET_NO);  
383                 rp->transmission_counter = 0; /* reset */               
384                 break;
385               }
386           return;
387         }
388     }
389   plan_count++;
390   GNUNET_STATISTICS_update (GSF_stats,
391                             gettext_noop ("# query plan entries"),
392                             1,
393                             GNUNET_NO);  
394   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
395 #if DEBUG_FS
396   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
397               "Planning transmission of query `%s' to peer `%s' (%p)\n",
398               GNUNET_h2s (&prd->query),
399               GNUNET_i2s (&id), 
400               rp);
401 #endif    
402   GNUNET_array_append (rp->prs,
403                        rp->prs_length,
404                        pr);
405   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
406                                prd->rp_tail,
407                                rp);
408   plan (pp, rp);
409 }
410
411
412 /**
413  * Notify the plan about a peer being no longer available;
414  * destroy all entries associated with this peer.
415  *
416  * @param cp connected peer 
417  */
418 void
419 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
420 {
421   struct GNUNET_PeerIdentity id;
422   struct PeerPlan *pp;
423   struct GSF_RequestPlan *rp;
424   struct GSF_PendingRequestData *prd;
425   unsigned int i;
426
427   GSF_connected_peer_get_identity_ (cp, &id);
428   pp = GNUNET_CONTAINER_multihashmap_get (plans,
429                                           &id.hashPubKey);
430   if (NULL == pp)
431     return; /* nothing was ever planned for this peer */
432   GNUNET_assert (GNUNET_YES ==
433                  GNUNET_CONTAINER_multihashmap_remove (plans,
434                                                        &id.hashPubKey,
435                                                        pp));
436   if (NULL != pp->pth)
437     GSF_peer_transmit_cancel_ (pp->pth);
438   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
439     {
440       GNUNET_SCHEDULER_cancel (pp->task);
441       pp->task = GNUNET_SCHEDULER_NO_TASK;
442     }
443   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
444     {
445       for (i=0;i<rp->prs_length;i++)
446         {
447           prd = GSF_pending_request_get_data_ (rp->prs[i]);
448           GNUNET_CONTAINER_DLL_remove (prd->rp_head,
449                                        prd->rp_tail,
450                                        rp);
451         }
452       plan_count--;
453       GNUNET_array_grow (rp->prs, rp->prs_length, 0);
454       GNUNET_free (rp);
455     }
456   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
457   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
458     {
459       for (i=0;i<rp->prs_length;i++)
460         {
461           prd = GSF_pending_request_get_data_ (rp->prs[i]);
462           GNUNET_CONTAINER_DLL_remove (prd->rp_head,
463                                        prd->rp_tail,
464                                        rp);
465         }
466       plan_count--;
467       GNUNET_array_grow (rp->prs, rp->prs_length, 0);
468       GNUNET_free (rp);
469     }
470   GNUNET_STATISTICS_set (GSF_stats,
471                          gettext_noop ("# query plan entries"),
472                          plan_count,
473                          GNUNET_NO);
474
475   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
476   GNUNET_free (pp);
477 }
478
479
480 /**
481  * Notify the plan about a request being done; destroy all entries
482  * associated with this request.
483  *
484  * @param pr request that is done
485  */
486 void
487 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
488 {
489   struct GSF_RequestPlan *rp;
490   struct GSF_PendingRequestData *prd;
491   unsigned int i;
492
493   prd = GSF_pending_request_get_data_ (pr);
494   while (NULL != (rp = prd->rp_head))
495     {
496       for (i=0;i<rp->prs_length;i++)
497         {
498           if (rp->prs[i] == pr)
499             {         
500               rp->prs[i] = rp->prs[rp->prs_length - 1];
501               GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1);
502               if (rp->prs_length == 0)
503                 {
504                   GNUNET_CONTAINER_heap_remove_node (rp->hn);
505                   GNUNET_CONTAINER_DLL_remove (prd->rp_head,
506                                                prd->rp_tail,
507                                                rp);
508                   plan_count--;
509                   GNUNET_free (rp);
510                   break;
511                 }
512             }
513         }
514     }
515   GNUNET_STATISTICS_set (GSF_stats,
516                          gettext_noop ("# query plan entries"),
517                          plan_count,
518                          GNUNET_NO);  
519 }
520
521
522 /**
523  * Initialize plan subsystem.
524  */
525 void
526 GSF_plan_init ()
527 {
528   plans = GNUNET_CONTAINER_multihashmap_create (256);
529 }
530
531
532 /**
533  * Shutdown plan subsystem.
534  */
535 void
536 GSF_plan_done ()
537 {
538   GNUNET_assert (0 == 
539                  GNUNET_CONTAINER_multihashmap_size (plans));
540   GNUNET_CONTAINER_multihashmap_destroy (plans);
541 }
542
543
544
545 /* end of gnunet-service-fs_pe.h */