e7608653ed739e4af00a37a9904316b6658cb473
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pe.c
23  * @brief API to manage query plan
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_pe.h"
29 #include "gnunet-service-fs_pr.h"
30
31 /**
32  * Transmission plan for a peer.
33  */
34 struct PeerPlan
35 {
36   /**
37    * Heap with pending queries, smaller weights mean higher priority.
38    */
39   struct GNUNET_CONTAINER_Heap *heap;
40
41   /**
42    * Current transmission request handle.
43    */
44   struct GSF_PeerTransmitHandle *pth;
45
46   /**
47    * Peer for which this is the plan.
48    */
49   struct GSF_ConnectedPeer *cp;
50
51   /**
52    * Current task for executing the plan.
53    */
54   GNUNET_SCHEDULER_TaskIdentifier task;
55 };
56
57
58 /**
59  * Hash map from peer identities to PeerPlans.
60  */
61 static struct GNUNET_CONTAINER_MultiHashMap *plans;
62
63
64 /**
65  * Figure out when and how to transmit to the given peer.
66  *
67  * @param cls the 'struct GSF_ConnectedPeer' for transmission
68  * @param tc scheduler context
69  */
70 static void
71 schedule_peer_transmission (void *cls,
72                             const struct GNUNET_SCHEDULER_TaskContext *tc);
73
74
75 /**
76  * Function called to get a message for transmission.
77  *
78  * @param cls closure
79  * @param buf_size number of bytes available in buf
80  * @param buf where to copy the message, NULL on error (peer disconnect)
81  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
82  */
83 static size_t 
84 transmit_message_callback (void *cls,
85                            size_t buf_size,
86                            void *buf)
87 {
88   struct PeerPlan *pp = cls;
89   struct GSF_PendingRequest *pr;
90   size_t msize;
91
92   if (NULL == buf)
93     {
94       /* failed, try again... */
95       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
96       return 0;
97     }
98   pr = GNUNET_CONTAINER_heap_peek (pp->heap);
99   msize = GSF_pending_request_get_message_ (pr, buf_size, buf);
100   if (msize > buf_size)
101     {
102       /* buffer to small (message changed), try again */
103       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
104       return 0;
105     }
106   /* remove from root, add again elsewhere... */
107   GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap));
108   GSF_plan_add_ (pp->cp, pr);
109   return msize;
110 }
111
112
113 /**
114  * Figure out when and how to transmit to the given peer.
115  *
116  * @param cls the 'struct PeerPlan'
117  * @param tc scheduler context
118  */
119 static void
120 schedule_peer_transmission (void *cls,
121                             const struct GNUNET_SCHEDULER_TaskContext *tc)
122 {
123   struct PeerPlan *pp = cls;
124   struct GSF_PendingRequest *pr;
125   size_t msize;
126   struct GNUNET_TIME_Relative delay;
127
128   pp->task = GNUNET_SCHEDULER_NO_TASK;
129   if (NULL == pp->heap)
130     return;
131   if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
132     return;
133   GNUNET_assert (NULL == pp->pth);
134   pr = GNUNET_CONTAINER_heap_peek (pp->heap);
135   if (0) // FIXME: if (re)transmission should wait, wait...
136     {
137       delay = GNUNET_TIME_UNIT_SECONDS;
138       // FIXME
139       pp->task = GNUNET_SCHEDULER_add_delayed (delay,
140                                                &schedule_peer_transmission,
141                                                pp);
142       return;
143     }
144   msize = GSF_pending_request_get_message_ (pr, 0, NULL);                                          
145   pp->pth = GSF_peer_transmit_ (pp->cp,
146                                 GNUNET_YES,
147                                 0 /* FIXME: pr->priority? */,
148                                 GNUNET_TIME_UNIT_FOREVER_REL,
149                                 msize,
150                                 &transmit_message_callback,
151                                 pp);
152   GNUNET_assert (NULL != pp->pth);
153 }
154
155
156 /**
157  * Create a new query plan entry.
158  *
159  * @param cp peer with the entry
160  * @param pr request with the entry
161  */
162 void
163 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
164                struct GSF_PendingRequest *pr)
165 {
166   struct GNUNET_PeerIdentity id;
167   struct PeerPlan *pp;
168   GNUNET_CONTAINER_HeapCostType weight;
169   
170   GSF_connected_peer_get_identity_ (cp, &id);
171   pp = GNUNET_CONTAINER_multihashmap_get (plans,
172                                           &id.hashPubKey);
173   if (NULL == pp)
174     {
175       pp = GNUNET_malloc (sizeof (struct PeerPlan));
176       pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
177       GNUNET_CONTAINER_multihashmap_put (plans,
178                                          &id.hashPubKey,
179                                          pp,
180                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
181     }
182   weight = 0; // FIXME: calculate real weight!
183   GNUNET_CONTAINER_heap_insert (pp->heap,
184                                 pr,
185                                 weight);
186   if (pp->pth != NULL)
187     {
188       if (pr != GNUNET_CONTAINER_heap_peek (pp->heap))
189         return;
190       GSF_peer_transmit_cancel_ (pp->pth);
191       pp->pth = NULL;
192     }
193   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
194     GNUNET_SCHEDULER_cancel (pp->task);
195   pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
196                                        pp);
197 }
198
199
200 /**
201  * Notify the plan about a peer being no longer available;
202  * destroy all entries associated with this peer.
203  *
204  * @param cp connected peer 
205  */
206 void
207 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
208 {
209   struct GNUNET_PeerIdentity id;
210   struct PeerPlan *pp;
211
212   GSF_connected_peer_get_identity_ (cp, &id);
213   pp = GNUNET_CONTAINER_multihashmap_get (plans,
214                                           &id.hashPubKey);
215   GNUNET_CONTAINER_multihashmap_remove (plans,
216                                         &id.hashPubKey,
217                                         pp);
218   if (NULL != pp->pth)
219     GSF_peer_transmit_cancel_ (pp->pth);
220   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
221     GNUNET_SCHEDULER_cancel (pp->task);
222   GNUNET_CONTAINER_heap_destroy (pp->heap);
223   GNUNET_free (pp);
224 }
225
226
227 /**
228  * Closure for 'find_request'.
229  */
230 struct FindRequestClosure
231 {
232   /**
233    * Place to store the node that was found (NULL for none).
234    */
235   struct GNUNET_CONTAINER_HeapNode *node;
236
237   /**
238    * Value we're looking for
239    */
240   const struct GSF_PendingRequest *pr;
241 };
242
243
244 /**
245  * Find a heap node where the value matches the
246  * pending request given in the closure.
247  *
248  * @param cls the 'struct FindRequestClosure'
249  * @param node heap structure we're looking for on a match
250  * @param element the pending request stored in the heap
251  * @param cost weight of the request
252  * @return GNUNET_YES to continue looking
253  */
254 static int
255 find_request (void *cls,
256               struct GNUNET_CONTAINER_HeapNode *node,
257               void *element,
258               GNUNET_CONTAINER_HeapCostType cost)
259 {
260   struct FindRequestClosure *frc = cls;
261   struct GSF_PendingRequest *pr = element;
262
263   if (pr == frc->pr)
264     {
265       frc->node = node;
266       return GNUNET_NO;
267     }
268   return GNUNET_YES;
269 }
270
271
272 /**
273  * Remove the given request from all heaps. * FIXME: O(n) -- inefficient!
274  *
275  * @param cls 'struct GSF_PendingRequest' to purge
276  * @param key identity of the peer we're currently looking at (unused)
277  * @param value PeerPlan for the given peer to search for the 'cls'
278  * @return GNUNET_OK (continue iteration)
279  */
280 static int
281 remove_request (void *cls,
282                 const GNUNET_HashCode *key,
283                 void *value)
284 {
285   const struct GSF_PendingRequest *pr = cls;
286   struct PeerPlan *pp = value;
287   struct GNUNET_CONTAINER_Heap *h = pp->heap;
288   struct FindRequestClosure frc;
289
290   frc.pr = pr;
291   do
292     {
293       frc.node = NULL;
294       GNUNET_CONTAINER_heap_iterate (h, &find_request, &frc);
295       if (frc.node != NULL)
296         GNUNET_CONTAINER_heap_remove_node (h, frc.node);
297     }
298   while (NULL != frc.node);
299   return GNUNET_OK;
300 }
301
302
303 /**
304  * Notify the plan about a request being done; destroy all entries
305  * associated with this request.  Note that this implementation is
306  * currently terribly inefficient (O(n)) and could instead be done in
307  * O(1).  But for now, I first want to see it work correctly...
308  *
309  * @param pr request that is done
310  */
311 void
312 GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr)
313 {
314   GNUNET_CONTAINER_multihashmap_iterate (plans,
315                                          &remove_request,
316                                          (void*) pr);
317 }
318
319
320 /**
321  * Initialize plan subsystem.
322  */
323 void
324 GSF_plan_init ()
325 {
326   plans = GNUNET_CONTAINER_multihashmap_create (256);
327 }
328
329
330 /**
331  * Shutdown plan subsystem.
332  */
333 void
334 GSF_plan_done ()
335 {
336   GNUNET_assert (0 == 
337                  GNUNET_CONTAINER_multihashmap_size (plans));
338   GNUNET_CONTAINER_multihashmap_destroy (plans);
339 }
340
341
342
343 /* end of gnunet-service-fs_pe.h */