warn later
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * To use:
27  * - consider re-issue GSF_dht_lookup_ after non-DHT reply received 
28  * - implement 'SUPPORT_DELAYS'
29  *
30  */
31 #include "platform.h"
32 #include <float.h>
33 #include "gnunet_constants.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_dht_service.h"
36 #include "gnunet_datastore_service.h"
37 #include "gnunet_load_lib.h"
38 #include "gnunet_peer_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_signatures.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_transport_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_cp.h"
45 #include "gnunet-service-fs_indexing.h"
46 #include "gnunet-service-fs_lc.h"
47 #include "gnunet-service-fs_pe.h"
48 #include "gnunet-service-fs_pr.h"
49 #include "gnunet-service-fs_push.h"
50 #include "gnunet-service-fs_put.h"
51 #include "fs.h"
52
53 /**
54  * Size for the hash map for DHT requests from the FS
55  * service.  Should be about the number of concurrent
56  * DHT requests we plan to make.
57  */
58 #define FS_DHT_HT_SIZE 1024
59
60
61 /**
62  * How quickly do we age cover traffic?  At the given 
63  * time interval, remaining cover traffic counters are
64  * decremented by 1/16th.
65  */
66 #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
67
68
69 /* ****************************** globals ****************************** */
70
71 /**
72  * Our connection to the datastore.
73  */
74 struct GNUNET_DATASTORE_Handle *GSF_dsh;
75
76 /**
77  * Our configuration.
78  */
79 const struct GNUNET_CONFIGURATION_Handle *GSF_cfg;
80
81 /**
82  * Handle for reporting statistics.
83  */
84 struct GNUNET_STATISTICS_Handle *GSF_stats;
85
86 /**
87  * Handle for DHT operations.
88  */
89 struct GNUNET_DHT_Handle *GSF_dht;
90
91 /**
92  * How long do requests typically stay in the routing table?
93  */
94 struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime;
95
96 /**
97  * Typical priorities we're seeing from other peers right now.  Since
98  * most priorities will be zero, this value is the weighted average of
99  * non-zero priorities seen "recently".  In order to ensure that new
100  * values do not dramatically change the ratio, values are first
101  * "capped" to a reasonable range (+N of the current value) and then
102  * averaged into the existing value by a ratio of 1:N.  Hence
103  * receiving the largest possible priority can still only raise our
104  * "current_priorities" by at most 1.
105  */
106 double GSF_current_priorities;
107
108 /**
109  * How many query messages have we received 'recently' that 
110  * have not yet been claimed as cover traffic?
111  */
112 unsigned int GSF_cover_query_count;
113
114 /**
115  * How many content messages have we received 'recently' that 
116  * have not yet been claimed as cover traffic?
117  */
118 unsigned int GSF_cover_content_count;
119
120 /**
121  * Our block context.
122  */
123 struct GNUNET_BLOCK_Context *GSF_block_ctx;
124
125 /**
126  * Pointer to handle to the core service (points to NULL until we've
127  * connected to it).
128  */
129 struct GNUNET_CORE_Handle *GSF_core;
130
131 /**
132  * Are we introducing randomized delays for better anonymity?
133  */
134 int GSF_enable_randomized_delays;
135
136 /* ***************************** locals ******************************* */
137
138 /**
139  * Configuration for block library.
140  */
141 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
142
143 /**
144  * ID of our task that we use to age the cover counters.
145  */
146 static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
147
148 /**
149  * Datastore 'GET' load tracking.
150  */
151 static struct GNUNET_LOAD_Value *datastore_get_load;
152
153 /**
154  * Identity of this peer.
155  */
156 static struct GNUNET_PeerIdentity my_id;
157
158 /**
159  * Task that periodically ages our cover traffic statistics.
160  *
161  * @param cls unused closure
162  * @param tc task context
163  */
164 static void
165 age_cover_counters (void *cls,
166                     const struct GNUNET_SCHEDULER_TaskContext *tc)
167 {
168   GSF_cover_content_count = (GSF_cover_content_count * 15) / 16;
169   GSF_cover_query_count = (GSF_cover_query_count * 15) / 16;
170   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
171                                                  &age_cover_counters,
172                                                  NULL);
173 }
174
175
176
177 /**
178  * We've just now completed a datastore request.  Update our
179  * datastore load calculations.
180  *
181  * @param start time when the datastore request was issued
182  */
183 void
184 GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start)
185 {
186   struct GNUNET_TIME_Relative delay;
187
188   delay = GNUNET_TIME_absolute_get_duration (start);
189   GNUNET_LOAD_update (datastore_get_load,
190                       delay.rel_value);
191 }
192
193
194 /**
195  * Test if the DATABASE (GET) load on this peer is too high
196  * to even consider processing the query at
197  * all.  
198  * 
199  * @return GNUNET_YES if the load is too high to do anything (load high)
200  *         GNUNET_NO to process normally (load normal)
201  *         GNUNET_SYSERR to process for free (load low)
202  */
203 int
204 GSF_test_get_load_too_high_ (uint32_t priority)
205 {
206   double ld;
207
208   ld = GNUNET_LOAD_get_load (datastore_get_load);
209   if (ld < 1)
210     return GNUNET_SYSERR;    
211   if (ld <= priority)    
212     return GNUNET_NO;    
213   return GNUNET_YES;
214 }
215
216
217 /**
218  * Handle P2P "PUT" message.
219  *
220  * @param cls closure, always NULL
221  * @param other the other peer involved (sender or receiver, NULL
222  *        for loopback messages where we are both sender and receiver)
223  * @param message the actual message
224  * @param atsi performance information
225  * @return GNUNET_OK to keep the connection open,
226  *         GNUNET_SYSERR to close it (signal serious error)
227  */
228 static int
229 handle_p2p_put (void *cls,
230                 const struct GNUNET_PeerIdentity *other,
231                 const struct GNUNET_MessageHeader *message,
232                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
233 {
234   struct GSF_ConnectedPeer *cp;
235
236   cp = GSF_peer_get_ (other);
237   if (NULL == cp)
238     {
239       GNUNET_break (0);
240       return GNUNET_OK;
241     }
242   GSF_cover_content_count++;
243   return GSF_handle_p2p_content_ (cp, message);
244 }
245
246
247 /**
248  * We have a new request, consider forwarding it to the given
249  * peer.
250  *
251  * @param cls the 'struct GSF_PendingRequest'
252  * @param peer identity of the peer
253  * @param cp handle to the connected peer record
254  * @param ppd peer performance data
255  */
256 static void
257 consider_request_for_forwarding (void *cls,
258                                  const struct GNUNET_PeerIdentity *peer,
259                                  struct GSF_ConnectedPeer *cp,
260                                  const struct GSF_PeerPerformanceData *ppd)
261 {
262   struct GSF_PendingRequest *pr = cls;
263
264   GSF_plan_add_ (cp, pr);
265 }
266
267
268 /**
269  * Function to be called after we're done processing
270  * replies from the local lookup.  If the result status
271  * code indicates that there may be more replies, plan
272  * forwarding the request.
273  *
274  * @param cls closure (NULL)
275  * @param pr the pending request we were processing
276  * @param result final datastore lookup result
277  */
278 static void
279 consider_forwarding (void *cls,
280                      struct GSF_PendingRequest *pr,
281                      enum GNUNET_BLOCK_EvaluationResult result)
282 {
283   if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
284     return; /* we're done... */
285   GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
286                                 pr);
287 }
288
289
290 /**
291  * Handle P2P "GET" request.
292  *
293  * @param cls closure, always NULL
294  * @param other the other peer involved (sender or receiver, NULL
295  *        for loopback messages where we are both sender and receiver)
296  * @param message the actual message
297  * @param atsi performance information
298  * @return GNUNET_OK to keep the connection open,
299  *         GNUNET_SYSERR to close it (signal serious error)
300  */
301 static int
302 handle_p2p_get (void *cls,
303                 const struct GNUNET_PeerIdentity *other,
304                 const struct GNUNET_MessageHeader *message,
305                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
306 {
307   struct GSF_PendingRequest *pr;
308
309   pr = GSF_handle_p2p_query_ (other, message);
310   if (NULL == pr)
311     return GNUNET_SYSERR;
312   GSF_local_lookup_ (pr, 
313                      &consider_forwarding,
314                      NULL);
315   return GNUNET_OK;
316 }
317
318
319 /**
320  * We're done with the local lookup, now consider
321  * P2P processing (depending on request options and
322  * result status).  Also signal that we can now 
323  * receive more request information from the client.
324  *
325  * @param cls the client doing the request ('struct GNUNET_SERVER_Client')
326  * @param pr the pending request we were processing
327  * @param result final datastore lookup result
328  */
329 static void
330 start_p2p_processing (void *cls,
331                       struct GSF_PendingRequest *pr,
332                       enum GNUNET_BLOCK_EvaluationResult result)
333 {
334   struct GNUNET_SERVER_Client *client = cls;
335   struct GSF_PendingRequestData *prd;
336
337   prd = GSF_pending_request_get_data_ (pr);
338 #if DEBUG_FS_CLIENT
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Finished database lookup for local request `%s' with result %d\n",
341               GNUNET_h2s (&prd->query),
342               result);
343 #endif
344   GNUNET_SERVER_receive_done (client,
345                               GNUNET_OK);
346   if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
347     return; /* we're done, 'pr' was already destroyed... */
348   if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) )
349     {
350       GSF_pending_request_cancel_ (pr);
351       return;
352     }
353   GSF_dht_lookup_ (pr);
354   consider_forwarding (NULL, pr, result);
355 }
356
357
358 /**
359  * Handle START_SEARCH-message (search request from client).
360  *
361  * @param cls closure
362  * @param client identification of the client
363  * @param message the actual message
364  */
365 static void
366 handle_start_search (void *cls,
367                      struct GNUNET_SERVER_Client *client,
368                      const struct GNUNET_MessageHeader *message)
369 {
370   struct GSF_PendingRequest *pr;
371
372   pr = GSF_local_client_start_search_handler_ (client, message);
373   if (NULL == pr)
374     {
375       /* GNUNET_SERVER_receive_done was already called! */
376       return;
377     }
378   GSF_local_lookup_ (pr, 
379                      &start_p2p_processing,
380                      client);
381 }
382
383
384 /**
385  * Task run during shutdown.
386  *
387  * @param cls unused
388  * @param tc unused
389  */
390 static void
391 shutdown_task (void *cls,
392                const struct GNUNET_SCHEDULER_TaskContext *tc)
393 {
394   if (NULL != GSF_core)
395     {
396       GNUNET_CORE_disconnect (GSF_core);
397       GSF_core = NULL;
398     }
399   GSF_put_done_ ();
400   GSF_push_done_ ();
401   GSF_pending_request_done_ ();
402   GSF_plan_done ();
403   GSF_connected_peer_done_ ();
404   GNUNET_DATASTORE_disconnect (GSF_dsh, GNUNET_NO);
405   GSF_dsh = NULL;
406   GNUNET_DHT_disconnect (GSF_dht);
407   GSF_dht = NULL;
408   GNUNET_BLOCK_context_destroy (GSF_block_ctx);
409   GSF_block_ctx = NULL;
410   GNUNET_CONFIGURATION_destroy (block_cfg);
411   block_cfg = NULL;
412   GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO);
413   GSF_stats = NULL;
414   if (GNUNET_SCHEDULER_NO_TASK != cover_age_task)
415     {
416       GNUNET_SCHEDULER_cancel (cover_age_task);
417       cover_age_task = GNUNET_SCHEDULER_NO_TASK;
418     }
419   GNUNET_FS_indexing_done ();
420   GNUNET_LOAD_value_free (datastore_get_load);
421   datastore_get_load = NULL;
422   GNUNET_LOAD_value_free (GSF_rt_entry_lifetime);
423   GSF_rt_entry_lifetime = NULL;
424 }
425
426
427 /**
428  * Function called for each pending request whenever a new
429  * peer connects, giving us a chance to decide about submitting
430  * the existing request to the new peer.
431  *
432  * @param cls the 'struct GSF_ConnectedPeer' of the new peer
433  * @param key query for the request
434  * @param pr handle to the pending request
435  * @return GNUNET_YES to continue to iterate
436  */
437 static int
438 consider_peer_for_forwarding (void *cls,
439                               const GNUNET_HashCode *key,
440                               struct GSF_PendingRequest *pr)
441 {
442   struct GSF_ConnectedPeer *cp = cls;
443   
444   GSF_plan_add_ (cp, pr);
445   return GNUNET_YES;
446 }
447
448
449 /**
450  * Method called whenever a given peer connects.
451  *
452  * @param cls closure, not used
453  * @param peer peer identity this notification is about
454  * @param atsi performance information
455  */
456 static void 
457 peer_connect_handler (void *cls,
458                       const struct GNUNET_PeerIdentity *peer,
459                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
460 {
461   struct GSF_ConnectedPeer *cp;
462
463   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
464     return;
465    cp = GSF_peer_connect_handler_ (peer, atsi);
466   if (NULL == cp)
467     return;
468   GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
469                                  cp);
470 }
471
472
473 /**
474  * Function called after GNUNET_CORE_connect has succeeded
475  * (or failed for good).  Note that the private key of the
476  * peer is intentionally not exposed here; if you need it,
477  * your process should try to read the private key file
478  * directly (which should work if you are authorized...).
479  *
480  * @param cls closure
481  * @param server handle to the server, NULL if we failed
482  * @param my_identity ID of this peer, NULL if we failed
483  * @param publicKey public key of this peer, NULL if we failed
484  */
485 static void
486 peer_init_handler (void *cls,
487                    struct GNUNET_CORE_Handle * server,
488                    const struct GNUNET_PeerIdentity *
489                    my_identity,
490                    const struct
491                    GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
492                    publicKey)
493 {
494   my_id = *my_identity;
495 }
496
497
498 /**
499  * Process fs requests.
500  *
501  * @param server the initialized server
502  * @param c configuration to use
503  */
504 static int
505 main_init (struct GNUNET_SERVER_Handle *server,
506            const struct GNUNET_CONFIGURATION_Handle *c)
507 {
508   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
509     {
510       { &handle_p2p_get, 
511         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
512       { &handle_p2p_put, 
513         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
514       { &GSF_handle_p2p_migration_stop_, 
515         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
516         sizeof (struct MigrationStopMessage) },
517       { NULL, 0, 0 }
518     };
519   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
520     {&GNUNET_FS_handle_index_start, NULL, 
521      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
522     {&GNUNET_FS_handle_index_list_get, NULL, 
523      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
524     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
525      sizeof (struct UnindexMessage) },
526     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
527      0 },
528     {NULL, NULL, 0, 0}
529   };
530
531   GSF_core = GNUNET_CORE_connect (GSF_cfg,
532                                   2, /* larger? */
533                                   NULL,
534                                   &peer_init_handler,
535                                   &peer_connect_handler,
536                                   &GSF_peer_disconnect_handler_,
537                                   &GSF_peer_status_handler_,
538                                   NULL, GNUNET_NO,
539                                   NULL, GNUNET_NO,
540                                   p2p_handlers);
541   if (NULL == GSF_core)
542     {
543       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
544                   _("Failed to connect to `%s' service.\n"),
545                   "core");
546       return GNUNET_SYSERR;
547     }
548   GNUNET_SERVER_disconnect_notify (server, 
549                                    &GSF_client_disconnect_handler_,
550                                    NULL);
551   GNUNET_SERVER_add_handlers (server, handlers);
552   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
553                                                  &age_cover_counters,
554                                                  NULL);
555   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
556   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
557                                 &shutdown_task,
558                                 NULL);
559   return GNUNET_OK;
560 }
561
562
563 /**
564  * Process fs requests.
565  *
566  * @param cls closure
567  * @param server the initialized server
568  * @param cfg configuration to use
569  */
570 static void
571 run (void *cls,
572      struct GNUNET_SERVER_Handle *server,
573      const struct GNUNET_CONFIGURATION_Handle *cfg)
574 {
575   GSF_cfg = cfg;
576   GSF_enable_randomized_delays = GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY");
577   GSF_dsh = GNUNET_DATASTORE_connect (cfg);
578   if (NULL == GSF_dsh)
579     {
580       GNUNET_SCHEDULER_shutdown ();
581       return;
582     }
583   GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
584   GSF_stats = GNUNET_STATISTICS_create ("fs", cfg);
585   block_cfg = GNUNET_CONFIGURATION_create ();
586   GNUNET_CONFIGURATION_set_value_string (block_cfg,
587                                          "block",
588                                          "PLUGINS",
589                                          "fs");
590   GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg);
591   GNUNET_assert (NULL != GSF_block_ctx);
592   GSF_dht = GNUNET_DHT_connect (cfg,
593                                 FS_DHT_HT_SIZE);
594   GSF_plan_init ();
595   GSF_pending_request_init_ ();
596   GSF_connected_peer_init_ ();
597   GSF_push_init_ ();
598   GSF_put_init_ ();
599   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) ||
600        
601        (GNUNET_OK != main_init (server, cfg)) )
602     {    
603       GNUNET_SCHEDULER_shutdown ();
604       shutdown_task (NULL, NULL);
605       return;   
606     }
607 }
608
609
610 /**
611  * The main function for the fs service.
612  *
613  * @param argc number of arguments from the command line
614  * @param argv command line arguments
615  * @return 0 ok, 1 on error
616  */
617 int
618 main (int argc, char *const *argv)
619 {
620   return (GNUNET_OK ==
621           GNUNET_SERVICE_run (argc,
622                               argv,
623                               "fs",
624                               GNUNET_SERVICE_OPTION_NONE,
625                               &run, NULL)) ? 0 : 1;
626 }
627
628 /* end of gnunet-service-fs.c */