306bb948c657a8d65c6643c58bcddf7dda5e3396
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_pe.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * Information we keep per request per peer.  This is a doubly-linked
34  * list (with head and tail in the 'struct GSF_PendingRequestData')
35  * with one entry in each heap of each 'struct PeerPlan'.  Each
36  * entry tracks information relevant for this request and this peer.
37  */
38 struct GSF_RequestPlan
39 {
40
41   /**
42    * This is a doubly-linked list.
43    */
44   struct GSF_RequestPlan *next;
45
46   /**
47    * This is a doubly-linked list.
48    */
49   struct GSF_RequestPlan *prev;
50
51   /**
52    * Heap node associated with this request and this peer.
53    */
54   struct GNUNET_CONTAINER_HeapNode *hn;
55
56   /**
57    * Associated pending request.
58    */
59   struct GSF_PendingRequest *pr;
60
61   /**
62    * Earliest time we'd be happy to transmit this request.
63    */
64   struct GNUNET_TIME_Absolute earliest_transmission;
65
66 };
67
68
69 /**
70  * Transmission plan for a peer.
71  */
72 struct PeerPlan
73 {
74   /**
75    * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority.
76    */
77   struct GNUNET_CONTAINER_Heap *heap;
78
79   /**
80    * Current transmission request handle.
81    */
82   struct GSF_PeerTransmitHandle *pth;
83
84   /**
85    * Peer for which this is the plan.
86    */
87   struct GSF_ConnectedPeer *cp;
88
89   /**
90    * Current task for executing the plan.
91    */
92   GNUNET_SCHEDULER_TaskIdentifier task;
93 };
94
95
96 /**
97  * Hash map from peer identities to PeerPlans.
98  */
99 static struct GNUNET_CONTAINER_MultiHashMap *plans;
100
101
102 /**
103  * Insert the given request plan into the heap with the appropriate weight.
104  *
105  * @param pp associated peer's plan
106  * @param rp request to plan
107  */
108 static void
109 plan (struct PeerPlan *pp,
110       struct GSF_RequestPlan *rp)
111 {
112   GNUNET_CONTAINER_HeapCostType weight;
113   struct GSF_PendingRequestData *prd;
114
115   prd = GSF_pending_request_get_data_ (rp->pr);
116   weight = 0; // FIXME: calculate real weight!
117   // FIXME: calculate 'rp->earliest_transmission'!
118   rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap,
119                                          rp,
120                                          weight);
121 }
122
123
124 /**
125  * Figure out when and how to transmit to the given peer.
126  *
127  * @param cls the 'struct GSF_ConnectedPeer' for transmission
128  * @param tc scheduler context
129  */
130 static void
131 schedule_peer_transmission (void *cls,
132                             const struct GNUNET_SCHEDULER_TaskContext *tc);
133
134
135 /**
136  * Function called to get a message for transmission.
137  *
138  * @param cls closure
139  * @param buf_size number of bytes available in buf
140  * @param buf where to copy the message, NULL on error (peer disconnect)
141  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
142  */
143 static size_t 
144 transmit_message_callback (void *cls,
145                            size_t buf_size,
146                            void *buf)
147 {
148   struct PeerPlan *pp = cls;
149   struct GSF_RequestPlan *rp;
150   size_t msize;
151
152   if (NULL == buf)
153     {
154       /* failed, try again... */
155       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
156       return 0;
157     }
158   rp = GNUNET_CONTAINER_heap_peek (pp->heap);
159   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
160   if (msize > buf_size)
161     {
162       /* buffer to small (message changed), try again */
163       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
164       return 0;
165     }
166   /* remove from root, add again elsewhere... */
167   GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap));
168   rp->hn = NULL;
169   plan (pp, rp);
170   return msize;
171 }
172
173
174 /**
175  * Figure out when and how to transmit to the given peer.
176  *
177  * @param cls the 'struct PeerPlan'
178  * @param tc scheduler context
179  */
180 static void
181 schedule_peer_transmission (void *cls,
182                             const struct GNUNET_SCHEDULER_TaskContext *tc)
183 {
184   struct PeerPlan *pp = cls;
185   struct GSF_RequestPlan *rp;
186   struct GSF_PendingRequestData *prd;
187   size_t msize;
188   struct GNUNET_TIME_Relative delay;
189
190   pp->task = GNUNET_SCHEDULER_NO_TASK;
191   if (NULL == pp->heap)
192     return;
193   if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
194     return;
195   GNUNET_assert (NULL == pp->pth);
196   rp = GNUNET_CONTAINER_heap_peek (pp->heap);
197   prd = GSF_pending_request_get_data_ (rp->pr);
198   delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
199   if (delay.rel_value > 0)
200     {
201       pp->task = GNUNET_SCHEDULER_add_delayed (delay,
202                                                &schedule_peer_transmission,
203                                                pp);
204       return;
205     }
206   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                      
207   pp->pth = GSF_peer_transmit_ (pp->cp,
208                                 GNUNET_YES,
209                                 0 /* FIXME: pr->priority? */,
210                                 GNUNET_TIME_UNIT_FOREVER_REL,
211                                 msize,
212                                 &transmit_message_callback,
213                                 pp);
214   GNUNET_assert (NULL != pp->pth);
215 }
216
217
218 /**
219  * Create a new query plan entry.
220  *
221  * @param cp peer with the entry
222  * @param pr request with the entry
223  */
224 void
225 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
226                struct GSF_PendingRequest *pr)
227 {
228   struct GNUNET_PeerIdentity id;
229   struct PeerPlan *pp;
230   struct GSF_PendingRequestData *prd;
231   struct GSF_RequestPlan *rp;
232   
233   GSF_connected_peer_get_identity_ (cp, &id);
234   pp = GNUNET_CONTAINER_multihashmap_get (plans,
235                                           &id.hashPubKey);
236   if (NULL == pp)
237     {
238       pp = GNUNET_malloc (sizeof (struct PeerPlan));
239       pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
240       GNUNET_CONTAINER_multihashmap_put (plans,
241                                          &id.hashPubKey,
242                                          pp,
243                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
244     }
245   prd = GSF_pending_request_get_data_ (pr);
246   rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
247   rp->pr = pr;
248   GNUNET_CONTAINER_DLL_insert (prd->rp_head,
249                                prd->rp_tail,
250                                rp);
251   plan (pp, rp);
252   if (pp->pth != NULL)
253     {
254       if (rp != GNUNET_CONTAINER_heap_peek (pp->heap))
255         return;
256       GSF_peer_transmit_cancel_ (pp->pth);
257       pp->pth = NULL;
258     }
259   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
260     GNUNET_SCHEDULER_cancel (pp->task);
261   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
262                                        pp);
263 }
264
265
266 /**
267  * Notify the plan about a peer being no longer available;
268  * destroy all entries associated with this peer.
269  *
270  * @param cp connected peer 
271  */
272 void
273 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
274 {
275   struct GNUNET_PeerIdentity id;
276   struct PeerPlan *pp;
277   struct GSF_RequestPlan *rp;
278   struct GSF_PendingRequestData *prd;
279
280   GSF_connected_peer_get_identity_ (cp, &id);
281   pp = GNUNET_CONTAINER_multihashmap_get (plans,
282                                           &id.hashPubKey);
283   GNUNET_CONTAINER_multihashmap_remove (plans,
284                                         &id.hashPubKey,
285                                         pp);
286   if (NULL != pp->pth)
287     GSF_peer_transmit_cancel_ (pp->pth);
288   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
289     GNUNET_SCHEDULER_cancel (pp->task);
290   while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap)))
291     {
292       prd = GSF_pending_request_get_data_ (rp->pr);
293       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
294                                    prd->rp_tail,
295                                    rp);
296       GNUNET_free (rp);
297     }
298   GNUNET_CONTAINER_heap_destroy (pp->heap);
299   GNUNET_free (pp);
300 }
301
302
303 /**
304  * Notify the plan about a request being done; destroy all entries
305  * associated with this request.
306  *
307  * @param pr request that is done
308  */
309 void
310 GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr)
311 {
312   struct GSF_RequestPlan *rp;
313   struct GSF_PendingRequestData *prd;
314
315   while (NULL != (rp = prd->rp_head))
316     {
317       prd = GSF_pending_request_get_data_ (rp->pr);
318       GNUNET_CONTAINER_heap_remove_node (rp->hn);
319       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
320                                    prd->rp_tail,
321                                    rp);
322       GNUNET_free (rp);
323     }
324 }
325
326
327 /**
328  * Initialize plan subsystem.
329  */
330 void
331 GSF_plan_init ()
332 {
333   plans = GNUNET_CONTAINER_multihashmap_create (256);
334 }
335
336
337 /**
338  * Shutdown plan subsystem.
339  */
340 void
341 GSF_plan_done ()
342 {
343   GNUNET_assert (0 == 
344                  GNUNET_CONTAINER_multihashmap_size (plans));
345   GNUNET_CONTAINER_multihashmap_destroy (plans);
346 }
347
348
349
350 /* end of gnunet-service-fs_pe.h */