uncrustify as demanded.
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock {
59   /**
60    * This is a doubly-linked list.
61    */
62   struct MigrationReadyBlock *next;
63
64   /**
65    * This is a doubly-linked list.
66    */
67   struct MigrationReadyBlock *prev;
68
69   /**
70    * Query for the block.
71    */
72   struct GNUNET_HashCode query;
73
74   /**
75    * When does this block expire?
76    */
77   struct GNUNET_TIME_Absolute expiration;
78
79   /**
80    * Peers we already forwarded this
81    * block to.  Zero for empty entries.
82    */
83   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
84
85   /**
86    * Size of the block.
87    */
88   size_t size;
89
90   /**
91    *  Number of targets already used.
92    */
93   unsigned int used_targets;
94
95   /**
96    * Type of the block.
97    */
98   enum GNUNET_BLOCK_Type type;
99 };
100
101
102 /**
103  * Information about a peer waiting for migratable data.
104  */
105 struct MigrationReadyPeer {
106   /**
107    * This is a doubly-linked list.
108    */
109   struct MigrationReadyPeer *next;
110
111   /**
112    * This is a doubly-linked list.
113    */
114   struct MigrationReadyPeer *prev;
115
116   /**
117    * Handle to peer.
118    */
119   struct GSF_ConnectedPeer *peer;
120
121   /**
122    * Envelope of the currently pushed message.
123    */
124   struct GNUNET_MQ_Envelope *env;
125 };
126
127
128 /**
129  * Head of linked list of blocks that can be migrated.
130  */
131 static struct MigrationReadyBlock *mig_head;
132
133 /**
134  * Tail of linked list of blocks that can be migrated.
135  */
136 static struct MigrationReadyBlock *mig_tail;
137
138 /**
139  * Head of linked list of peers.
140  */
141 static struct MigrationReadyPeer *peer_head;
142
143 /**
144  * Tail of linked list of peers.
145  */
146 static struct MigrationReadyPeer *peer_tail;
147
148 /**
149  * Request to datastore for migration (or NULL).
150  */
151 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
152
153 /**
154  * ID of task that collects blocks for migration.
155  */
156 static struct GNUNET_SCHEDULER_Task *mig_task;
157
158 /**
159  * What is the maximum frequency at which we are allowed to
160  * poll the datastore for migration content?
161  */
162 static struct GNUNET_TIME_Relative min_migration_delay;
163
164 /**
165  * Size of the doubly-linked list of migration blocks.
166  */
167 static unsigned int mig_size;
168
169 /**
170  * Is this module enabled?
171  */
172 static int enabled;
173
174 /**
175  * Did we find anything in the datastore?
176  */
177 static int value_found;
178
179
180 /**
181  * Delete the given migration block.
182  *
183  * @param mb block to delete
184  */
185 static void
186 delete_migration_block(struct MigrationReadyBlock *mb)
187 {
188   GNUNET_CONTAINER_DLL_remove(mig_head,
189                               mig_tail,
190                               mb);
191   GNUNET_PEER_decrement_rcs(mb->target_list,
192                             MIGRATION_LIST_SIZE);
193   mig_size--;
194   GNUNET_free(mb);
195 }
196
197
198 /**
199  * Find content for migration to this peer.
200  *
201  * @param cls a `struct MigrationReadyPeer *`
202  */
203 static void
204 find_content(void *cls);
205
206
207 /**
208  * Send the given block to the given peer.
209  *
210  * @param peer target peer
211  * @param block the block
212  * @return #GNUNET_YES if the block was deleted (!)
213  */
214 static int
215 transmit_content(struct MigrationReadyPeer *mrp,
216                  struct MigrationReadyBlock *block)
217 {
218   struct PutMessage *msg;
219   unsigned int i;
220   struct GSF_PeerPerformanceData *ppd;
221   int ret;
222
223   ppd = GSF_get_peer_performance_data_(mrp->peer);
224   GNUNET_assert(NULL == mrp->env);
225   mrp->env = GNUNET_MQ_msg_extra(msg,
226                                  block->size,
227                                  GNUNET_MESSAGE_TYPE_FS_PUT);
228   msg->type = htonl(block->type);
229   msg->expiration = GNUNET_TIME_absolute_hton(block->expiration);
230   GNUNET_memcpy(&msg[1],
231                 &block[1],
232                 block->size);
233   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
234     {
235       if (block->target_list[i] == 0)
236         {
237           block->target_list[i] = ppd->pid;
238           GNUNET_PEER_change_rc(block->target_list[i],
239                                 1);
240           break;
241         }
242     }
243   if (MIGRATION_LIST_SIZE == i)
244     {
245       delete_migration_block(block);
246       ret = GNUNET_YES;
247     }
248   else
249     {
250       ret = GNUNET_NO;
251     }
252   GNUNET_MQ_notify_sent(mrp->env,
253                         &find_content,
254                         mrp);
255   GSF_peer_transmit_(mrp->peer,
256                      GNUNET_NO,
257                      0 /* priority */,
258                      mrp->env);
259   return ret;
260 }
261
262
263 /**
264  * Count the number of peers this block has
265  * already been forwarded to.
266  *
267  * @param block the block
268  * @return number of times block was forwarded
269  */
270 static unsigned int
271 count_targets(struct MigrationReadyBlock *block)
272 {
273   unsigned int i;
274
275   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
276     if (block->target_list[i] == 0)
277       return i;
278   return i;
279 }
280
281
282 /**
283  * Check if sending this block to this peer would
284  * be a good idea.
285  *
286  * @param mrp target peer
287  * @param block the block
288  * @return score (>= 0: feasible, negative: infeasible)
289  */
290 static long
291 score_content(struct MigrationReadyPeer *mrp,
292               struct MigrationReadyBlock *block)
293 {
294   unsigned int i;
295   struct GSF_PeerPerformanceData *ppd;
296   struct GNUNET_PeerIdentity id;
297   struct GNUNET_HashCode hc;
298   uint32_t dist;
299
300   ppd = GSF_get_peer_performance_data_(mrp->peer);
301   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
302     if (block->target_list[i] == ppd->pid)
303       return -1;
304   GNUNET_assert(0 != ppd->pid);
305   GNUNET_PEER_resolve(ppd->pid,
306                       &id);
307   GNUNET_CRYPTO_hash(&id,
308                      sizeof(struct GNUNET_PeerIdentity),
309                      &hc);
310   dist = GNUNET_CRYPTO_hash_distance_u32(&block->query,
311                                          &hc);
312   /* closer distance, higher score: */
313   return UINT32_MAX - dist;
314 }
315
316
317 /**
318  * If the migration task is not currently running, consider
319  * (re)scheduling it with the appropriate delay.
320  */
321 static void
322 consider_gathering(void);
323
324
325 /**
326  * Find content for migration to this peer.
327  *
328  * @param cls peer to find content for
329  */
330 static void
331 find_content(void *cls)
332 {
333   struct MigrationReadyPeer *mrp = cls;
334   struct MigrationReadyBlock *pos;
335   long score;
336   long best_score;
337   struct MigrationReadyBlock *best;
338
339   mrp->env = NULL;
340   best = NULL;
341   best_score = -1;
342   pos = mig_head;
343   while (NULL != pos)
344     {
345       score = score_content(mrp, pos);
346       if (score > best_score)
347         {
348           best_score = score;
349           best = pos;
350         }
351       pos = pos->next;
352     }
353   if (NULL == best)
354     {
355       if (mig_size < MAX_MIGRATION_QUEUE)
356         {
357           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
358                      "No content found for pushing, waiting for queue to fill\n");
359           return;               /* will fill up eventually... */
360         }
361       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
362                  "No suitable content found, purging content from full queue\n");
363       /* failed to find migration target AND
364        * queue is full, purge most-forwarded
365        * block from queue to make room for more */
366       pos = mig_head;
367       while (NULL != pos)
368         {
369           score = count_targets(pos);
370           if (score >= best_score)
371             {
372               best_score = score;
373               best = pos;
374             }
375           pos = pos->next;
376         }
377       GNUNET_assert(NULL != best);
378       delete_migration_block(best);
379       consider_gathering();
380       return;
381     }
382   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
383              "Preparing to push best content to peer\n");
384   transmit_content(mrp,
385                    best);
386 }
387
388
389 /**
390  * Task that is run periodically to obtain blocks for content
391  * migration
392  *
393  * @param cls unused
394  */
395 static void
396 gather_migration_blocks(void *cls);
397
398
399 /**
400  * If the migration task is not currently running, consider
401  * (re)scheduling it with the appropriate delay.
402  */
403 static void
404 consider_gathering()
405 {
406   struct GNUNET_TIME_Relative delay;
407
408   if (NULL == GSF_dsh)
409     return;
410   if (NULL != mig_qe)
411     return;
412   if (NULL != mig_task)
413     return;
414   if (mig_size >= MAX_MIGRATION_QUEUE)
415     return;
416   delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
417                                         mig_size);
418   delay = GNUNET_TIME_relative_divide(delay,
419                                       MAX_MIGRATION_QUEUE);
420   delay = GNUNET_TIME_relative_max(delay,
421                                    min_migration_delay);
422   if (GNUNET_NO == value_found)
423     {
424       /* wait at least 5s if the datastore is empty */
425       delay = GNUNET_TIME_relative_max(delay,
426                                        GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
427                                                                      5));
428     }
429   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
430              "Scheduling gathering task (queue size: %u)\n",
431              mig_size);
432   mig_task = GNUNET_SCHEDULER_add_delayed(delay,
433                                           &gather_migration_blocks,
434                                           NULL);
435 }
436
437
438 /**
439  * Process content offered for migration.
440  *
441  * @param cls closure
442  * @param key key for the content
443  * @param size number of bytes in data
444  * @param data content stored
445  * @param type type of the content
446  * @param priority priority of the content
447  * @param anonymity anonymity-level for the content
448  * @param replication replication-level for the content
449  * @param expiration expiration time for the content
450  * @param uid unique identifier for the datum;
451  *        maybe 0 if no unique identifier is available
452  */
453 static void
454 process_migration_content(void *cls,
455                           const struct GNUNET_HashCode *key,
456                           size_t size,
457                           const void *data,
458                           enum GNUNET_BLOCK_Type type,
459                           uint32_t priority,
460                           uint32_t anonymity,
461                           uint32_t replication,
462                           struct GNUNET_TIME_Absolute expiration,
463                           uint64_t uid)
464 {
465   struct MigrationReadyBlock *mb;
466   struct MigrationReadyPeer *pos;
467
468   mig_qe = NULL;
469   if (NULL == key)
470     {
471       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
472                  "No content found for migration...\n");
473       consider_gathering();
474       return;
475     }
476   value_found = GNUNET_YES;
477   if (GNUNET_TIME_absolute_get_remaining(expiration).rel_value_us <
478       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
479     {
480       /* content will expire soon, don't bother */
481       consider_gathering();
482       return;
483     }
484   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
485     {
486       if (GNUNET_OK !=
487           GNUNET_FS_handle_on_demand_block(key,
488                                            size,
489                                            data,
490                                            type,
491                                            priority,
492                                            anonymity,
493                                            replication,
494                                            expiration,
495                                            uid,
496                                            &process_migration_content,
497                                            NULL))
498         consider_gathering();
499       return;
500     }
501   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
502              "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
503              GNUNET_h2s(key),
504              type, mig_size + 1,
505              MAX_MIGRATION_QUEUE);
506   mb = GNUNET_malloc(sizeof(struct MigrationReadyBlock) + size);
507   mb->query = *key;
508   mb->expiration = expiration;
509   mb->size = size;
510   mb->type = type;
511   GNUNET_memcpy(&mb[1], data, size);
512   GNUNET_CONTAINER_DLL_insert_after(mig_head,
513                                     mig_tail,
514                                     mig_tail,
515                                     mb);
516   mig_size++;
517   for (pos = peer_head; NULL != pos; pos = pos->next)
518     {
519       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
520                  "Preparing to push best content to peer %s\n",
521                  GNUNET_i2s(GSF_connected_peer_get_identity2_(pos->peer)));
522       if ((NULL == pos->env) &&
523           (GNUNET_YES == transmit_content(pos,
524                                           mb)))
525         {
526           break;              /* 'mb' was freed! */
527         }
528     }
529   consider_gathering();
530 }
531
532
533 /**
534  * Task that is run periodically to obtain blocks for content
535  * migration
536  *
537  * @param cls unused
538  */
539 static void
540 gather_migration_blocks(void *cls)
541 {
542   mig_task = NULL;
543   if (mig_size >= MAX_MIGRATION_QUEUE)
544     return;
545   if (NULL == GSF_dsh)
546     return;
547   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
548              "Asking datastore for content for replication (queue size: %u)\n",
549              mig_size);
550   value_found = GNUNET_NO;
551   mig_qe = GNUNET_DATASTORE_get_for_replication(GSF_dsh,
552                                                 0,
553                                                 UINT_MAX,
554                                                 &process_migration_content,
555                                                 NULL);
556   if (NULL == mig_qe)
557     consider_gathering();
558 }
559
560
561 /**
562  * A peer connected to us.  Start pushing content
563  * to this peer.
564  *
565  * @param peer handle for the peer that connected
566  */
567 void
568 GSF_push_start_(struct GSF_ConnectedPeer *peer)
569 {
570   struct MigrationReadyPeer *mrp;
571
572   if (GNUNET_YES != enabled)
573     return;
574   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
575     if (mrp->peer == peer)
576       break;
577   if (NULL != mrp)
578     {
579       /* same peer added twice, must not happen */
580       GNUNET_break(0);
581       return;
582     }
583
584   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
585              "Adding peer %s to list for pushing\n",
586              GNUNET_i2s(GSF_connected_peer_get_identity2_(peer)));
587
588   mrp = GNUNET_new(struct MigrationReadyPeer);
589   mrp->peer = peer;
590   find_content(mrp);
591   GNUNET_CONTAINER_DLL_insert(peer_head,
592                               peer_tail,
593                               mrp);
594 }
595
596
597 /**
598  * A peer disconnected from us.  Stop pushing content
599  * to this peer.
600  *
601  * @param peer handle for the peer that disconnected
602  */
603 void
604 GSF_push_stop_(struct GSF_ConnectedPeer *peer)
605 {
606   struct MigrationReadyPeer *pos;
607
608   for (pos = peer_head; NULL != pos; pos = pos->next)
609     if (pos->peer == peer)
610       break;
611   if (NULL == pos)
612     return;
613   if (NULL != pos->env)
614     GNUNET_MQ_send_cancel(pos->env);
615   GNUNET_CONTAINER_DLL_remove(peer_head,
616                               peer_tail,
617                               pos);
618   GNUNET_free(pos);
619 }
620
621
622 /**
623  * Setup the module.
624  */
625 void
626 GSF_push_init_()
627 {
628   enabled =
629     GNUNET_CONFIGURATION_get_value_yesno(GSF_cfg,
630                                          "FS",
631                                          "CONTENT_PUSHING");
632   if (GNUNET_YES != enabled)
633     return;
634
635   if (GNUNET_OK !=
636       GNUNET_CONFIGURATION_get_value_time(GSF_cfg,
637                                           "fs",
638                                           "MIN_MIGRATION_DELAY",
639                                           &min_migration_delay))
640     {
641       GNUNET_log_config_invalid(GNUNET_ERROR_TYPE_WARNING,
642                                 "fs",
643                                 "MIN_MIGRATION_DELAY",
644                                 _("time required, content pushing disabled"));
645       return;
646     }
647   consider_gathering();
648 }
649
650
651 /**
652  * Shutdown the module.
653  */
654 void
655 GSF_push_done_()
656 {
657   if (NULL != mig_task)
658     {
659       GNUNET_SCHEDULER_cancel(mig_task);
660       mig_task = NULL;
661     }
662   if (NULL != mig_qe)
663     {
664       GNUNET_DATASTORE_cancel(mig_qe);
665       mig_qe = NULL;
666     }
667   while (NULL != mig_head)
668     delete_migration_block(mig_head);
669   GNUNET_assert(0 == mig_size);
670 }
671
672 /* end of gnunet-service-fs_push.c */