2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
35 * Maximum number of blocks we keep in memory for migration.
37 #define MAX_MIGRATION_QUEUE 8
40 * Blocks are at most migrated to this number of peers
41 * plus one, each time they are fetched from the database.
43 #define MIGRATION_LIST_SIZE 2
46 * How long must content remain valid for us to consider it for migration?
47 * If content will expire too soon, there is clearly no point in pushing
48 * it to other peers. This value gives the threshold for migration. Note
49 * that if this value is increased, the migration testcase may need to be
50 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
56 * Block that is ready for migration to other peers. Actual data is at the end of the block.
58 struct MigrationReadyBlock
62 * This is a doubly-linked list.
64 struct MigrationReadyBlock *next;
67 * This is a doubly-linked list.
69 struct MigrationReadyBlock *prev;
72 * Query for the block.
74 struct GNUNET_HashCode query;
77 * When does this block expire?
79 struct GNUNET_TIME_Absolute expiration;
82 * Peers we already forwarded this
83 * block to. Zero for empty entries.
85 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
93 * Number of targets already used.
95 unsigned int used_targets;
100 enum GNUNET_BLOCK_Type type;
105 * Information about a peer waiting for migratable data.
107 struct MigrationReadyPeer
110 * This is a doubly-linked list.
112 struct MigrationReadyPeer *next;
115 * This is a doubly-linked list.
117 struct MigrationReadyPeer *prev;
122 struct GSF_ConnectedPeer *peer;
125 * Envelope of the currently pushed message.
127 struct GNUNET_MQ_Envelope *env;
132 * Head of linked list of blocks that can be migrated.
134 static struct MigrationReadyBlock *mig_head;
137 * Tail of linked list of blocks that can be migrated.
139 static struct MigrationReadyBlock *mig_tail;
142 * Head of linked list of peers.
144 static struct MigrationReadyPeer *peer_head;
147 * Tail of linked list of peers.
149 static struct MigrationReadyPeer *peer_tail;
152 * Request to datastore for migration (or NULL).
154 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
157 * ID of task that collects blocks for migration.
159 static struct GNUNET_SCHEDULER_Task *mig_task;
162 * What is the maximum frequency at which we are allowed to
163 * poll the datastore for migration content?
165 static struct GNUNET_TIME_Relative min_migration_delay;
168 * Size of the doubly-linked list of migration blocks.
170 static unsigned int mig_size;
173 * Is this module enabled?
178 * Did we find anything in the datastore?
180 static int value_found;
184 * Delete the given migration block.
186 * @param mb block to delete
189 delete_migration_block (struct MigrationReadyBlock *mb)
191 GNUNET_CONTAINER_DLL_remove (mig_head,
194 GNUNET_PEER_decrement_rcs (mb->target_list,
195 MIGRATION_LIST_SIZE);
202 * Find content for migration to this peer.
204 * @param cls a `struct MigrationReadyPeer *`
207 find_content (void *cls);
211 * Send the given block to the given peer.
213 * @param peer target peer
214 * @param block the block
215 * @return #GNUNET_YES if the block was deleted (!)
218 transmit_content (struct MigrationReadyPeer *mrp,
219 struct MigrationReadyBlock *block)
221 struct PutMessage *msg;
223 struct GSF_PeerPerformanceData *ppd;
226 ppd = GSF_get_peer_performance_data_ (mrp->peer);
227 GNUNET_assert (NULL == mrp->env);
228 mrp->env = GNUNET_MQ_msg_extra (msg,
230 GNUNET_MESSAGE_TYPE_FS_PUT);
231 msg->type = htonl (block->type);
232 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
233 GNUNET_memcpy (&msg[1],
236 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
238 if (block->target_list[i] == 0)
240 block->target_list[i] = ppd->pid;
241 GNUNET_PEER_change_rc (block->target_list[i],
246 if (MIGRATION_LIST_SIZE == i)
248 delete_migration_block (block);
255 GNUNET_MQ_notify_sent (mrp->env,
258 GSF_peer_transmit_ (mrp->peer,
267 * Count the number of peers this block has
268 * already been forwarded to.
270 * @param block the block
271 * @return number of times block was forwarded
274 count_targets (struct MigrationReadyBlock *block)
278 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279 if (block->target_list[i] == 0)
286 * Check if sending this block to this peer would
289 * @param mrp target peer
290 * @param block the block
291 * @return score (>= 0: feasible, negative: infeasible)
294 score_content (struct MigrationReadyPeer *mrp,
295 struct MigrationReadyBlock *block)
298 struct GSF_PeerPerformanceData *ppd;
299 struct GNUNET_PeerIdentity id;
300 struct GNUNET_HashCode hc;
303 ppd = GSF_get_peer_performance_data_ (mrp->peer);
304 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
305 if (block->target_list[i] == ppd->pid)
307 GNUNET_assert (0 != ppd->pid);
308 GNUNET_PEER_resolve (ppd->pid,
310 GNUNET_CRYPTO_hash (&id,
311 sizeof (struct GNUNET_PeerIdentity),
313 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
315 /* closer distance, higher score: */
316 return UINT32_MAX - dist;
321 * If the migration task is not currently running, consider
322 * (re)scheduling it with the appropriate delay.
325 consider_gathering (void);
329 * Find content for migration to this peer.
331 * @param cls peer to find content for
334 find_content (void *cls)
336 struct MigrationReadyPeer *mrp = cls;
337 struct MigrationReadyBlock *pos;
340 struct MigrationReadyBlock *best;
348 score = score_content (mrp, pos);
349 if (score > best_score)
358 if (mig_size < MAX_MIGRATION_QUEUE)
360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361 "No content found for pushing, waiting for queue to fill\n");
362 return; /* will fill up eventually... */
364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365 "No suitable content found, purging content from full queue\n");
366 /* failed to find migration target AND
367 * queue is full, purge most-forwarded
368 * block from queue to make room for more */
372 score = count_targets (pos);
373 if (score >= best_score)
380 GNUNET_assert (NULL != best);
381 delete_migration_block (best);
382 consider_gathering ();
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386 "Preparing to push best content to peer\n");
387 transmit_content (mrp,
393 * Task that is run periodically to obtain blocks for content
399 gather_migration_blocks (void *cls);
403 * If the migration task is not currently running, consider
404 * (re)scheduling it with the appropriate delay.
407 consider_gathering ()
409 struct GNUNET_TIME_Relative delay;
415 if (NULL != mig_task)
417 if (mig_size >= MAX_MIGRATION_QUEUE)
419 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
421 delay = GNUNET_TIME_relative_divide (delay,
422 MAX_MIGRATION_QUEUE);
423 delay = GNUNET_TIME_relative_max (delay,
424 min_migration_delay);
425 if (GNUNET_NO == value_found)
427 /* wait at least 5s if the datastore is empty */
428 delay = GNUNET_TIME_relative_max (delay,
429 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433 "Scheduling gathering task (queue size: %u)\n",
435 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
436 &gather_migration_blocks,
442 * Process content offered for migration.
445 * @param key key for the content
446 * @param size number of bytes in data
447 * @param data content stored
448 * @param type type of the content
449 * @param priority priority of the content
450 * @param anonymity anonymity-level for the content
451 * @param expiration expiration time for the content
452 * @param uid unique identifier for the datum;
453 * maybe 0 if no unique identifier is available
456 process_migration_content (void *cls,
457 const struct GNUNET_HashCode *key,
460 enum GNUNET_BLOCK_Type type,
463 struct GNUNET_TIME_Absolute expiration,
466 struct MigrationReadyBlock *mb;
467 struct MigrationReadyPeer *pos;
472 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
473 "No content found for migration...\n");
474 consider_gathering ();
477 value_found = GNUNET_YES;
478 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
479 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
481 /* content will expire soon, don't bother */
482 consider_gathering ();
485 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
488 GNUNET_FS_handle_on_demand_block (key,
496 &process_migration_content, NULL))
497 consider_gathering ();
500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
504 MAX_MIGRATION_QUEUE);
505 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
507 mb->expiration = expiration;
510 GNUNET_memcpy (&mb[1], data, size);
511 GNUNET_CONTAINER_DLL_insert_after (mig_head,
516 for (pos = peer_head; NULL != pos; pos = pos->next)
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "Preparing to push best content to peer %s\n",
520 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
521 if ( (NULL == pos->env) &&
522 (GNUNET_YES == transmit_content (pos,
524 break; /* 'mb' was freed! */
527 consider_gathering ();
532 * Task that is run periodically to obtain blocks for content
538 gather_migration_blocks (void *cls)
541 if (mig_size >= MAX_MIGRATION_QUEUE)
545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546 "Asking datastore for content for replication (queue size: %u)\n",
548 value_found = GNUNET_NO;
549 mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
552 &process_migration_content,
555 consider_gathering ();
560 * A peer connected to us. Start pushing content
563 * @param peer handle for the peer that connected
566 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
568 struct MigrationReadyPeer *mrp;
570 if (GNUNET_YES != enabled)
572 for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
573 if (mrp->peer == peer)
577 /* same peer added twice, must not happen */
582 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583 "Adding peer %s to list for pushing\n",
584 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
586 mrp = GNUNET_new (struct MigrationReadyPeer);
589 GNUNET_CONTAINER_DLL_insert (peer_head,
596 * A peer disconnected from us. Stop pushing content
599 * @param peer handle for the peer that disconnected
602 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
604 struct MigrationReadyPeer *pos;
606 for (pos = peer_head; NULL != pos; pos = pos->next)
607 if (pos->peer == peer)
611 if (NULL != pos->env)
612 GNUNET_MQ_send_cancel (pos->env);
613 GNUNET_CONTAINER_DLL_remove (peer_head,
627 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
630 if (GNUNET_YES != enabled)
634 GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
636 "MIN_MIGRATION_DELAY",
637 &min_migration_delay))
639 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
641 "MIN_MIGRATION_DELAY",
642 _("time required, content pushing disabled"));
645 consider_gathering ();
650 * Shutdown the module.
655 if (NULL != mig_task)
657 GNUNET_SCHEDULER_cancel (mig_task);
662 GNUNET_DATASTORE_cancel (mig_qe);
665 while (NULL != mig_head)
666 delete_migration_block (mig_head);
667 GNUNET_assert (0 == mig_size);
670 /* end of gnunet-service-fs_push.c */