prio
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_pe.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * Information we keep per request per peer.  This is a doubly-linked
34  * list (with head and tail in the 'struct GSF_PendingRequestData')
35  * with one entry in each heap of each 'struct PeerPlan'.  Each
36  * entry tracks information relevant for this request and this peer.
37  */
38 struct GSF_RequestPlan
39 {
40
41   /**
42    * This is a doubly-linked list.
43    */
44   struct GSF_RequestPlan *next;
45
46   /**
47    * This is a doubly-linked list.
48    */
49   struct GSF_RequestPlan *prev;
50
51   /**
52    * Heap node associated with this request and this peer.
53    */
54   struct GNUNET_CONTAINER_HeapNode *hn;
55
56   /**
57    * Associated pending request.
58    */
59   struct GSF_PendingRequest *pr;
60
61   /**
62    * Earliest time we'd be happy to transmit this request.
63    */
64   struct GNUNET_TIME_Absolute earliest_transmission;
65
66   /**
67    * Priority for this request for this target.
68    */
69   uint32_t priority;
70
71 };
72
73
74 /**
75  * Transmission plan for a peer.
76  */
77 struct PeerPlan
78 {
79   /**
80    * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority.
81    */
82   struct GNUNET_CONTAINER_Heap *heap;
83
84   /**
85    * Current transmission request handle.
86    */
87   struct GSF_PeerTransmitHandle *pth;
88
89   /**
90    * Peer for which this is the plan.
91    */
92   struct GSF_ConnectedPeer *cp;
93
94   /**
95    * Current task for executing the plan.
96    */
97   GNUNET_SCHEDULER_TaskIdentifier task;
98 };
99
100
101 /**
102  * Hash map from peer identities to PeerPlans.
103  */
104 static struct GNUNET_CONTAINER_MultiHashMap *plans;
105
106
107 /**
108  * Insert the given request plan into the heap with the appropriate weight.
109  *
110  * @param pp associated peer's plan
111  * @param rp request to plan
112  */
113 static void
114 plan (struct PeerPlan *pp,
115       struct GSF_RequestPlan *rp)
116 {
117   GNUNET_CONTAINER_HeapCostType weight;
118   struct GSF_PendingRequestData *prd;
119
120   prd = GSF_pending_request_get_data_ (rp->pr);
121   weight = 0; // FIXME: calculate real weight!
122   // FIXME: calculate 'rp->earliest_transmission'!
123   // fIXME: claculate 'rp->priority'! 
124   rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap,
125                                          rp,
126                                          weight);
127 }
128
129
130 /**
131  * Figure out when and how to transmit to the given peer.
132  *
133  * @param cls the 'struct GSF_ConnectedPeer' for transmission
134  * @param tc scheduler context
135  */
136 static void
137 schedule_peer_transmission (void *cls,
138                             const struct GNUNET_SCHEDULER_TaskContext *tc);
139
140
141 /**
142  * Function called to get a message for transmission.
143  *
144  * @param cls closure
145  * @param buf_size number of bytes available in buf
146  * @param buf where to copy the message, NULL on error (peer disconnect)
147  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
148  */
149 static size_t 
150 transmit_message_callback (void *cls,
151                            size_t buf_size,
152                            void *buf)
153 {
154   struct PeerPlan *pp = cls;
155   struct GSF_RequestPlan *rp;
156   size_t msize;
157
158   if (NULL == buf)
159     {
160       /* failed, try again... */
161       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
162       return 0;
163     }
164   rp = GNUNET_CONTAINER_heap_peek (pp->heap);
165   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
166   if (msize > buf_size)
167     {
168       /* buffer to small (message changed), try again */
169       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
170       return 0;
171     }
172   /* remove from root, add again elsewhere... */
173   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap));
174   rp->hn = NULL;
175   plan (pp, rp);
176   return msize;
177 }
178
179
180 /**
181  * Figure out when and how to transmit to the given peer.
182  *
183  * @param cls the 'struct PeerPlan'
184  * @param tc scheduler context
185  */
186 static void
187 schedule_peer_transmission (void *cls,
188                             const struct GNUNET_SCHEDULER_TaskContext *tc)
189 {
190   struct PeerPlan *pp = cls;
191   struct GSF_RequestPlan *rp;
192   struct GSF_PendingRequestData *prd;
193   size_t msize;
194   struct GNUNET_TIME_Relative delay;
195
196   pp->task = GNUNET_SCHEDULER_NO_TASK;
197   if (NULL == pp->heap)
198     return;
199   if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
200     return;
201   GNUNET_assert (NULL == pp->pth);
202   rp = GNUNET_CONTAINER_heap_peek (pp->heap);
203   prd = GSF_pending_request_get_data_ (rp->pr);
204   delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
205   if (delay.rel_value > 0)
206     {
207       pp->task = GNUNET_SCHEDULER_add_delayed (delay,
208                                                &schedule_peer_transmission,
209                                                pp);
210       return;
211     }
212   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                      
213   pp->pth = GSF_peer_transmit_ (pp->cp,
214                                 GNUNET_YES,
215                                 rp->priority,
216                                 GNUNET_TIME_UNIT_FOREVER_REL,
217                                 msize,
218                                 &transmit_message_callback,
219                                 pp);
220   GNUNET_assert (NULL != pp->pth);
221 }
222
223
224 /**
225  * Create a new query plan entry.
226  *
227  * @param cp peer with the entry
228  * @param pr request with the entry
229  */
230 void
231 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
232                struct GSF_PendingRequest *pr)
233 {
234   struct GNUNET_PeerIdentity id;
235   struct PeerPlan *pp;
236   struct GSF_PendingRequestData *prd;
237   struct GSF_RequestPlan *rp;
238   
239   GSF_connected_peer_get_identity_ (cp, &id);
240   pp = GNUNET_CONTAINER_multihashmap_get (plans,
241                                           &id.hashPubKey);
242   if (NULL == pp)
243     {
244       pp = GNUNET_malloc (sizeof (struct PeerPlan));
245       pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
246       GNUNET_CONTAINER_multihashmap_put (plans,
247                                          &id.hashPubKey,
248                                          pp,
249                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
250     }
251   prd = GSF_pending_request_get_data_ (pr);
252   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
253   rp->pr = pr;
254   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
255                                prd->rp_tail,
256                                rp);
257   plan (pp, rp);
258   if (pp->pth != NULL)
259     {
260       if (rp != GNUNET_CONTAINER_heap_peek (pp->heap))
261         return;
262       GSF_peer_transmit_cancel_ (pp->pth);
263       pp->pth = NULL;
264     }
265   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
266     GNUNET_SCHEDULER_cancel (pp->task);
267   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
268                                        pp);
269 }
270
271
272 /**
273  * Notify the plan about a peer being no longer available;
274  * destroy all entries associated with this peer.
275  *
276  * @param cp connected peer 
277  */
278 void
279 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
280 {
281   struct GNUNET_PeerIdentity id;
282   struct PeerPlan *pp;
283   struct GSF_RequestPlan *rp;
284   struct GSF_PendingRequestData *prd;
285
286   GSF_connected_peer_get_identity_ (cp, &id);
287   pp = GNUNET_CONTAINER_multihashmap_get (plans,
288                                           &id.hashPubKey);
289   GNUNET_CONTAINER_multihashmap_remove (plans,
290                                         &id.hashPubKey,
291                                         pp);
292   if (NULL != pp->pth)
293     GSF_peer_transmit_cancel_ (pp->pth);
294   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
295     GNUNET_SCHEDULER_cancel (pp->task);
296   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap)))
297     {
298       prd = GSF_pending_request_get_data_ (rp->pr);
299       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
300                                    prd->rp_tail,
301                                    rp);
302       GNUNET_free (rp);
303     }
304   GNUNET_CONTAINER_heap_destroy (pp->heap);
305   GNUNET_free (pp);
306 }
307
308
309 /**
310  * Notify the plan about a request being done; destroy all entries
311  * associated with this request.
312  *
313  * @param pr request that is done
314  */
315 void
316 GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr)
317 {
318   struct GSF_RequestPlan *rp;
319   struct GSF_PendingRequestData *prd;
320
321   while (NULL != (rp = prd->rp_head))
322     {
323       prd = GSF_pending_request_get_data_ (rp->pr);
324       GNUNET_CONTAINER_heap_remove_node (rp->hn);
325       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
326                                    prd->rp_tail,
327                                    rp);
328       GNUNET_free (rp);
329     }
330 }
331
332
333 /**
334  * Initialize plan subsystem.
335  */
336 void
337 GSF_plan_init ()
338 {
339   plans = GNUNET_CONTAINER_multihashmap_create (256);
340 }
341
342
343 /**
344  * Shutdown plan subsystem.
345  */
346 void
347 GSF_plan_done ()
348 {
349   GNUNET_assert (0 == 
350                  GNUNET_CONTAINER_multihashmap_size (plans));
351   GNUNET_CONTAINER_multihashmap_destroy (plans);
352 }
353
354
355
356 /* end of gnunet-service-fs_pe.h */