00dd41f2a74f0c9b2132088aeabf53704ad24a75
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_pe.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * Information we keep per request per peer.  This is a doubly-linked
34  * list (with head and tail in the 'struct GSF_PendingRequestData')
35  * with one entry in each heap of each 'struct PeerPlan'.  Each
36  * entry tracks information relevant for this request and this peer.
37  */
38 struct GSF_RequestPlan
39 {
40
41   /**
42    * This is a doubly-linked list.
43    */
44   struct GSF_RequestPlan *next;
45
46   /**
47    * This is a doubly-linked list.
48    */
49   struct GSF_RequestPlan *prev;
50
51   /**
52    * Heap node associated with this request and this peer.
53    */
54   struct GNUNET_CONTAINER_HeapNode *hn;
55
56   /**
57    * Associated pending request.
58    */
59   struct GSF_PendingRequest *pr;
60
61   /**
62    * Earliest time we'd be happy to (re)transmit this request.
63    */
64   struct GNUNET_TIME_Absolute earliest_transmission;
65
66   /**
67    * When was the last time we transmitted this request to this peer? 0 for never.
68    */
69   struct GNUNET_TIME_Absolute last_transmission;
70
71   /**
72    * Current priority for this request for this target.
73    */
74   uint64_t priority;
75
76   /**
77    * How often did we transmit this request to this peer?
78    */
79   unsigned int transmission_counter;
80
81 };
82
83
84 /**
85  * Transmission plan for a peer.
86  */
87 struct PeerPlan
88 {
89   /**
90    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
91    */
92   struct GNUNET_CONTAINER_Heap *priority_heap;
93
94   /**
95    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
96    */
97   struct GNUNET_CONTAINER_Heap *delay_heap;
98
99   /**
100    * Current transmission request handle.
101    */
102   struct GSF_PeerTransmitHandle *pth;
103
104   /**
105    * Peer for which this is the plan.
106    */
107   struct GSF_ConnectedPeer *cp;
108
109   /**
110    * Current task for executing the plan.
111    */
112   GNUNET_SCHEDULER_TaskIdentifier task;
113 };
114
115
116 /**
117  * Hash map from peer identities to PeerPlans.
118  */
119 static struct GNUNET_CONTAINER_MultiHashMap *plans;
120
121
122 /**
123  * Insert the given request plan into the heap with the appropriate weight.
124  *
125  * @param pp associated peer's plan
126  * @param rp request to plan
127  */
128 static void
129 plan (struct PeerPlan *pp,
130       struct GSF_RequestPlan *rp)
131 {
132   struct GSF_PendingRequestData *prd;
133
134   prd = GSF_pending_request_get_data_ (rp->pr);
135   // FIXME: calculate 'rp->earliest_transmission'!
136   // fIXME: claculate 'rp->priority'! 
137
138   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
139     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
140                                            rp,
141                                            rp->priority);
142   else
143     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
144                                            rp,
145                                            rp->earliest_transmission.abs_value);
146 }
147
148
149 /**
150  * Figure out when and how to transmit to the given peer.
151  *
152  * @param cls the 'struct GSF_ConnectedPeer' for transmission
153  * @param tc scheduler context
154  */
155 static void
156 schedule_peer_transmission (void *cls,
157                             const struct GNUNET_SCHEDULER_TaskContext *tc);
158
159
160 /**
161  * Function called to get a message for transmission.
162  *
163  * @param cls closure
164  * @param buf_size number of bytes available in buf
165  * @param buf where to copy the message, NULL on error (peer disconnect)
166  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
167  */
168 static size_t 
169 transmit_message_callback (void *cls,
170                            size_t buf_size,
171                            void *buf)
172 {
173   struct PeerPlan *pp = cls;
174   struct GSF_RequestPlan *rp;
175   size_t msize;
176
177   pp->pth = NULL;
178   if (NULL == buf)
179     {
180       /* failed, try again... */
181       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
182       return 0;
183     }
184   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
185   if (NULL == rp)
186     {
187       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
188       return 0;
189     }
190   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
191   if (msize > buf_size)
192     {
193       /* buffer to small (message changed), try again */
194       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
195       return 0;
196     }
197   /* remove from root, add again elsewhere... */
198   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
199   rp->hn = NULL;
200   rp->last_transmission = GNUNET_TIME_absolute_get ();
201   rp->transmission_counter++;
202   plan (pp, rp);
203   return msize;
204 }
205
206
207 /**
208  * Figure out when and how to transmit to the given peer.
209  *
210  * @param cls the 'struct PeerPlan'
211  * @param tc scheduler context
212  */
213 static void
214 schedule_peer_transmission (void *cls,
215                             const struct GNUNET_SCHEDULER_TaskContext *tc)
216 {
217   struct PeerPlan *pp = cls;
218   struct GSF_RequestPlan *rp;
219   struct GSF_PendingRequestData *prd;
220   size_t msize;
221
222   pp->task = GNUNET_SCHEDULER_NO_TASK;
223   GNUNET_assert (NULL == pp->pth);
224   /* move ready requests to priority queue */
225   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
226           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
227     {
228       rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
229       GNUNET_CONTAINER_heap_insert (pp->priority_heap,
230                                     rp, 
231                                     rp->priority);                                      
232       if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
233         break;
234     }   
235   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
236     {
237       /* priority heap (still) empty, check for delay... */
238       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
239       if (NULL == rp)
240         return; /* both queues empty */
241       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
242                                                &schedule_peer_transmission,
243                                                pp);
244       return;
245     }
246   /* process from priority heap */
247   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
248   GNUNET_assert (NULL != rp);
249   prd = GSF_pending_request_get_data_ (rp->pr);
250   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                      
251   pp->pth = GSF_peer_transmit_ (pp->cp,
252                                 GNUNET_YES,
253                                 rp->priority,
254                                 GNUNET_TIME_UNIT_FOREVER_REL,
255                                 msize,
256                                 &transmit_message_callback,
257                                 pp);
258   GNUNET_assert (NULL != pp->pth);
259 }
260
261
262 /**
263  * Create a new query plan entry.
264  *
265  * @param cp peer with the entry
266  * @param pr request with the entry
267  */
268 void
269 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
270                struct GSF_PendingRequest *pr)
271 {
272   struct GNUNET_PeerIdentity id;
273   struct PeerPlan *pp;
274   struct GSF_PendingRequestData *prd;
275   struct GSF_RequestPlan *rp;
276   
277   GSF_connected_peer_get_identity_ (cp, &id);
278   pp = GNUNET_CONTAINER_multihashmap_get (plans,
279                                           &id.hashPubKey);
280   if (NULL == pp)
281     {
282       pp = GNUNET_malloc (sizeof (struct PeerPlan));
283       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
284       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
285       GNUNET_CONTAINER_multihashmap_put (plans,
286                                          &id.hashPubKey,
287                                          pp,
288                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
289     }
290   prd = GSF_pending_request_get_data_ (pr);
291   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
292   rp->pr = pr;
293   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
294                                prd->rp_tail,
295                                rp);
296   plan (pp, rp);
297   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
298     {
299       /* no request that should be done immediately, figure out delay */
300       if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
301         return; /* did not change delay heap top, no need to do anything */
302       GNUNET_assert (NULL == pp->pth);
303       if (GNUNET_SCHEDULER_NO_TASK != pp->task)
304         GNUNET_SCHEDULER_cancel (pp->task);
305       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
306                                                &schedule_peer_transmission,
307                                                pp);
308       return;
309     }
310
311   if (pp->pth != NULL)
312     {
313       if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
314         return; /* did not change priority heap top, no need to do anyhing */
315       GSF_peer_transmit_cancel_ (pp->pth);
316       pp->pth = NULL;
317     }
318   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
319     GNUNET_SCHEDULER_cancel (pp->task);
320   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
321                                        pp);
322 }
323
324
325 /**
326  * Notify the plan about a peer being no longer available;
327  * destroy all entries associated with this peer.
328  *
329  * @param cp connected peer 
330  */
331 void
332 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
333 {
334   struct GNUNET_PeerIdentity id;
335   struct PeerPlan *pp;
336   struct GSF_RequestPlan *rp;
337   struct GSF_PendingRequestData *prd;
338
339   GSF_connected_peer_get_identity_ (cp, &id);
340   pp = GNUNET_CONTAINER_multihashmap_get (plans,
341                                           &id.hashPubKey);
342   if (NULL == pp)
343     return; /* nothing was ever planned for this peer */
344   GNUNET_CONTAINER_multihashmap_remove (plans,
345                                         &id.hashPubKey,
346                                         pp);
347   if (NULL != pp->pth)
348     GSF_peer_transmit_cancel_ (pp->pth);
349   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
350     GNUNET_SCHEDULER_cancel (pp->task);
351   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
352     {
353       prd = GSF_pending_request_get_data_ (rp->pr);
354       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
355                                    prd->rp_tail,
356                                    rp);
357       GNUNET_free (rp);
358     }
359   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
360   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
361     {
362       prd = GSF_pending_request_get_data_ (rp->pr);
363       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
364                                    prd->rp_tail,
365                                    rp);
366       GNUNET_free (rp);
367     }
368   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
369   GNUNET_free (pp);
370 }
371
372
373 /**
374  * Notify the plan about a request being done; destroy all entries
375  * associated with this request.
376  *
377  * @param pr request that is done
378  */
379 void
380 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
381 {
382   struct GSF_RequestPlan *rp;
383   struct GSF_PendingRequestData *prd;
384
385   prd = GSF_pending_request_get_data_ (pr);
386   while (NULL != (rp = prd->rp_head))
387     {
388       GNUNET_CONTAINER_heap_remove_node (rp->hn);
389       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
390                                    prd->rp_tail,
391                                    rp);
392       GNUNET_free (rp);
393     }
394 }
395
396
397 /**
398  * Initialize plan subsystem.
399  */
400 void
401 GSF_plan_init ()
402 {
403   plans = GNUNET_CONTAINER_multihashmap_create (256);
404 }
405
406
407 /**
408  * Shutdown plan subsystem.
409  */
410 void
411 GSF_plan_done ()
412 {
413   GNUNET_assert (0 == 
414                  GNUNET_CONTAINER_multihashmap_size (plans));
415   GNUNET_CONTAINER_multihashmap_destroy (plans);
416 }
417
418
419
420 /* end of gnunet-service-fs_pe.h */