2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL-3.0-or-later
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
35 * Maximum number of blocks we keep in memory for migration.
37 #define MAX_MIGRATION_QUEUE 8
40 * Blocks are at most migrated to this number of peers
41 * plus one, each time they are fetched from the database.
43 #define MIGRATION_LIST_SIZE 2
46 * How long must content remain valid for us to consider it for migration?
47 * If content will expire too soon, there is clearly no point in pushing
48 * it to other peers. This value gives the threshold for migration. Note
49 * that if this value is increased, the migration testcase may need to be
50 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 30)
56 * Block that is ready for migration to other peers. Actual data is at the end of the block.
58 struct MigrationReadyBlock {
60 * This is a doubly-linked list.
62 struct MigrationReadyBlock *next;
65 * This is a doubly-linked list.
67 struct MigrationReadyBlock *prev;
70 * Query for the block.
72 struct GNUNET_HashCode query;
75 * When does this block expire?
77 struct GNUNET_TIME_Absolute expiration;
80 * Peers we already forwarded this
81 * block to. Zero for empty entries.
83 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
91 * Number of targets already used.
93 unsigned int used_targets;
98 enum GNUNET_BLOCK_Type type;
103 * Information about a peer waiting for migratable data.
105 struct MigrationReadyPeer {
107 * This is a doubly-linked list.
109 struct MigrationReadyPeer *next;
112 * This is a doubly-linked list.
114 struct MigrationReadyPeer *prev;
119 struct GSF_ConnectedPeer *peer;
122 * Envelope of the currently pushed message.
124 struct GNUNET_MQ_Envelope *env;
129 * Head of linked list of blocks that can be migrated.
131 static struct MigrationReadyBlock *mig_head;
134 * Tail of linked list of blocks that can be migrated.
136 static struct MigrationReadyBlock *mig_tail;
139 * Head of linked list of peers.
141 static struct MigrationReadyPeer *peer_head;
144 * Tail of linked list of peers.
146 static struct MigrationReadyPeer *peer_tail;
149 * Request to datastore for migration (or NULL).
151 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
154 * ID of task that collects blocks for migration.
156 static struct GNUNET_SCHEDULER_Task *mig_task;
159 * What is the maximum frequency at which we are allowed to
160 * poll the datastore for migration content?
162 static struct GNUNET_TIME_Relative min_migration_delay;
165 * Size of the doubly-linked list of migration blocks.
167 static unsigned int mig_size;
170 * Is this module enabled?
175 * Did we find anything in the datastore?
177 static int value_found;
181 * Delete the given migration block.
183 * @param mb block to delete
186 delete_migration_block(struct MigrationReadyBlock *mb)
188 GNUNET_CONTAINER_DLL_remove(mig_head,
191 GNUNET_PEER_decrement_rcs(mb->target_list,
192 MIGRATION_LIST_SIZE);
199 * Find content for migration to this peer.
201 * @param cls a `struct MigrationReadyPeer *`
204 find_content(void *cls);
208 * Send the given block to the given peer.
210 * @param mrp target peer
211 * @param block the block
212 * @return #GNUNET_YES if the block was deleted (!)
215 transmit_content(struct MigrationReadyPeer *mrp,
216 struct MigrationReadyBlock *block)
218 struct PutMessage *msg;
220 struct GSF_PeerPerformanceData *ppd;
223 ppd = GSF_get_peer_performance_data_(mrp->peer);
224 GNUNET_assert(NULL == mrp->env);
225 mrp->env = GNUNET_MQ_msg_extra(msg,
227 GNUNET_MESSAGE_TYPE_FS_PUT);
228 msg->type = htonl(block->type);
229 msg->expiration = GNUNET_TIME_absolute_hton(block->expiration);
230 GNUNET_memcpy(&msg[1],
233 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
235 if (block->target_list[i] == 0)
237 block->target_list[i] = ppd->pid;
238 GNUNET_PEER_change_rc(block->target_list[i],
243 if (MIGRATION_LIST_SIZE == i)
245 delete_migration_block(block);
252 GNUNET_MQ_notify_sent(mrp->env,
255 GSF_peer_transmit_(mrp->peer,
264 * Count the number of peers this block has
265 * already been forwarded to.
267 * @param block the block
268 * @return number of times block was forwarded
271 count_targets(struct MigrationReadyBlock *block)
275 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
276 if (block->target_list[i] == 0)
283 * Check whether sending this block to this peer is feasible and, if so, how good a match it is.
286 * @param mrp target peer
287 * @param block the block
288 * @return score (>= 0: feasible, negative: infeasible)
291 score_content(struct MigrationReadyPeer *mrp,
292 struct MigrationReadyBlock *block)
295 struct GSF_PeerPerformanceData *ppd;
296 struct GNUNET_PeerIdentity id;
297 struct GNUNET_HashCode hc;
300 ppd = GSF_get_peer_performance_data_(mrp->peer);
301 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
302 if (block->target_list[i] == ppd->pid)
304 GNUNET_assert(0 != ppd->pid);
305 GNUNET_PEER_resolve(ppd->pid,
307 GNUNET_CRYPTO_hash(&id,
308 sizeof(struct GNUNET_PeerIdentity),
310 dist = GNUNET_CRYPTO_hash_distance_u32(&block->query,
312 /* closer distance, higher score: */
313 return UINT32_MAX - dist;
318 * If the migration task is not currently running, consider
319 * (re)scheduling it with the appropriate delay.
322 consider_gathering(void);
326 * Find content for migration to this peer.
328 * @param cls peer to find content for
331 find_content(void *cls)
333 struct MigrationReadyPeer *mrp = cls;
334 struct MigrationReadyBlock *pos;
337 struct MigrationReadyBlock *best;
345 score = score_content(mrp, pos);
346 if (score > best_score)
355 if (mig_size < MAX_MIGRATION_QUEUE)
357 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
358 "No content found for pushing, waiting for queue to fill\n");
359 return; /* will fill up eventually... */
361 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
362 "No suitable content found, purging content from full queue\n");
363 /* failed to find migration target AND
364 * queue is full, purge most-forwarded
365 * block from queue to make room for more */
369 score = count_targets(pos);
370 if (score >= best_score)
377 GNUNET_assert(NULL != best);
378 delete_migration_block(best);
379 consider_gathering();
382 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
383 "Preparing to push best content to peer\n");
384 transmit_content(mrp,
390 * Task that is run periodically to obtain blocks for content migration.
396 gather_migration_blocks(void *cls);
400 * If the migration task is not currently running, consider
401 * (re)scheduling it with the appropriate delay.
406 struct GNUNET_TIME_Relative delay;
412 if (NULL != mig_task)
414 if (mig_size >= MAX_MIGRATION_QUEUE)
416 delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
418 delay = GNUNET_TIME_relative_divide(delay,
419 MAX_MIGRATION_QUEUE);
420 delay = GNUNET_TIME_relative_max(delay,
421 min_migration_delay);
422 if (GNUNET_NO == value_found)
424 /* wait at least 5s if the datastore is empty */
425 delay = GNUNET_TIME_relative_max(delay,
426 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
429 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
430 "Scheduling gathering task (queue size: %u)\n",
432 mig_task = GNUNET_SCHEDULER_add_delayed(delay,
433 &gather_migration_blocks,
439 * Process content offered for migration.
442 * @param key key for the content
443 * @param size number of bytes in data
444 * @param data content stored
445 * @param type type of the content
446 * @param priority priority of the content
447 * @param anonymity anonymity-level for the content
448 * @param replication replication-level for the content
449 * @param expiration expiration time for the content
450 * @param uid unique identifier for the datum;
451 * may be 0 if no unique identifier is available
454 process_migration_content(void *cls,
455 const struct GNUNET_HashCode *key,
458 enum GNUNET_BLOCK_Type type,
461 uint32_t replication,
462 struct GNUNET_TIME_Absolute expiration,
465 struct MigrationReadyBlock *mb;
466 struct MigrationReadyPeer *pos;
471 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
472 "No content found for migration...\n");
473 consider_gathering();
476 value_found = GNUNET_YES;
477 if (GNUNET_TIME_absolute_get_remaining(expiration).rel_value_us <
478 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
480 /* content will expire soon, don't bother */
481 consider_gathering();
484 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
487 GNUNET_FS_handle_on_demand_block(key,
496 &process_migration_content,
498 consider_gathering();
501 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
502 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
505 MAX_MIGRATION_QUEUE);
506 mb = GNUNET_malloc(sizeof(struct MigrationReadyBlock) + size);
508 mb->expiration = expiration;
511 GNUNET_memcpy(&mb[1], data, size);
512 GNUNET_CONTAINER_DLL_insert_after(mig_head,
517 for (pos = peer_head; NULL != pos; pos = pos->next)
519 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
520 "Preparing to push best content to peer %s\n",
521 GNUNET_i2s(GSF_connected_peer_get_identity2_(pos->peer)));
522 if ((NULL == pos->env) &&
523 (GNUNET_YES == transmit_content(pos,
526 break; /* 'mb' was freed! */
529 consider_gathering();
534 * Task that is run periodically to obtain blocks for content migration.
540 gather_migration_blocks(void *cls)
543 if (mig_size >= MAX_MIGRATION_QUEUE)
547 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
548 "Asking datastore for content for replication (queue size: %u)\n",
550 value_found = GNUNET_NO;
551 mig_qe = GNUNET_DATASTORE_get_for_replication(GSF_dsh,
554 &process_migration_content,
557 consider_gathering();
562 * A peer connected to us. Start pushing content to this peer.
565 * @param peer handle for the peer that connected
568 GSF_push_start_(struct GSF_ConnectedPeer *peer)
570 struct MigrationReadyPeer *mrp;
572 if (GNUNET_YES != enabled)
574 for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
575 if (mrp->peer == peer)
579 /* same peer added twice, must not happen */
584 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
585 "Adding peer %s to list for pushing\n",
586 GNUNET_i2s(GSF_connected_peer_get_identity2_(peer)));
588 mrp = GNUNET_new(struct MigrationReadyPeer);
591 GNUNET_CONTAINER_DLL_insert(peer_head,
598 * A peer disconnected from us. Stop pushing content to this peer.
601 * @param peer handle for the peer that disconnected
604 GSF_push_stop_(struct GSF_ConnectedPeer *peer)
606 struct MigrationReadyPeer *pos;
608 for (pos = peer_head; NULL != pos; pos = pos->next)
609 if (pos->peer == peer)
613 if (NULL != pos->env)
614 GNUNET_MQ_send_cancel(pos->env);
615 GNUNET_CONTAINER_DLL_remove(peer_head,
629 GNUNET_CONFIGURATION_get_value_yesno(GSF_cfg,
632 if (GNUNET_YES != enabled)
636 GNUNET_CONFIGURATION_get_value_time(GSF_cfg,
638 "MIN_MIGRATION_DELAY",
639 &min_migration_delay))
641 GNUNET_log_config_invalid(GNUNET_ERROR_TYPE_WARNING,
643 "MIN_MIGRATION_DELAY",
644 _("time required, content pushing disabled"));
647 consider_gathering();
652 * Shutdown the module.
657 if (NULL != mig_task)
659 GNUNET_SCHEDULER_cancel(mig_task);
664 GNUNET_DATASTORE_cancel(mig_qe);
667 while (NULL != mig_head)
668 delete_migration_block(mig_head);
669 GNUNET_assert(0 == mig_size);
672 /* end of gnunet-service-fs_push.c */