816328392646fabccbac48743210ae63d4c93678
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_pe.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * Information we keep per request per peer.  This is a doubly-linked
34  * list (with head and tail in the 'struct GSF_PendingRequestData')
35  * with one entry in each heap of each 'struct PeerPlan'.  Each
36  * entry tracks information relevant for this request and this peer.
37  */
38 struct GSF_RequestPlan
39 {
40
41   /**
42    * This is a doubly-linked list.
43    */
44   struct GSF_RequestPlan *next;
45
46   /**
47    * This is a doubly-linked list.
48    */
49   struct GSF_RequestPlan *prev;
50
51   /**
52    * Heap node associated with this request and this peer.
53    */
54   struct GNUNET_CONTAINER_HeapNode *hn;
55
56   /**
57    * Associated pending request.
58    */
59   struct GSF_PendingRequest *pr;
60
61   /**
62    * Earliest time we'd be happy to (re)transmit this request.
63    */
64   struct GNUNET_TIME_Absolute earliest_transmission;
65
66   /**
67    * When was the last time we transmitted this request to this peer? 0 for never.
68    */
69   struct GNUNET_TIME_Absolute last_transmission;
70
71   /**
72    * Current priority for this request for this target.
73    */
74   uint64_t priority;
75
76   /**
77    * How often did we transmit this request to this peer?
78    */
79   unsigned int transmission_counter;
80
81 };
82
83
84 /**
85  * Transmission plan for a peer.
86  */
87 struct PeerPlan
88 {
89   /**
90    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
91    */
92   struct GNUNET_CONTAINER_Heap *priority_heap;
93
94   /**
95    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
96    */
97   struct GNUNET_CONTAINER_Heap *delay_heap;
98
99   /**
100    * Current transmission request handle.
101    */
102   struct GSF_PeerTransmitHandle *pth;
103
104   /**
105    * Peer for which this is the plan.
106    */
107   struct GSF_ConnectedPeer *cp;
108
109   /**
110    * Current task for executing the plan.
111    */
112   GNUNET_SCHEDULER_TaskIdentifier task;
113 };
114
115
116 /**
117  * Hash map from peer identities to PeerPlans.
118  */
119 static struct GNUNET_CONTAINER_MultiHashMap *plans;
120
121
122 /**
123  * Insert the given request plan into the heap with the appropriate weight.
124  *
125  * @param pp associated peer's plan
126  * @param rp request to plan
127  */
128 static void
129 plan (struct PeerPlan *pp,
130       struct GSF_RequestPlan *rp)
131 {
132   struct GSF_PendingRequestData *prd;
133
134   prd = GSF_pending_request_get_data_ (rp->pr);
135   // FIXME: calculate 'rp->earliest_transmission'!
136   // fIXME: claculate 'rp->priority'! 
137
138   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
139     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
140                                            rp,
141                                            rp->priority);
142   else
143     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
144                                            rp,
145                                            rp->earliest_transmission.abs_value);
146 }
147
148
149 /**
150  * Figure out when and how to transmit to the given peer.
151  *
152  * @param cls the 'struct GSF_ConnectedPeer' for transmission
153  * @param tc scheduler context
154  */
155 static void
156 schedule_peer_transmission (void *cls,
157                             const struct GNUNET_SCHEDULER_TaskContext *tc);
158
159
160 /**
161  * Function called to get a message for transmission.
162  *
163  * @param cls closure
164  * @param buf_size number of bytes available in buf
165  * @param buf where to copy the message, NULL on error (peer disconnect)
166  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
167  */
168 static size_t 
169 transmit_message_callback (void *cls,
170                            size_t buf_size,
171                            void *buf)
172 {
173   struct PeerPlan *pp = cls;
174   struct GSF_RequestPlan *rp;
175   size_t msize;
176
177   pp->pth = NULL;
178   if (NULL == buf)
179     {
180       /* failed, try again... */
181       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
182       return 0;
183     }
184   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
185   if (NULL == rp)
186     {
187       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
188       return 0;
189     }
190   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
191   if (msize > buf_size)
192     {
193       /* buffer to small (message changed), try again */
194       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
195       return 0;
196     }
197   /* remove from root, add again elsewhere... */
198   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
199   rp->hn = NULL;
200   rp->last_transmission = GNUNET_TIME_absolute_get ();
201   rp->transmission_counter++;
202   plan (pp, rp);
203   return msize;
204 }
205
206
207 /**
208  * Figure out when and how to transmit to the given peer.
209  *
210  * @param cls the 'struct PeerPlan'
211  * @param tc scheduler context
212  */
213 static void
214 schedule_peer_transmission (void *cls,
215                             const struct GNUNET_SCHEDULER_TaskContext *tc)
216 {
217   struct PeerPlan *pp = cls;
218   struct GSF_RequestPlan *rp;
219   struct GSF_PendingRequestData *prd;
220   size_t msize;
221
222   pp->task = GNUNET_SCHEDULER_NO_TASK;
223   GNUNET_assert (NULL == pp->pth);
224   /* move ready requests to priority queue */
225   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
226           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
227     {
228       rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
229       GNUNET_CONTAINER_heap_insert (pp->priority_heap,
230                                     rp, 
231                                     rp->priority);                                      
232       if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
233         break;
234     }   
235   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
236     {
237       /* priority heap (still) empty, check for delay... */
238       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
239       if (NULL == rp)
240         return; /* both queues empty */
241       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
242                                                &schedule_peer_transmission,
243                                                pp);
244       return;
245     }
246   /* process from priority heap */
247   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
248   prd = GSF_pending_request_get_data_ (rp->pr);
249   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                      
250   pp->pth = GSF_peer_transmit_ (pp->cp,
251                                 GNUNET_YES,
252                                 rp->priority,
253                                 GNUNET_TIME_UNIT_FOREVER_REL,
254                                 msize,
255                                 &transmit_message_callback,
256                                 pp);
257   GNUNET_assert (NULL != pp->pth);
258 }
259
260
261 /**
262  * Create a new query plan entry.
263  *
264  * @param cp peer with the entry
265  * @param pr request with the entry
266  */
267 void
268 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
269                struct GSF_PendingRequest *pr)
270 {
271   struct GNUNET_PeerIdentity id;
272   struct PeerPlan *pp;
273   struct GSF_PendingRequestData *prd;
274   struct GSF_RequestPlan *rp;
275   
276   GSF_connected_peer_get_identity_ (cp, &id);
277   pp = GNUNET_CONTAINER_multihashmap_get (plans,
278                                           &id.hashPubKey);
279   if (NULL == pp)
280     {
281       pp = GNUNET_malloc (sizeof (struct PeerPlan));
282       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
283       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
284       GNUNET_CONTAINER_multihashmap_put (plans,
285                                          &id.hashPubKey,
286                                          pp,
287                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
288     }
289   prd = GSF_pending_request_get_data_ (pr);
290   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
291   rp->pr = pr;
292   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
293                                prd->rp_tail,
294                                rp);
295   plan (pp, rp);
296   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
297     {
298       /* no request that should be done immediately, figure out delay */
299       if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
300         return; /* did not change delay heap top, no need to do anything */
301       GNUNET_assert (NULL == pp->pth);
302       if (GNUNET_SCHEDULER_NO_TASK != pp->task)
303         GNUNET_SCHEDULER_cancel (pp->task);
304       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
305                                                &schedule_peer_transmission,
306                                                pp);
307       return;
308     }
309
310   if (pp->pth != NULL)
311     {
312       if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
313         return; /* did not change priority heap top, no need to do anyhing */
314       GSF_peer_transmit_cancel_ (pp->pth);
315       pp->pth = NULL;
316     }
317   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
318     GNUNET_SCHEDULER_cancel (pp->task);
319   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
320                                        pp);
321 }
322
323
324 /**
325  * Notify the plan about a peer being no longer available;
326  * destroy all entries associated with this peer.
327  *
328  * @param cp connected peer 
329  */
330 void
331 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
332 {
333   struct GNUNET_PeerIdentity id;
334   struct PeerPlan *pp;
335   struct GSF_RequestPlan *rp;
336   struct GSF_PendingRequestData *prd;
337
338   GSF_connected_peer_get_identity_ (cp, &id);
339   pp = GNUNET_CONTAINER_multihashmap_get (plans,
340                                           &id.hashPubKey);
341   GNUNET_CONTAINER_multihashmap_remove (plans,
342                                         &id.hashPubKey,
343                                         pp);
344   if (NULL != pp->pth)
345     GSF_peer_transmit_cancel_ (pp->pth);
346   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
347     GNUNET_SCHEDULER_cancel (pp->task);
348   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
349     {
350       prd = GSF_pending_request_get_data_ (rp->pr);
351       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
352                                    prd->rp_tail,
353                                    rp);
354       GNUNET_free (rp);
355     }
356   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
357   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
358     {
359       prd = GSF_pending_request_get_data_ (rp->pr);
360       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
361                                    prd->rp_tail,
362                                    rp);
363       GNUNET_free (rp);
364     }
365   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
366   GNUNET_free (pp);
367 }
368
369
370 /**
371  * Notify the plan about a request being done; destroy all entries
372  * associated with this request.
373  *
374  * @param pr request that is done
375  */
376 void
377 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
378 {
379   struct GSF_RequestPlan *rp;
380   struct GSF_PendingRequestData *prd;
381
382   prd = GSF_pending_request_get_data_ (pr);
383   while (NULL != (rp = prd->rp_head))
384     {
385       GNUNET_CONTAINER_heap_remove_node (rp->hn);
386       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
387                                    prd->rp_tail,
388                                    rp);
389       GNUNET_free (rp);
390     }
391 }
392
393
394 /**
395  * Initialize plan subsystem.
396  */
397 void
398 GSF_plan_init ()
399 {
400   plans = GNUNET_CONTAINER_multihashmap_create (256);
401 }
402
403
404 /**
405  * Shutdown plan subsystem.
406  */
407 void
408 GSF_plan_done ()
409 {
410   GNUNET_assert (0 == 
411                  GNUNET_CONTAINER_multihashmap_size (plans));
412   GNUNET_CONTAINER_multihashmap_destroy (plans);
413 }
414
415
416
417 /* end of gnunet-service-fs_pe.h */