2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
17 * @file fs/gnunet-service-fs_push.c
18 * @brief API to push content from our datastore to other peers
19 * ('anonymous'-content P2P migration)
20 * @author Christian Grothoff
23 #include "gnunet-service-fs.h"
24 #include "gnunet-service-fs_cp.h"
25 #include "gnunet-service-fs_indexing.h"
26 #include "gnunet-service-fs_push.h"
30 * Maximum number of blocks we keep in memory for migration.
32 #define MAX_MIGRATION_QUEUE 8
35 * Blocks are at most migrated to this number of peers
36 * plus one, each time they are fetched from the database.
38 #define MIGRATION_LIST_SIZE 2
41 * How long must content remain valid for us to consider it for migration?
42 * If content will expire too soon, there is clearly no point in pushing
43 * it to other peers. This value gives the threshold for migration. Note
44 * that if this value is increased, the migration testcase may need to be
45 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
47 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
51 * Block that is ready for migration to other peers. Actual data is at the end of the block.
53 struct MigrationReadyBlock
57 * This is a doubly-linked list.
59 struct MigrationReadyBlock *next;
62 * This is a doubly-linked list.
64 struct MigrationReadyBlock *prev;
67 * Query for the block.
69 struct GNUNET_HashCode query;
72 * When does this block expire?
74 struct GNUNET_TIME_Absolute expiration;
77 * Peers we already forwarded this
78 * block to. Zero for empty entries.
80 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
88 * Number of targets already used.
90 unsigned int used_targets;
95 enum GNUNET_BLOCK_Type type;
100 * Information about a peer waiting for migratable data.
102 struct MigrationReadyPeer
105 * This is a doubly-linked list.
107 struct MigrationReadyPeer *next;
110 * This is a doubly-linked list.
112 struct MigrationReadyPeer *prev;
117 struct GSF_ConnectedPeer *peer;
120 * Envelope of the currently pushed message.
122 struct GNUNET_MQ_Envelope *env;
127 * Head of linked list of blocks that can be migrated.
129 static struct MigrationReadyBlock *mig_head;
132 * Tail of linked list of blocks that can be migrated.
134 static struct MigrationReadyBlock *mig_tail;
137 * Head of linked list of peers.
139 static struct MigrationReadyPeer *peer_head;
142 * Tail of linked list of peers.
144 static struct MigrationReadyPeer *peer_tail;
147 * Request to datastore for migration (or NULL).
149 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
152 * ID of task that collects blocks for migration.
154 static struct GNUNET_SCHEDULER_Task *mig_task;
157 * What is the maximum frequency at which we are allowed to
158 * poll the datastore for migration content?
160 static struct GNUNET_TIME_Relative min_migration_delay;
163 * Size of the doubly-linked list of migration blocks.
165 static unsigned int mig_size;
168 * Is this module enabled?
173 * Did we find anything in the datastore?
175 static int value_found;
179 * Delete the given migration block.
181 * @param mb block to delete
184 delete_migration_block (struct MigrationReadyBlock *mb)
186 GNUNET_CONTAINER_DLL_remove (mig_head,
189 GNUNET_PEER_decrement_rcs (mb->target_list,
190 MIGRATION_LIST_SIZE);
197 * Find content for migration to this peer.
199 * @param cls a `struct MigrationReadyPeer *`
202 find_content (void *cls);
206 * Send the given block to the given peer.
208 * @param peer target peer
209 * @param block the block
210 * @return #GNUNET_YES if the block was deleted (!)
213 transmit_content (struct MigrationReadyPeer *mrp,
214 struct MigrationReadyBlock *block)
216 struct PutMessage *msg;
218 struct GSF_PeerPerformanceData *ppd;
221 ppd = GSF_get_peer_performance_data_ (mrp->peer);
222 GNUNET_assert (NULL == mrp->env);
223 mrp->env = GNUNET_MQ_msg_extra (msg,
225 GNUNET_MESSAGE_TYPE_FS_PUT);
226 msg->type = htonl (block->type);
227 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
228 GNUNET_memcpy (&msg[1],
231 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
233 if (block->target_list[i] == 0)
235 block->target_list[i] = ppd->pid;
236 GNUNET_PEER_change_rc (block->target_list[i],
241 if (MIGRATION_LIST_SIZE == i)
243 delete_migration_block (block);
250 GNUNET_MQ_notify_sent (mrp->env,
253 GSF_peer_transmit_ (mrp->peer,
262 * Count the number of peers this block has
263 * already been forwarded to.
265 * @param block the block
266 * @return number of times block was forwarded
269 count_targets (struct MigrationReadyBlock *block)
273 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
274 if (block->target_list[i] == 0)
281 * Check if sending this block to this peer would
284 * @param mrp target peer
285 * @param block the block
286 * @return score (>= 0: feasible, negative: infeasible)
289 score_content (struct MigrationReadyPeer *mrp,
290 struct MigrationReadyBlock *block)
293 struct GSF_PeerPerformanceData *ppd;
294 struct GNUNET_PeerIdentity id;
295 struct GNUNET_HashCode hc;
298 ppd = GSF_get_peer_performance_data_ (mrp->peer);
299 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
300 if (block->target_list[i] == ppd->pid)
302 GNUNET_assert (0 != ppd->pid);
303 GNUNET_PEER_resolve (ppd->pid,
305 GNUNET_CRYPTO_hash (&id,
306 sizeof (struct GNUNET_PeerIdentity),
308 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
310 /* closer distance, higher score: */
311 return UINT32_MAX - dist;
316 * If the migration task is not currently running, consider
317 * (re)scheduling it with the appropriate delay.
320 consider_gathering (void);
324 * Find content for migration to this peer.
326 * @param cls peer to find content for
329 find_content (void *cls)
331 struct MigrationReadyPeer *mrp = cls;
332 struct MigrationReadyBlock *pos;
335 struct MigrationReadyBlock *best;
343 score = score_content (mrp, pos);
344 if (score > best_score)
353 if (mig_size < MAX_MIGRATION_QUEUE)
355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356 "No content found for pushing, waiting for queue to fill\n");
357 return; /* will fill up eventually... */
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360 "No suitable content found, purging content from full queue\n");
361 /* failed to find migration target AND
362 * queue is full, purge most-forwarded
363 * block from queue to make room for more */
367 score = count_targets (pos);
368 if (score >= best_score)
375 GNUNET_assert (NULL != best);
376 delete_migration_block (best);
377 consider_gathering ();
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381 "Preparing to push best content to peer\n");
382 transmit_content (mrp,
388 * Task that is run periodically to obtain blocks for content
394 gather_migration_blocks (void *cls);
398 * If the migration task is not currently running, consider
399 * (re)scheduling it with the appropriate delay.
402 consider_gathering ()
404 struct GNUNET_TIME_Relative delay;
410 if (NULL != mig_task)
412 if (mig_size >= MAX_MIGRATION_QUEUE)
414 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
416 delay = GNUNET_TIME_relative_divide (delay,
417 MAX_MIGRATION_QUEUE);
418 delay = GNUNET_TIME_relative_max (delay,
419 min_migration_delay);
420 if (GNUNET_NO == value_found)
422 /* wait at least 5s if the datastore is empty */
423 delay = GNUNET_TIME_relative_max (delay,
424 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
428 "Scheduling gathering task (queue size: %u)\n",
430 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
431 &gather_migration_blocks,
437 * Process content offered for migration.
440 * @param key key for the content
441 * @param size number of bytes in data
442 * @param data content stored
443 * @param type type of the content
444 * @param priority priority of the content
445 * @param anonymity anonymity-level for the content
446 * @param replication replication-level for the content
447 * @param expiration expiration time for the content
448 * @param uid unique identifier for the datum;
449 * maybe 0 if no unique identifier is available
452 process_migration_content (void *cls,
453 const struct GNUNET_HashCode *key,
456 enum GNUNET_BLOCK_Type type,
459 uint32_t replication,
460 struct GNUNET_TIME_Absolute expiration,
463 struct MigrationReadyBlock *mb;
464 struct MigrationReadyPeer *pos;
469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470 "No content found for migration...\n");
471 consider_gathering ();
474 value_found = GNUNET_YES;
475 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
476 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
478 /* content will expire soon, don't bother */
479 consider_gathering ();
482 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
485 GNUNET_FS_handle_on_demand_block (key,
494 &process_migration_content,
496 consider_gathering ();
499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
503 MAX_MIGRATION_QUEUE);
504 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
506 mb->expiration = expiration;
509 GNUNET_memcpy (&mb[1], data, size);
510 GNUNET_CONTAINER_DLL_insert_after (mig_head,
515 for (pos = peer_head; NULL != pos; pos = pos->next)
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "Preparing to push best content to peer %s\n",
519 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
520 if ( (NULL == pos->env) &&
521 (GNUNET_YES == transmit_content (pos,
523 break; /* 'mb' was freed! */
526 consider_gathering ();
531 * Task that is run periodically to obtain blocks for content
537 gather_migration_blocks (void *cls)
540 if (mig_size >= MAX_MIGRATION_QUEUE)
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545 "Asking datastore for content for replication (queue size: %u)\n",
547 value_found = GNUNET_NO;
548 mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
551 &process_migration_content,
554 consider_gathering ();
559 * A peer connected to us. Start pushing content
562 * @param peer handle for the peer that connected
565 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
567 struct MigrationReadyPeer *mrp;
569 if (GNUNET_YES != enabled)
571 for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
572 if (mrp->peer == peer)
576 /* same peer added twice, must not happen */
581 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582 "Adding peer %s to list for pushing\n",
583 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
585 mrp = GNUNET_new (struct MigrationReadyPeer);
588 GNUNET_CONTAINER_DLL_insert (peer_head,
595 * A peer disconnected from us. Stop pushing content
598 * @param peer handle for the peer that disconnected
601 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
603 struct MigrationReadyPeer *pos;
605 for (pos = peer_head; NULL != pos; pos = pos->next)
606 if (pos->peer == peer)
610 if (NULL != pos->env)
611 GNUNET_MQ_send_cancel (pos->env);
612 GNUNET_CONTAINER_DLL_remove (peer_head,
626 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
629 if (GNUNET_YES != enabled)
633 GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
635 "MIN_MIGRATION_DELAY",
636 &min_migration_delay))
638 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
640 "MIN_MIGRATION_DELAY",
641 _("time required, content pushing disabled"));
644 consider_gathering ();
649 * Shutdown the module.
654 if (NULL != mig_task)
656 GNUNET_SCHEDULER_cancel (mig_task);
661 GNUNET_DATASTORE_cancel (mig_qe);
664 while (NULL != mig_head)
665 delete_migration_block (mig_head);
666 GNUNET_assert (0 == mig_size);
669 /* end of gnunet-service-fs_push.c */