28036150f5a4fa41c54f02b76b3fb04d74403404
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * Information we keep per request per peer.  This is a doubly-linked
35  * list (with head and tail in the 'struct GSF_PendingRequestData')
36  * with one entry in each heap of each 'struct PeerPlan'.  Each
37  * entry tracks information relevant for this request and this peer.
38  */
39 struct GSF_RequestPlan
40 {
41
42   /**
43    * This is a doubly-linked list.
44    */
45   struct GSF_RequestPlan *next;
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlan *prev;
51
52   /**
53    * Heap node associated with this request and this peer.
54    */
55   struct GNUNET_CONTAINER_HeapNode *hn;
56
57   /**
58    * Associated pending request.
59    */
60   struct GSF_PendingRequest *pr;
61
62   /**
63    * Earliest time we'd be happy to (re)transmit this request.
64    */
65   struct GNUNET_TIME_Absolute earliest_transmission;
66
67   /**
68    * When was the last time we transmitted this request to this peer? 0 for never.
69    */
70   struct GNUNET_TIME_Absolute last_transmission;
71
72   /**
73    * Current priority for this request for this target.
74    */
75   uint64_t priority;
76
77   /**
78    * How often did we transmit this request to this peer?
79    */
80   unsigned int transmission_counter;
81
82 };
83
84
85 /**
86  * Transmission plan for a peer.
87  */
88 struct PeerPlan
89 {
90   /**
91    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
92    */
93   struct GNUNET_CONTAINER_Heap *priority_heap;
94
95   /**
96    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
97    */
98   struct GNUNET_CONTAINER_Heap *delay_heap;
99
100   /**
101    * Current transmission request handle.
102    */
103   struct GSF_PeerTransmitHandle *pth;
104
105   /**
106    * Peer for which this is the plan.
107    */
108   struct GSF_ConnectedPeer *cp;
109
110   /**
111    * Current task for executing the plan.
112    */
113   GNUNET_SCHEDULER_TaskIdentifier task;
114 };
115
116
117 /**
118  * Hash map from peer identities to PeerPlans.
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *plans;
121
122
123 /**
124  * Figure out when and how to transmit to the given peer.
125  *
126  * @param cls the 'struct GSF_ConnectedPeer' for transmission
127  * @param tc scheduler context
128  */
129 static void
130 schedule_peer_transmission (void *cls,
131                             const struct GNUNET_SCHEDULER_TaskContext *tc);
132
133
134 /**
135  * Insert the given request plan into the heap with the appropriate weight.
136  *
137  * @param pp associated peer's plan
138  * @param rp request to plan
139  */
140 static void
141 plan (struct PeerPlan *pp,
142       struct GSF_RequestPlan *rp)
143 {
144   struct GSF_PendingRequestData *prd;
145
146   prd = GSF_pending_request_get_data_ (rp->pr);
147   // FIXME: calculate 'rp->earliest_transmission'!
148   // FIXME: claculate 'rp->priority'! 
149   rp->earliest_transmission 
150     = GNUNET_TIME_relative_to_absolute 
151     (GNUNET_TIME_relative_multiply 
152      (GNUNET_TIME_UNIT_SECONDS,
153       rp->transmission_counter));
154 #if DEBUG_FS
155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
156               "Earliest (re)transmission for `%s' in %us\n",
157               GNUNET_h2s (&prd->query),
158               rp->transmission_counter);
159 #endif 
160
161
162   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
163     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
164                                            rp,
165                                            rp->priority);
166   else
167     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
168                                            rp,
169                                            rp->earliest_transmission.abs_value);
170   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
171     GNUNET_SCHEDULER_cancel (pp->task);
172   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
173 }
174
175
176 /**
177  * Function called to get a message for transmission.
178  *
179  * @param cls closure
180  * @param buf_size number of bytes available in buf
181  * @param buf where to copy the message, NULL on error (peer disconnect)
182  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
183  */
184 static size_t 
185 transmit_message_callback (void *cls,
186                            size_t buf_size,
187                            void *buf)
188 {
189   struct PeerPlan *pp = cls;
190   struct GSF_RequestPlan *rp;
191   size_t msize;
192
193   pp->pth = NULL;
194   if (NULL == buf)
195     {
196       /* failed, try again... */
197       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
198       return 0;
199     }
200   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
201   if (NULL == rp)
202     {
203       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
204       return 0;
205     }
206   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
207   if (msize > buf_size)
208     {
209       /* buffer to small (message changed), try again */
210       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
211       return 0;
212     }
213   /* remove from root, add again elsewhere... */
214   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
215   rp->hn = NULL;
216   rp->last_transmission = GNUNET_TIME_absolute_get ();
217   rp->transmission_counter++;
218 #if DEBUG_FS
219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
220               "Executing plan %p executed %u times, planning retransmission\n",
221               rp,
222               rp->transmission_counter);
223 #endif    
224   plan (pp, rp);
225   return msize;
226 }
227
228
229 /**
230  * Figure out when and how to transmit to the given peer.
231  *
232  * @param cls the 'struct PeerPlan'
233  * @param tc scheduler context
234  */
235 static void
236 schedule_peer_transmission (void *cls,
237                             const struct GNUNET_SCHEDULER_TaskContext *tc)
238 {
239   struct PeerPlan *pp = cls;
240   struct GSF_RequestPlan *rp;
241   size_t msize;
242
243   pp->task = GNUNET_SCHEDULER_NO_TASK;
244   if (pp->pth != NULL)
245     {
246       GSF_peer_transmit_cancel_ (pp->pth);
247       pp->pth = NULL;
248     }
249   /* move ready requests to priority queue */
250   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
251           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
252     {
253       GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
254       rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
255                                              rp, 
256                                              rp->priority);                                     
257     }   
258   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
259     {
260       /* priority heap (still) empty, check for delay... */
261       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
262       if (NULL == rp)
263         {
264 #if DEBUG_FS
265           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266                       "No active requests for plan %p.\n",
267                       pp);
268 #endif
269           return; /* both queues empty */
270         }
271 #if DEBUG_FS
272       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273                   "Sleeping for %llu ms before retrying requests on plan %p.\n",
274                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
275                   pp);
276 #endif
277       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
278                                                &schedule_peer_transmission,
279                                                pp);
280       return;
281     }
282   /* process from priority heap */
283   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
284 #if DEBUG_FS
285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
286               "Executing query plan %p\n",
287               rp);
288 #endif    
289   GNUNET_assert (NULL != rp);
290   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
291   pp->pth = GSF_peer_transmit_ (pp->cp,
292                                 GNUNET_YES,
293                                 rp->priority,
294                                 GNUNET_TIME_UNIT_FOREVER_REL,
295                                 msize,
296                                 &transmit_message_callback,
297                                 pp);
298   GNUNET_assert (NULL != pp->pth);
299 }
300
301
302 /**
303  * Create a new query plan entry.
304  *
305  * @param cp peer with the entry
306  * @param pr request with the entry
307  */
308 void
309 GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
310                struct GSF_PendingRequest *pr)
311 {
312   struct GNUNET_PeerIdentity id;
313   struct PeerPlan *pp;
314   struct GSF_PendingRequestData *prd;
315   struct GSF_RequestPlan *rp;
316
317   GSF_connected_peer_get_identity_ (cp, &id);
318   pp = GNUNET_CONTAINER_multihashmap_get (plans,
319                                           &id.hashPubKey);
320   if (NULL == pp)
321     {
322       pp = GNUNET_malloc (sizeof (struct PeerPlan));
323       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
324       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
325       pp->cp = cp;
326       GNUNET_CONTAINER_multihashmap_put (plans,
327                                          &id.hashPubKey,
328                                          pp,
329                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
330     }
331   prd = GSF_pending_request_get_data_ (pr);
332   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
333 #if DEBUG_FS
334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335               "Planning transmission of query `%s' to peer `%s' (%p)\n",
336               GNUNET_h2s (&prd->query),
337               GNUNET_i2s (&id), 
338               rp);
339 #endif    
340   rp->pr = pr;
341   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
342                                prd->rp_tail,
343                                rp);
344   plan (pp, rp);
345 }
346
347
348 /**
349  * Notify the plan about a peer being no longer available;
350  * destroy all entries associated with this peer.
351  *
352  * @param cp connected peer 
353  */
354 void
355 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
356 {
357   struct GNUNET_PeerIdentity id;
358   struct PeerPlan *pp;
359   struct GSF_RequestPlan *rp;
360   struct GSF_PendingRequestData *prd;
361
362   GSF_connected_peer_get_identity_ (cp, &id);
363   pp = GNUNET_CONTAINER_multihashmap_get (plans,
364                                           &id.hashPubKey);
365   if (NULL == pp)
366     return; /* nothing was ever planned for this peer */
367   GNUNET_CONTAINER_multihashmap_remove (plans,
368                                         &id.hashPubKey,
369                                         pp);
370   if (NULL != pp->pth)
371     GSF_peer_transmit_cancel_ (pp->pth);
372   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
373     GNUNET_SCHEDULER_cancel (pp->task);
374   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
375     {
376       prd = GSF_pending_request_get_data_ (rp->pr);
377       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
378                                    prd->rp_tail,
379                                    rp);
380       GNUNET_free (rp);
381     }
382   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
383   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
384     {
385       prd = GSF_pending_request_get_data_ (rp->pr);
386       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
387                                    prd->rp_tail,
388                                    rp);
389       GNUNET_free (rp);
390     }
391   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
392   GNUNET_free (pp);
393 }
394
395
396 /**
397  * Notify the plan about a request being done; destroy all entries
398  * associated with this request.
399  *
400  * @param pr request that is done
401  */
402 void
403 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
404 {
405   struct GSF_RequestPlan *rp;
406   struct GSF_PendingRequestData *prd;
407
408   prd = GSF_pending_request_get_data_ (pr);
409   while (NULL != (rp = prd->rp_head))
410     {
411       GNUNET_CONTAINER_heap_remove_node (rp->hn);
412       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
413                                    prd->rp_tail,
414                                    rp);
415       GNUNET_free (rp);
416     }
417 }
418
419
420 /**
421  * Initialize plan subsystem.
422  */
423 void
424 GSF_plan_init ()
425 {
426   plans = GNUNET_CONTAINER_multihashmap_create (256);
427 }
428
429
430 /**
431  * Shutdown plan subsystem.
432  */
433 void
434 GSF_plan_done ()
435 {
436   GNUNET_assert (0 == 
437                  GNUNET_CONTAINER_multihashmap_size (plans));
438   GNUNET_CONTAINER_multihashmap_destroy (plans);
439 }
440
441
442
443 /* end of gnunet-service-fs_pe.h */