7b0f874fff5524018f472b3f6552f0797ad55327
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_pe.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * Information we keep per request per peer.  This is a doubly-linked
34  * list (with head and tail in the 'struct GSF_PendingRequestData')
35  * with one entry in each heap of each 'struct PeerPlan'.  Each
36  * entry tracks information relevant for this request and this peer.
37  */
38 struct GSF_RequestPlan
39 {
40
41   /**
42    * This is a doubly-linked list.
43    */
44   struct GSF_RequestPlan *next;
45
46   /**
47    * This is a doubly-linked list.
48    */
49   struct GSF_RequestPlan *prev;
50
51   /**
52    * Heap node associated with this request and this peer.
53    */
54   struct GNUNET_CONTAINER_HeapNode *hn;
55
56   /**
57    * Associated pending request.
58    */
59   struct GSF_PendingRequest *pr;
60
61   /**
62    * Earliest time we'd be happy to (re)transmit this request.
63    */
64   struct GNUNET_TIME_Absolute earliest_transmission;
65
66   /**
67    * When was the last time we transmitted this request to this peer? 0 for never.
68    */
69   struct GNUNET_TIME_Absolute last_transmission;
70
71   /**
72    * Current priority for this request for this target.
73    */
74   uint64_t priority;
75
76   /**
77    * How often did we transmit this request to this peer?
78    */
79   unsigned int transmission_counter;
80
81 };
82
83
84 /**
85  * Transmission plan for a peer.
86  */
87 struct PeerPlan
88 {
89   /**
90    * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
91    */
92   struct GNUNET_CONTAINER_Heap *priority_heap;
93
94   /**
95    * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
96    */
97   struct GNUNET_CONTAINER_Heap *delay_heap;
98
99   /**
100    * Current transmission request handle.
101    */
102   struct GSF_PeerTransmitHandle *pth;
103
104   /**
105    * Peer for which this is the plan.
106    */
107   struct GSF_ConnectedPeer *cp;
108
109   /**
110    * Current task for executing the plan.
111    */
112   GNUNET_SCHEDULER_TaskIdentifier task;
113 };
114
115
116 /**
117  * Hash map from peer identities to PeerPlans.
118  */
119 static struct GNUNET_CONTAINER_MultiHashMap *plans;
120
121
122 /**
123  * Insert the given request plan into the heap with the appropriate weight.
124  *
125  * @param pp associated peer's plan
126  * @param rp request to plan
127  */
128 static void
129 plan (struct PeerPlan *pp,
130       struct GSF_RequestPlan *rp)
131 {
132   struct GSF_PendingRequestData *prd;
133
134   prd = GSF_pending_request_get_data_ (rp->pr);
135   // FIXME: calculate 'rp->earliest_transmission'!
136   // fIXME: claculate 'rp->priority'! 
137
138   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
139     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
140                                            rp,
141                                            rp->priority);
142   else
143     rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
144                                            rp,
145                                            rp->earliest_transmission.abs_value);
146 }
147
148
149 /**
150  * Figure out when and how to transmit to the given peer.
151  *
152  * @param cls the 'struct GSF_ConnectedPeer' for transmission
153  * @param tc scheduler context
154  */
155 static void
156 schedule_peer_transmission (void *cls,
157                             const struct GNUNET_SCHEDULER_TaskContext *tc);
158
159
160 /**
161  * Function called to get a message for transmission.
162  *
163  * @param cls closure
164  * @param buf_size number of bytes available in buf
165  * @param buf where to copy the message, NULL on error (peer disconnect)
166  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
167  */
168 static size_t 
169 transmit_message_callback (void *cls,
170                            size_t buf_size,
171                            void *buf)
172 {
173   struct PeerPlan *pp = cls;
174   struct GSF_RequestPlan *rp;
175   size_t msize;
176
177   if (NULL == buf)
178     {
179       /* failed, try again... */
180       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
181       return 0;
182     }
183   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
184   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
185   if (msize > buf_size)
186     {
187       /* buffer to small (message changed), try again */
188       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
189       return 0;
190     }
191   /* remove from root, add again elsewhere... */
192   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
193   rp->hn = NULL;
194   rp->last_transmission = GNUNET_TIME_absolute_get ();
195   rp->transmission_counter++;
196   plan (pp, rp);
197   return msize;
198 }
199
200
201 /**
202  * Figure out when and how to transmit to the given peer.
203  *
204  * @param cls the 'struct PeerPlan'
205  * @param tc scheduler context
206  */
207 static void
208 schedule_peer_transmission (void *cls,
209                             const struct GNUNET_SCHEDULER_TaskContext *tc)
210 {
211   struct PeerPlan *pp = cls;
212   struct GSF_RequestPlan *rp;
213   struct GSF_PendingRequestData *prd;
214   size_t msize;
215
216   pp->task = GNUNET_SCHEDULER_NO_TASK;
217   GNUNET_assert (NULL == pp->pth);
218   /* move ready requests to priority queue */
219   while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
220           (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
221     {
222       rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
223       GNUNET_CONTAINER_heap_insert (pp->priority_heap,
224                                     rp, 
225                                     rp->priority);                                      
226       if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
227         break;
228     }   
229   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
230     {
231       /* priority heap (still) empty, check for delay... */
232       rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
233       if (NULL == rp)
234         return; /* both queues empty */
235       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
236                                                &schedule_peer_transmission,
237                                                pp);
238       return;
239     }
240   /* process from priority heap */
241   rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
242   prd = GSF_pending_request_get_data_ (rp->pr);
243   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                      
244   pp->pth = GSF_peer_transmit_ (pp->cp,
245                                 GNUNET_YES,
246                                 rp->priority,
247                                 GNUNET_TIME_UNIT_FOREVER_REL,
248                                 msize,
249                                 &transmit_message_callback,
250                                 pp);
251   GNUNET_assert (NULL != pp->pth);
252 }
253
254
255 /**
256  * Create a new query plan entry.
257  *
258  * @param cp peer with the entry
259  * @param pr request with the entry
260  */
261 void
262 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
263                struct GSF_PendingRequest *pr)
264 {
265   struct GNUNET_PeerIdentity id;
266   struct PeerPlan *pp;
267   struct GSF_PendingRequestData *prd;
268   struct GSF_RequestPlan *rp;
269   
270   GSF_connected_peer_get_identity_ (cp, &id);
271   pp = GNUNET_CONTAINER_multihashmap_get (plans,
272                                           &id.hashPubKey);
273   if (NULL == pp)
274     {
275       pp = GNUNET_malloc (sizeof (struct PeerPlan));
276       pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
277       pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
278       GNUNET_CONTAINER_multihashmap_put (plans,
279                                          &id.hashPubKey,
280                                          pp,
281                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
282     }
283   prd = GSF_pending_request_get_data_ (pr);
284   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
285   rp->pr = pr;
286   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
287                                prd->rp_tail,
288                                rp);
289   plan (pp, rp);
290   if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
291     {
292       /* no request that should be done immediately, figure out delay */
293       if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
294         return; /* did not change delay heap top, no need to do anything */
295       GNUNET_assert (NULL == pp->pth);
296       if (GNUNET_SCHEDULER_NO_TASK != pp->task)
297         GNUNET_SCHEDULER_cancel (pp->task);
298       pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
299                                                &schedule_peer_transmission,
300                                                pp);
301       return;
302     }
303
304   if (pp->pth != NULL)
305     {
306       if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
307         return; /* did not change priority heap top, no need to do anyhing */
308       GSF_peer_transmit_cancel_ (pp->pth);
309       pp->pth = NULL;
310     }
311   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
312     GNUNET_SCHEDULER_cancel (pp->task);
313   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
314                                        pp);
315 }
316
317
318 /**
319  * Notify the plan about a peer being no longer available;
320  * destroy all entries associated with this peer.
321  *
322  * @param cp connected peer 
323  */
324 void
325 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
326 {
327   struct GNUNET_PeerIdentity id;
328   struct PeerPlan *pp;
329   struct GSF_RequestPlan *rp;
330   struct GSF_PendingRequestData *prd;
331
332   GSF_connected_peer_get_identity_ (cp, &id);
333   pp = GNUNET_CONTAINER_multihashmap_get (plans,
334                                           &id.hashPubKey);
335   GNUNET_CONTAINER_multihashmap_remove (plans,
336                                         &id.hashPubKey,
337                                         pp);
338   if (NULL != pp->pth)
339     GSF_peer_transmit_cancel_ (pp->pth);
340   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
341     GNUNET_SCHEDULER_cancel (pp->task);
342   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
343     {
344       prd = GSF_pending_request_get_data_ (rp->pr);
345       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
346                                    prd->rp_tail,
347                                    rp);
348       GNUNET_free (rp);
349     }
350   GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
351   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
352     {
353       prd = GSF_pending_request_get_data_ (rp->pr);
354       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
355                                    prd->rp_tail,
356                                    rp);
357       GNUNET_free (rp);
358     }
359   GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
360   GNUNET_free (pp);
361 }
362
363
364 /**
365  * Notify the plan about a request being done; destroy all entries
366  * associated with this request.
367  *
368  * @param pr request that is done
369  */
370 void
371 GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
372 {
373   struct GSF_RequestPlan *rp;
374   struct GSF_PendingRequestData *prd;
375
376   prd = GSF_pending_request_get_data_ (pr);
377   while (NULL != (rp = prd->rp_head))
378     {
379       GNUNET_CONTAINER_heap_remove_node (rp->hn);
380       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
381                                    prd->rp_tail,
382                                    rp);
383       GNUNET_free (rp);
384     }
385 }
386
387
388 /**
389  * Initialize plan subsystem.
390  */
391 void
392 GSF_plan_init ()
393 {
394   plans = GNUNET_CONTAINER_multihashmap_create (256);
395 }
396
397
398 /**
399  * Shutdown plan subsystem.
400  */
401 void
402 GSF_plan_done ()
403 {
404   GNUNET_assert (0 == 
405                  GNUNET_CONTAINER_multihashmap_size (plans));
406   GNUNET_CONTAINER_multihashmap_destroy (plans);
407 }
408
409
410
411 /* end of gnunet-service-fs_pe.h */