src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for migratable data.
106  */
107 struct MigrationReadyPeer
108 {
109   /**
110    * This is a doubly-linked list.
111    */
112   struct MigrationReadyPeer *next;
113
114   /**
115    * This is a doubly-linked list.
116    */
117   struct MigrationReadyPeer *prev;
118
119   /**
120    * Handle to peer.
121    */
122   struct GSF_ConnectedPeer *peer;
123
124   /**
125    * Envelope of the currently pushed message.
126    */
127   struct GNUNET_MQ_Envelope *env;
128 };
129
130
131 /**
132  * Head of linked list of blocks that can be migrated.
133  */
134 static struct MigrationReadyBlock *mig_head;
135
136 /**
137  * Tail of linked list of blocks that can be migrated.
138  */
139 static struct MigrationReadyBlock *mig_tail;
140
141 /**
142  * Head of linked list of peers.
143  */
144 static struct MigrationReadyPeer *peer_head;
145
146 /**
147  * Tail of linked list of peers.
148  */
149 static struct MigrationReadyPeer *peer_tail;
150
151 /**
152  * Request to datastore for migration (or NULL).
153  */
154 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
155
156 /**
157  * ID of task that collects blocks for migration.
158  */
159 static struct GNUNET_SCHEDULER_Task *mig_task;
160
161 /**
162  * What is the maximum frequency at which we are allowed to
163  * poll the datastore for migration content?
164  */
165 static struct GNUNET_TIME_Relative min_migration_delay;
166
167 /**
168  * Size of the doubly-linked list of migration blocks.
169  */
170 static unsigned int mig_size;
171
172 /**
173  * Is this module enabled?
174  */
175 static int enabled;
176
177 /**
178  * Did we find anything in the datastore?
179  */
180 static int value_found;
181
182
183 /**
184  * Delete the given migration block.
185  *
186  * @param mb block to delete
187  */
188 static void
189 delete_migration_block (struct MigrationReadyBlock *mb)
190 {
191   GNUNET_CONTAINER_DLL_remove (mig_head,
192                                mig_tail,
193                                mb);
194   GNUNET_PEER_decrement_rcs (mb->target_list,
195                              MIGRATION_LIST_SIZE);
196   mig_size--;
197   GNUNET_free (mb);
198 }
199
200
201 /**
202  * Find content for migration to this peer.
203  *
204  * @param cls a `struct MigrationReadyPeer *`
205  */
206 static void
207 find_content (void *cls);
208
209
210 /**
211  * Send the given block to the given peer.
212  *
213  * @param peer target peer
214  * @param block the block
215  * @return #GNUNET_YES if the block was deleted (!)
216  */
217 static int
218 transmit_content (struct MigrationReadyPeer *mrp,
219                   struct MigrationReadyBlock *block)
220 {
221   struct PutMessage *msg;
222   unsigned int i;
223   struct GSF_PeerPerformanceData *ppd;
224   int ret;
225
226   ppd = GSF_get_peer_performance_data_ (mrp->peer);
227   GNUNET_assert (NULL == mrp->env);
228   mrp->env = GNUNET_MQ_msg_extra (msg,
229                                   block->size,
230                                   GNUNET_MESSAGE_TYPE_FS_PUT);
231   msg->type = htonl (block->type);
232   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
233   GNUNET_memcpy (&msg[1],
234                  &block[1],
235                  block->size);
236   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
237   {
238     if (block->target_list[i] == 0)
239     {
240       block->target_list[i] = ppd->pid;
241       GNUNET_PEER_change_rc (block->target_list[i],
242                              1);
243       break;
244     }
245   }
246   if (MIGRATION_LIST_SIZE == i)
247   {
248     delete_migration_block (block);
249     ret = GNUNET_YES;
250   }
251   else
252   {
253     ret = GNUNET_NO;
254   }
255   GNUNET_MQ_notify_sent (mrp->env,
256                          &find_content,
257                          mrp);
258   GSF_peer_transmit_ (mrp->peer,
259                       GNUNET_NO,
260                       0 /* priority */ ,
261                       mrp->env);
262   return ret;
263 }
264
265
266 /**
267  * Count the number of peers this block has
268  * already been forwarded to.
269  *
270  * @param block the block
271  * @return number of times block was forwarded
272  */
273 static unsigned int
274 count_targets (struct MigrationReadyBlock *block)
275 {
276   unsigned int i;
277
278   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279     if (block->target_list[i] == 0)
280       return i;
281   return i;
282 }
283
284
285 /**
286  * Check if sending this block to this peer would
287  * be a good idea.
288  *
289  * @param mrp target peer
290  * @param block the block
291  * @return score (>= 0: feasible, negative: infeasible)
292  */
293 static long
294 score_content (struct MigrationReadyPeer *mrp,
295                struct MigrationReadyBlock *block)
296 {
297   unsigned int i;
298   struct GSF_PeerPerformanceData *ppd;
299   struct GNUNET_PeerIdentity id;
300   struct GNUNET_HashCode hc;
301   uint32_t dist;
302
303   ppd = GSF_get_peer_performance_data_ (mrp->peer);
304   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
305     if (block->target_list[i] == ppd->pid)
306       return -1;
307   GNUNET_assert (0 != ppd->pid);
308   GNUNET_PEER_resolve (ppd->pid,
309                        &id);
310   GNUNET_CRYPTO_hash (&id,
311                       sizeof (struct GNUNET_PeerIdentity),
312                       &hc);
313   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
314                                           &hc);
315   /* closer distance, higher score: */
316   return UINT32_MAX - dist;
317 }
318
319
320 /**
321  * If the migration task is not currently running, consider
322  * (re)scheduling it with the appropriate delay.
323  */
324 static void
325 consider_gathering (void);
326
327
328 /**
329  * Find content for migration to this peer.
330  *
331  * @param cls peer to find content for
332  */
333 static void
334 find_content (void *cls)
335 {
336   struct MigrationReadyPeer *mrp = cls;
337   struct MigrationReadyBlock *pos;
338   long score;
339   long best_score;
340   struct MigrationReadyBlock *best;
341
342   mrp->env = NULL;
343   best = NULL;
344   best_score = -1;
345   pos = mig_head;
346   while (NULL != pos)
347   {
348     score = score_content (mrp, pos);
349     if (score > best_score)
350     {
351       best_score = score;
352       best = pos;
353     }
354     pos = pos->next;
355   }
356   if (NULL == best)
357   {
358     if (mig_size < MAX_MIGRATION_QUEUE)
359     {
360       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361                   "No content found for pushing, waiting for queue to fill\n");
362       return;                   /* will fill up eventually... */
363     }
364     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365                 "No suitable content found, purging content from full queue\n");
366     /* failed to find migration target AND
367      * queue is full, purge most-forwarded
368      * block from queue to make room for more */
369     pos = mig_head;
370     while (NULL != pos)
371     {
372       score = count_targets (pos);
373       if (score >= best_score)
374       {
375         best_score = score;
376         best = pos;
377       }
378       pos = pos->next;
379     }
380     GNUNET_assert (NULL != best);
381     delete_migration_block (best);
382     consider_gathering ();
383     return;
384   }
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "Preparing to push best content to peer\n");
387   transmit_content (mrp,
388                     best);
389 }
390
391
392 /**
393  * Task that is run periodically to obtain blocks for content
394  * migration
395  *
396  * @param cls unused
397  */
398 static void
399 gather_migration_blocks (void *cls);
400
401
402 /**
403  * If the migration task is not currently running, consider
404  * (re)scheduling it with the appropriate delay.
405  */
406 static void
407 consider_gathering ()
408 {
409   struct GNUNET_TIME_Relative delay;
410
411   if (NULL == GSF_dsh)
412     return;
413   if (NULL != mig_qe)
414     return;
415   if (NULL != mig_task)
416     return;
417   if (mig_size >= MAX_MIGRATION_QUEUE)
418     return;
419   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
420                                          mig_size);
421   delay = GNUNET_TIME_relative_divide (delay,
422                                        MAX_MIGRATION_QUEUE);
423   delay = GNUNET_TIME_relative_max (delay,
424                                     min_migration_delay);
425   if (GNUNET_NO == value_found)
426   {
427     /* wait at least 5s if the datastore is empty */
428     delay = GNUNET_TIME_relative_max (delay,
429                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
430                                                                      5));
431   }
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "Scheduling gathering task (queue size: %u)\n",
434               mig_size);
435   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
436                                            &gather_migration_blocks,
437                                            NULL);
438 }
439
440
441 /**
442  * Process content offered for migration.
443  *
444  * @param cls closure
445  * @param key key for the content
446  * @param size number of bytes in data
447  * @param data content stored
448  * @param type type of the content
449  * @param priority priority of the content
450  * @param anonymity anonymity-level for the content
451  * @param replication replication-level for the content
452  * @param expiration expiration time for the content
453  * @param uid unique identifier for the datum;
454  *        maybe 0 if no unique identifier is available
455  */
456 static void
457 process_migration_content (void *cls,
458                            const struct GNUNET_HashCode *key,
459                            size_t size,
460                            const void *data,
461                            enum GNUNET_BLOCK_Type type,
462                            uint32_t priority,
463                            uint32_t anonymity,
464                            uint32_t replication,
465                            struct GNUNET_TIME_Absolute expiration,
466                            uint64_t uid)
467 {
468   struct MigrationReadyBlock *mb;
469   struct MigrationReadyPeer *pos;
470
471   mig_qe = NULL;
472   if (NULL == key)
473   {
474     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475                 "No content found for migration...\n");
476     consider_gathering ();
477     return;
478   }
479   value_found = GNUNET_YES;
480   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
481       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
482   {
483     /* content will expire soon, don't bother */
484     consider_gathering ();
485     return;
486   }
487   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
488   {
489     if (GNUNET_OK !=
490         GNUNET_FS_handle_on_demand_block (key,
491                                           size,
492                                           data,
493                                           type,
494                                           priority,
495                                           anonymity,
496                                           replication,
497                                           expiration,
498                                           uid,
499                                           &process_migration_content,
500                                           NULL))
501       consider_gathering ();
502     return;
503   }
504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
505               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
506               GNUNET_h2s (key),
507               type, mig_size + 1,
508               MAX_MIGRATION_QUEUE);
509   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
510   mb->query = *key;
511   mb->expiration = expiration;
512   mb->size = size;
513   mb->type = type;
514   GNUNET_memcpy (&mb[1], data, size);
515   GNUNET_CONTAINER_DLL_insert_after (mig_head,
516                                      mig_tail,
517                                      mig_tail,
518                                      mb);
519   mig_size++;
520   for (pos = peer_head; NULL != pos; pos = pos->next)
521   {
522     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523                 "Preparing to push best content to peer %s\n",
524                 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
525     if ( (NULL == pos->env) &&
526          (GNUNET_YES == transmit_content (pos,
527                                           mb)) ) {
528       break;                  /* 'mb' was freed! */
529     }
530   }
531   consider_gathering ();
532 }
533
534
535 /**
536  * Task that is run periodically to obtain blocks for content
537  * migration
538  *
539  * @param cls unused
540  */
541 static void
542 gather_migration_blocks (void *cls)
543 {
544   mig_task = NULL;
545   if (mig_size >= MAX_MIGRATION_QUEUE)
546     return;
547   if (NULL == GSF_dsh)
548     return;
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "Asking datastore for content for replication (queue size: %u)\n",
551               mig_size);
552   value_found = GNUNET_NO;
553   mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
554                                                  0,
555                                                  UINT_MAX,
556                                                  &process_migration_content,
557                                                  NULL);
558   if (NULL == mig_qe)
559     consider_gathering ();
560 }
561
562
563 /**
564  * A peer connected to us.  Start pushing content
565  * to this peer.
566  *
567  * @param peer handle for the peer that connected
568  */
569 void
570 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
571 {
572   struct MigrationReadyPeer *mrp;
573
574   if (GNUNET_YES != enabled)
575     return;
576   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
577     if (mrp->peer == peer)
578       break;
579   if (NULL != mrp)
580   {
581     /* same peer added twice, must not happen */
582     GNUNET_break (0);
583     return;
584   }
585
586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587               "Adding peer %s to list for pushing\n",
588               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
589
590   mrp = GNUNET_new (struct MigrationReadyPeer);
591   mrp->peer = peer;
592   find_content (mrp);
593   GNUNET_CONTAINER_DLL_insert (peer_head,
594                                peer_tail,
595                                mrp);
596 }
597
598
599 /**
600  * A peer disconnected from us.  Stop pushing content
601  * to this peer.
602  *
603  * @param peer handle for the peer that disconnected
604  */
605 void
606 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
607 {
608   struct MigrationReadyPeer *pos;
609
610   for (pos = peer_head; NULL != pos; pos = pos->next)
611     if (pos->peer == peer)
612       break;
613   if (NULL == pos)
614     return;
615   if (NULL != pos->env)
616     GNUNET_MQ_send_cancel (pos->env);
617   GNUNET_CONTAINER_DLL_remove (peer_head,
618                                peer_tail,
619                                pos);
620   GNUNET_free (pos);
621 }
622
623
624 /**
625  * Setup the module.
626  */
627 void
628 GSF_push_init_ ()
629 {
630   enabled =
631     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
632                                           "FS",
633                                           "CONTENT_PUSHING");
634   if (GNUNET_YES != enabled)
635     return;
636
637   if (GNUNET_OK !=
638       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
639                                            "fs",
640                                            "MIN_MIGRATION_DELAY",
641                                            &min_migration_delay))
642   {
643     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
644                                "fs",
645                                "MIN_MIGRATION_DELAY",
646                                _("time required, content pushing disabled"));
647     return;
648   }
649   consider_gathering ();
650 }
651
652
653 /**
654  * Shutdown the module.
655  */
656 void
657 GSF_push_done_ ()
658 {
659   if (NULL != mig_task)
660   {
661     GNUNET_SCHEDULER_cancel (mig_task);
662     mig_task = NULL;
663   }
664   if (NULL != mig_qe)
665   {
666     GNUNET_DATASTORE_cancel (mig_qe);
667     mig_qe = NULL;
668   }
669   while (NULL != mig_head)
670     delete_migration_block (mig_head);
671   GNUNET_assert (0 == mig_size);
672 }
673
674 /* end of gnunet-service-fs_push.c */