error handling
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply ( \
53     GNUNET_TIME_UNIT_MINUTES, 30)
54
55
56 /**
57  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
58  */
59 struct MigrationReadyBlock
60 {
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for migratable data.
106  */
107 struct MigrationReadyPeer
108 {
109   /**
110    * This is a doubly-linked list.
111    */
112   struct MigrationReadyPeer *next;
113
114   /**
115    * This is a doubly-linked list.
116    */
117   struct MigrationReadyPeer *prev;
118
119   /**
120    * Handle to peer.
121    */
122   struct GSF_ConnectedPeer *peer;
123
124   /**
125    * Envelope of the currently pushed message.
126    */
127   struct GNUNET_MQ_Envelope *env;
128 };
129
130
131 /**
132  * Head of linked list of blocks that can be migrated.
133  */
134 static struct MigrationReadyBlock *mig_head;
135
136 /**
137  * Tail of linked list of blocks that can be migrated.
138  */
139 static struct MigrationReadyBlock *mig_tail;
140
141 /**
142  * Head of linked list of peers.
143  */
144 static struct MigrationReadyPeer *peer_head;
145
146 /**
147  * Tail of linked list of peers.
148  */
149 static struct MigrationReadyPeer *peer_tail;
150
151 /**
152  * Request to datastore for migration (or NULL).
153  */
154 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
155
156 /**
157  * ID of task that collects blocks for migration.
158  */
159 static struct GNUNET_SCHEDULER_Task *mig_task;
160
161 /**
162  * What is the maximum frequency at which we are allowed to
163  * poll the datastore for migration content?
164  */
165 static struct GNUNET_TIME_Relative min_migration_delay;
166
167 /**
168  * Size of the doubly-linked list of migration blocks.
169  */
170 static unsigned int mig_size;
171
172 /**
173  * Is this module enabled?
174  */
175 static int enabled;
176
177 /**
178  * Did we find anything in the datastore?
179  */
180 static int value_found;
181
182
183 /**
184  * Delete the given migration block.
185  *
186  * @param mb block to delete
187  */
188 static void
189 delete_migration_block (struct MigrationReadyBlock *mb)
190 {
191   GNUNET_CONTAINER_DLL_remove (mig_head,
192                                mig_tail,
193                                mb);
194   GNUNET_PEER_decrement_rcs (mb->target_list,
195                              MIGRATION_LIST_SIZE);
196   mig_size--;
197   GNUNET_free (mb);
198 }
199
200
201 /**
202  * Find content for migration to this peer.
203  *
204  * @param cls a `struct MigrationReadyPeer *`
205  */
206 static void
207 find_content (void *cls);
208
209
210 /**
211  * Send the given block to the given peer.
212  *
213  * @param peer target peer
214  * @param block the block
215  * @return #GNUNET_YES if the block was deleted (!)
216  */
217 static int
218 transmit_content (struct MigrationReadyPeer *mrp,
219                   struct MigrationReadyBlock *block)
220 {
221   struct PutMessage *msg;
222   unsigned int i;
223   struct GSF_PeerPerformanceData *ppd;
224   int ret;
225
226   ppd = GSF_get_peer_performance_data_ (mrp->peer);
227   GNUNET_assert (NULL == mrp->env);
228   mrp->env = GNUNET_MQ_msg_extra (msg,
229                                   block->size,
230                                   GNUNET_MESSAGE_TYPE_FS_PUT);
231   msg->type = htonl (block->type);
232   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
233   GNUNET_memcpy (&msg[1],
234                  &block[1],
235                  block->size);
236   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
237   {
238     if (block->target_list[i] == 0)
239     {
240       block->target_list[i] = ppd->pid;
241       GNUNET_PEER_change_rc (block->target_list[i],
242                              1);
243       break;
244     }
245   }
246   if (MIGRATION_LIST_SIZE == i)
247   {
248     delete_migration_block (block);
249     ret = GNUNET_YES;
250   }
251   else
252   {
253     ret = GNUNET_NO;
254   }
255   GNUNET_MQ_notify_sent (mrp->env,
256                          &find_content,
257                          mrp);
258   GSF_peer_transmit_ (mrp->peer,
259                       GNUNET_NO,
260                       0 /* priority */,
261                       mrp->env);
262   return ret;
263 }
264
265
266 /**
267  * Count the number of peers this block has
268  * already been forwarded to.
269  *
270  * @param block the block
271  * @return number of times block was forwarded
272  */
273 static unsigned int
274 count_targets (struct MigrationReadyBlock *block)
275 {
276   unsigned int i;
277
278   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279     if (block->target_list[i] == 0)
280       return i;
281   return i;
282 }
283
284
285 /**
286  * Check if sending this block to this peer would
287  * be a good idea.
288  *
289  * @param mrp target peer
290  * @param block the block
291  * @return score (>= 0: feasible, negative: infeasible)
292  */
293 static long
294 score_content (struct MigrationReadyPeer *mrp,
295                struct MigrationReadyBlock *block)
296 {
297   unsigned int i;
298   struct GSF_PeerPerformanceData *ppd;
299   struct GNUNET_PeerIdentity id;
300   struct GNUNET_HashCode hc;
301   uint32_t dist;
302
303   ppd = GSF_get_peer_performance_data_ (mrp->peer);
304   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
305     if (block->target_list[i] == ppd->pid)
306       return -1;
307   GNUNET_assert (0 != ppd->pid);
308   GNUNET_PEER_resolve (ppd->pid,
309                        &id);
310   GNUNET_CRYPTO_hash (&id,
311                       sizeof(struct GNUNET_PeerIdentity),
312                       &hc);
313   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
314                                           &hc);
315   /* closer distance, higher score: */
316   return UINT32_MAX - dist;
317 }
318
319
320 /**
321  * If the migration task is not currently running, consider
322  * (re)scheduling it with the appropriate delay.
323  */
324 static void
325 consider_gathering (void);
326
327
328 /**
329  * Find content for migration to this peer.
330  *
331  * @param cls peer to find content for
332  */
333 static void
334 find_content (void *cls)
335 {
336   struct MigrationReadyPeer *mrp = cls;
337   struct MigrationReadyBlock *pos;
338   long score;
339   long best_score;
340   struct MigrationReadyBlock *best;
341
342   mrp->env = NULL;
343   best = NULL;
344   best_score = -1;
345   pos = mig_head;
346   while (NULL != pos)
347   {
348     score = score_content (mrp, pos);
349     if (score > best_score)
350     {
351       best_score = score;
352       best = pos;
353     }
354     pos = pos->next;
355   }
356   if (NULL == best)
357   {
358     if (mig_size < MAX_MIGRATION_QUEUE)
359     {
360       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361                   "No content found for pushing, waiting for queue to fill\n");
362       return;                   /* will fill up eventually... */
363     }
364     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365                 "No suitable content found, purging content from full queue\n");
366     /* failed to find migration target AND
367      * queue is full, purge most-forwarded
368      * block from queue to make room for more */
369     pos = mig_head;
370     while (NULL != pos)
371     {
372       score = count_targets (pos);
373       if (score >= best_score)
374       {
375         best_score = score;
376         best = pos;
377       }
378       pos = pos->next;
379     }
380     GNUNET_assert (NULL != best);
381     delete_migration_block (best);
382     consider_gathering ();
383     return;
384   }
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "Preparing to push best content to peer\n");
387   transmit_content (mrp,
388                     best);
389 }
390
391
392 /**
393  * Task that is run periodically to obtain blocks for content
394  * migration
395  *
396  * @param cls unused
397  */
398 static void
399 gather_migration_blocks (void *cls);
400
401
402 /**
403  * If the migration task is not currently running, consider
404  * (re)scheduling it with the appropriate delay.
405  */
406 static void
407 consider_gathering ()
408 {
409   struct GNUNET_TIME_Relative delay;
410
411   if (NULL == GSF_dsh)
412     return;
413   if (NULL != mig_qe)
414     return;
415   if (NULL != mig_task)
416     return;
417   if (mig_size >= MAX_MIGRATION_QUEUE)
418     return;
419   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
420                                          mig_size);
421   delay = GNUNET_TIME_relative_divide (delay,
422                                        MAX_MIGRATION_QUEUE);
423   delay = GNUNET_TIME_relative_max (delay,
424                                     min_migration_delay);
425   if (GNUNET_NO == value_found)
426   {
427     /* wait at least 5s if the datastore is empty */
428     delay = GNUNET_TIME_relative_max (delay,
429                                       GNUNET_TIME_relative_multiply (
430                                         GNUNET_TIME_UNIT_SECONDS,
431                                         5));
432   }
433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
434               "Scheduling gathering task (queue size: %u)\n",
435               mig_size);
436   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
437                                            &gather_migration_blocks,
438                                            NULL);
439 }
440
441
442 /**
443  * Process content offered for migration.
444  *
445  * @param cls closure
446  * @param key key for the content
447  * @param size number of bytes in data
448  * @param data content stored
449  * @param type type of the content
450  * @param priority priority of the content
451  * @param anonymity anonymity-level for the content
452  * @param replication replication-level for the content
453  * @param expiration expiration time for the content
454  * @param uid unique identifier for the datum;
455  *        maybe 0 if no unique identifier is available
456  */
457 static void
458 process_migration_content (void *cls,
459                            const struct GNUNET_HashCode *key,
460                            size_t size,
461                            const void *data,
462                            enum GNUNET_BLOCK_Type type,
463                            uint32_t priority,
464                            uint32_t anonymity,
465                            uint32_t replication,
466                            struct GNUNET_TIME_Absolute expiration,
467                            uint64_t uid)
468 {
469   struct MigrationReadyBlock *mb;
470   struct MigrationReadyPeer *pos;
471
472   mig_qe = NULL;
473   if (NULL == key)
474   {
475     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476                 "No content found for migration...\n");
477     consider_gathering ();
478     return;
479   }
480   value_found = GNUNET_YES;
481   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
482       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
483   {
484     /* content will expire soon, don't bother */
485     consider_gathering ();
486     return;
487   }
488   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
489   {
490     if (GNUNET_OK !=
491         GNUNET_FS_handle_on_demand_block (key,
492                                           size,
493                                           data,
494                                           type,
495                                           priority,
496                                           anonymity,
497                                           replication,
498                                           expiration,
499                                           uid,
500                                           &process_migration_content,
501                                           NULL))
502       consider_gathering ();
503     return;
504   }
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
507               GNUNET_h2s (key),
508               type, mig_size + 1,
509               MAX_MIGRATION_QUEUE);
510   mb = GNUNET_malloc (sizeof(struct MigrationReadyBlock) + size);
511   mb->query = *key;
512   mb->expiration = expiration;
513   mb->size = size;
514   mb->type = type;
515   GNUNET_memcpy (&mb[1], data, size);
516   GNUNET_CONTAINER_DLL_insert_after (mig_head,
517                                      mig_tail,
518                                      mig_tail,
519                                      mb);
520   mig_size++;
521   for (pos = peer_head; NULL != pos; pos = pos->next)
522   {
523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                 "Preparing to push best content to peer %s\n",
525                 GNUNET_i2s (GSF_connected_peer_get_identity2_ (pos->peer)));
526     if ((NULL == pos->env) &&
527         (GNUNET_YES == transmit_content (pos,
528                                          mb)))
529     {
530       break;                  /* 'mb' was freed! */
531     }
532   }
533   consider_gathering ();
534 }
535
536
537 /**
538  * Task that is run periodically to obtain blocks for content
539  * migration
540  *
541  * @param cls unused
542  */
543 static void
544 gather_migration_blocks (void *cls)
545 {
546   mig_task = NULL;
547   if (mig_size >= MAX_MIGRATION_QUEUE)
548     return;
549   if (NULL == GSF_dsh)
550     return;
551   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
552               "Asking datastore for content for replication (queue size: %u)\n",
553               mig_size);
554   value_found = GNUNET_NO;
555   mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
556                                                  0,
557                                                  UINT_MAX,
558                                                  &process_migration_content,
559                                                  NULL);
560   if (NULL == mig_qe)
561     consider_gathering ();
562 }
563
564
565 /**
566  * A peer connected to us.  Start pushing content
567  * to this peer.
568  *
569  * @param peer handle for the peer that connected
570  */
571 void
572 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
573 {
574   struct MigrationReadyPeer *mrp;
575
576   if (GNUNET_YES != enabled)
577     return;
578   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
579     if (mrp->peer == peer)
580       break;
581   if (NULL != mrp)
582   {
583     /* same peer added twice, must not happen */
584     GNUNET_break (0);
585     return;
586   }
587
588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589               "Adding peer %s to list for pushing\n",
590               GNUNET_i2s (GSF_connected_peer_get_identity2_ (peer)));
591
592   mrp = GNUNET_new (struct MigrationReadyPeer);
593   mrp->peer = peer;
594   find_content (mrp);
595   GNUNET_CONTAINER_DLL_insert (peer_head,
596                                peer_tail,
597                                mrp);
598 }
599
600
601 /**
602  * A peer disconnected from us.  Stop pushing content
603  * to this peer.
604  *
605  * @param peer handle for the peer that disconnected
606  */
607 void
608 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
609 {
610   struct MigrationReadyPeer *pos;
611
612   for (pos = peer_head; NULL != pos; pos = pos->next)
613     if (pos->peer == peer)
614       break;
615   if (NULL == pos)
616     return;
617   if (NULL != pos->env)
618     GNUNET_MQ_send_cancel (pos->env);
619   GNUNET_CONTAINER_DLL_remove (peer_head,
620                                peer_tail,
621                                pos);
622   GNUNET_free (pos);
623 }
624
625
626 /**
627  * Setup the module.
628  */
629 void
630 GSF_push_init_ ()
631 {
632   enabled =
633     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
634                                           "FS",
635                                           "CONTENT_PUSHING");
636   if (GNUNET_YES != enabled)
637     return;
638
639   if (GNUNET_OK !=
640       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
641                                            "fs",
642                                            "MIN_MIGRATION_DELAY",
643                                            &min_migration_delay))
644   {
645     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
646                                "fs",
647                                "MIN_MIGRATION_DELAY",
648                                _ ("time required, content pushing disabled"));
649     return;
650   }
651   consider_gathering ();
652 }
653
654
655 /**
656  * Shutdown the module.
657  */
658 void
659 GSF_push_done_ ()
660 {
661   if (NULL != mig_task)
662   {
663     GNUNET_SCHEDULER_cancel (mig_task);
664     mig_task = NULL;
665   }
666   if (NULL != mig_qe)
667   {
668     GNUNET_DATASTORE_cancel (mig_qe);
669     mig_qe = NULL;
670   }
671   while (NULL != mig_head)
672     delete_migration_block (mig_head);
673   GNUNET_assert (0 == mig_size);
674 }
675
676
677 /* end of gnunet-service-fs_push.c */