b3f46cf45c51906830531f2c62e6c2550d956546
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pe.h"
30 #include "gnunet-service-fs_pr.h"
31
32
33 /**
34  * Information we keep per request per peer.  This is a doubly-linked
35  * list (with head and tail in the 'struct GSF_PendingRequestData')
36  * with one entry in each heap of each 'struct PeerPlan'.  Each
37  * entry tracks information relevant for this request and this peer.
38  */
39 struct GSF_RequestPlan
40 {
41
42   /**
43    * This is a doubly-linked list.
44    */
45   struct GSF_RequestPlan *next;
46
47   /**
48    * This is a doubly-linked list.
49    */
50   struct GSF_RequestPlan *prev;
51
52   /**
53    * Heap node associated with this request and this peer.
54    */
55   struct GNUNET_CONTAINER_HeapNode *hn;
56
57   /**
58    * Associated pending request.
59    */
60   struct GSF_PendingRequest *pr;
61
62   /**
63    * Earliest time we'd be happy to (re)transmit this request.
64    */
65   struct GNUNET_TIME_Absolute earliest_transmission;
66
67   /**
68    * When was the last time we transmitted this request to this peer? 0 for never.
69    */
70   struct GNUNET_TIME_Absolute last_transmission;
71
72   /**
73    * Current priority for this request for this target.
74    */
75   uint64_t priority;
76
77   /**
78    * How often did we transmit this request to this peer?
79    */
80   unsigned int transmission_counter;
81
82 };
83
84
85 /**
86  * Transmission plan for a peer.
87  */
88 struct PeerPlan
89 {
90   /**
91    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
92    */
93   struct GNUNET_CONTAINER_Heap *priority_heap;
94
95   /**
96    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
97    */
98   struct GNUNET_CONTAINER_Heap *delay_heap;
99
100   /**
101    * Current transmission request handle.
102    */
103   struct GSF_PeerTransmitHandle *pth;
104
105   /**
106    * Peer for which this is the plan.
107    */
108   struct GSF_ConnectedPeer *cp;
109
110   /**
111    * Current task for executing the plan.
112    */
113   GNUNET_SCHEDULER_TaskIdentifier task;
114 };
115
116
117 /**
118  * Hash map from peer identities to PeerPlans.
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *plans;
121
122
123 /**
124  * Insert the given request plan into the heap with the appropriate weight.
125  *
126  * @param pp associated peer's plan
127  * @param rp request to plan
128  */
129 static void
130 plan (struct PeerPlan *pp,
131       struct GSF_RequestPlan *rp)
132 {
133   struct GSF_PendingRequestData *prd;
134
135   prd = GSF_pending_request_get_data_ (rp->pr);
136   // FIXME: calculate 'rp->earliest_transmission'!
137   // FIXME: claculate 'rp->priority'! 
138   rp->earliest_transmission 
139     = GNUNET_TIME_relative_to_absolute 
140     (GNUNET_TIME_relative_multiply 
141      (GNUNET_TIME_UNIT_SECONDS,
142       rp->transmission_counter));
143 #if DEBUG_FS
144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
145               "Earliest (re)transmission for `%s' in %us\n",
146               GNUNET_h2s (&prd->query),
147               rp->transmission_counter);
148 #endif 
149
150
151   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
152     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
153                                            rp,
154                                            rp->priority);
155   else
156     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
157                                            rp,
158                                            rp->earliest_transmission.abs_value);
159 }
160
161
162 /**
163  * Figure out when and how to transmit to the given peer.
164  *
165  * @param cls the 'struct GSF_ConnectedPeer' for transmission
166  * @param tc scheduler context
167  */
168 static void
169 schedule_peer_transmission (void *cls,
170                             const struct GNUNET_SCHEDULER_TaskContext *tc);
171
172
173 /**
174  * Function called to get a message for transmission.
175  *
176  * @param cls closure
177  * @param buf_size number of bytes available in buf
178  * @param buf where to copy the message, NULL on error (peer disconnect)
179  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
180  */
181 static size_t 
182 transmit_message_callback (void *cls,
183                            size_t buf_size,
184                            void *buf)
185 {
186   struct PeerPlan *pp = cls;
187   struct GSF_RequestPlan *rp;
188   size_t msize;
189
190   pp->pth = NULL;
191   if (NULL == buf)
192     {
193       /* failed, try again... */
194       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
195       return 0;
196     }
197   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
198   if (NULL == rp)
199     {
200       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
201       return 0;
202     }
203   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
204   if (msize > buf_size)
205     {
206       /* buffer to small (message changed), try again */
207       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
208       return 0;
209     }
210   /* remove from root, add again elsewhere... */
211   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
212   rp->hn = NULL;
213   rp->last_transmission = GNUNET_TIME_absolute_get ();
214   rp->transmission_counter++;
215 #if DEBUG_FS
216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
217               "Executing plan %p executed %u times, planning retransmission\n",
218               rp,
219               rp->transmission_counter);
220 #endif    
221   plan (pp, rp);
222   return msize;
223 }
224
225
226 /**
227  * Figure out when and how to transmit to the given peer.
228  *
229  * @param cls the 'struct PeerPlan'
230  * @param tc scheduler context
231  */
232 static void
233 schedule_peer_transmission (void *cls,
234                             const struct GNUNET_SCHEDULER_TaskContext *tc)
235 {
236   struct PeerPlan *pp = cls;
237   struct GSF_RequestPlan *rp;
238   size_t msize;
239
240   pp->task = GNUNET_SCHEDULER_NO_TASK;
241   GNUNET_assert (NULL == pp->pth);
242   /* move ready requests to priority queue */
243   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
244           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
245     {
246       GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
247       rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
248                                              rp, 
249                                              rp->priority);                                     
250     }   
251   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
252     {
253       /* priority heap (still) empty, check for delay... */
254       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
255       if (NULL == rp)
256         return; /* both queues empty */
257       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
258                                                &schedule_peer_transmission,
259                                                pp);
260       return;
261     }
262   /* process from priority heap */
263   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
264 #if DEBUG_FS
265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266               "Executing query plan %p\n",
267               rp);
268 #endif    
269   GNUNET_assert (NULL != rp);
270   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                      
271   pp->pth = GSF_peer_transmit_ (pp->cp,
272                                 GNUNET_YES,
273                                 rp->priority,
274                                 GNUNET_TIME_UNIT_FOREVER_REL,
275                                 msize,
276                                 &transmit_message_callback,
277                                 pp);
278   GNUNET_assert (NULL != pp->pth);
279 }
280
281
282 /**
283  * Create a new query plan entry.
284  *
285  * @param cp peer with the entry
286  * @param pr request with the entry
287  */
288 void
289 GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
290                struct GSF_PendingRequest *pr)
291 {
292   struct GNUNET_PeerIdentity id;
293   struct PeerPlan *pp;
294   struct GSF_PendingRequestData *prd;
295   struct GSF_RequestPlan *rp;
296
297   GSF_connected_peer_get_identity_ (cp, &id);
298   pp = GNUNET_CONTAINER_multihashmap_get (plans,
299                                           &id.hashPubKey);
300   if (NULL == pp)
301     {
302       pp = GNUNET_malloc (sizeof (struct PeerPlan));
303       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
304       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
305       pp->cp = cp;
306       GNUNET_CONTAINER_multihashmap_put (plans,
307                                          &id.hashPubKey,
308                                          pp,
309                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
310     }
311   prd = GSF_pending_request_get_data_ (pr);
312   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
313 #if DEBUG_FS
314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
315               "Planning transmission of query `%s' to peer `%s' (%p)\n",
316               GNUNET_h2s (&prd->query),
317               GNUNET_i2s (&id), 
318               rp);
319 #endif    
320   rp->pr = pr;
321   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
322                                prd->rp_tail,
323                                rp);
324   plan (pp, rp);
325   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
326     {
327       /* no request that should be done immediately, figure out delay */
328       if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
329         return; /* did not change delay heap top, no need to do anything */
330       GNUNET_assert (NULL == pp->pth);
331       if (GNUNET_SCHEDULER_NO_TASK != pp->task)
332         GNUNET_SCHEDULER_cancel (pp->task);
333       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
334                                                &schedule_peer_transmission,
335                                                pp);
336       return;
337     }
338
339   if (pp->pth != NULL)
340     {
341       if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
342         return; /* did not change priority heap top, no need to do anyhing */
343       GSF_peer_transmit_cancel_ (pp->pth);
344       pp->pth = NULL;
345     }
346   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
347     GNUNET_SCHEDULER_cancel (pp->task);
348   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
349                                        pp);
350 }
351
352
353 /**
354  * Notify the plan about a peer being no longer available;
355  * destroy all entries associated with this peer.
356  *
357  * @param cp connected peer 
358  */
359 void
360 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
361 {
362   struct GNUNET_PeerIdentity id;
363   struct PeerPlan *pp;
364   struct GSF_RequestPlan *rp;
365   struct GSF_PendingRequestData *prd;
366
367   GSF_connected_peer_get_identity_ (cp, &id);
368   pp = GNUNET_CONTAINER_multihashmap_get (plans,
369                                           &id.hashPubKey);
370   if (NULL == pp)
371     return; /* nothing was ever planned for this peer */
372   GNUNET_CONTAINER_multihashmap_remove (plans,
373                                         &id.hashPubKey,
374                                         pp);
375   if (NULL != pp->pth)
376     GSF_peer_transmit_cancel_ (pp->pth);
377   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
378     GNUNET_SCHEDULER_cancel (pp->task);
379   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
380     {
381       prd = GSF_pending_request_get_data_ (rp->pr);
382       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
383                                    prd->rp_tail,
384                                    rp);
385       GNUNET_free (rp);
386     }
387   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
388   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
389     {
390       prd = GSF_pending_request_get_data_ (rp->pr);
391       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
392                                    prd->rp_tail,
393                                    rp);
394       GNUNET_free (rp);
395     }
396   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
397   GNUNET_free (pp);
398 }
399
400
401 /**
402  * Notify the plan about a request being done; destroy all entries
403  * associated with this request.
404  *
405  * @param pr request that is done
406  */
407 void
408 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
409 {
410   struct GSF_RequestPlan *rp;
411   struct GSF_PendingRequestData *prd;
412
413   prd = GSF_pending_request_get_data_ (pr);
414   while (NULL != (rp = prd->rp_head))
415     {
416       GNUNET_CONTAINER_heap_remove_node (rp->hn);
417       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
418                                    prd->rp_tail,
419                                    rp);
420       GNUNET_free (rp);
421     }
422 }
423
424
425 /**
426  * Initialize plan subsystem.
427  */
428 void
429 GSF_plan_init ()
430 {
431   plans = GNUNET_CONTAINER_multihashmap_create (256);
432 }
433
434
435 /**
436  * Shutdown plan subsystem.
437  */
438 void
439 GSF_plan_done ()
440 {
441   GNUNET_assert (0 == 
442                  GNUNET_CONTAINER_multihashmap_size (plans));
443   GNUNET_CONTAINER_multihashmap_destroy (plans);
444 }
445
446
447
448 /* end of gnunet-service-fs_pe.h */