2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
34 #define DEBUG_FS_MIGRATION GNUNET_NO
37 * How long must content remain valid for us to consider it for migration?
38 * If content will expire too soon, there is clearly no point in pushing
39 * it to other peers. This value gives the threshold for migration. Note
40 * that if this value is increased, the migration testcase may need to be
41 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
43 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
47 * Block that is ready for migration to other peers. Actual data is at the end of the block.
49 struct MigrationReadyBlock
53 * This is a doubly-linked list.
55 struct MigrationReadyBlock *next;
58 * This is a doubly-linked list.
60 struct MigrationReadyBlock *prev;
63 * Query for the block.
65 GNUNET_HashCode query;
68 * When does this block expire?
70 struct GNUNET_TIME_Absolute expiration;
73 * Peers we already forwarded this
74 * block to. Zero for empty entries.
76 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
84 * Number of targets already used.
86 unsigned int used_targets;
91 enum GNUNET_BLOCK_Type type;
96 * Information about a peer waiting for
99 struct MigrationReadyPeer
102 * This is a doubly-linked list.
104 struct MigrationReadyPeer *next;
107 * This is a doubly-linked list.
109 struct MigrationReadyPeer *prev;
114 struct GSF_ConnectedPeer *peer;
117 * Handle for current transmission request,
120 struct GSF_PeerTransmitHandle *th;
123 * Message we are trying to push right now (or NULL)
125 struct PutMessage *msg;
130 * Head of linked list of blocks that can be migrated.
132 static struct MigrationReadyBlock *mig_head;
135 * Tail of linked list of blocks that can be migrated.
137 static struct MigrationReadyBlock *mig_tail;
140 * Head of linked list of peers.
142 static struct MigrationReadyPeer *peer_head;
145 * Tail of linked list of peers.
147 static struct MigrationReadyPeer *peer_tail;
150 * Request to datastore for migration (or NULL).
152 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
155 * ID of task that collects blocks for migration.
157 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
160 * What is the maximum frequency at which we are allowed to
161 * poll the datastore for migration content?
163 static struct GNUNET_TIME_Relative min_migration_delay;
166 * Size of the doubly-linked list of migration blocks.
168 static unsigned int mig_size;
171 * Is this module enabled?
177 * Delete the given migration block.
179 * @param mb block to delete
182 delete_migration_block (struct MigrationReadyBlock *mb)
184 GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
185 GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
192 * Find content for migration to this peer.
195 find_content (struct MigrationReadyPeer *mrp);
199 * Transmit the message currently scheduled for
202 * @param cls the 'struct MigrationReadyPeer'
203 * @param buf_size number of bytes available in buf
204 * @param buf where to copy the message, NULL on error (peer disconnect)
205 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
208 transmit_message (void *cls, size_t buf_size, void *buf)
210 struct MigrationReadyPeer *peer = cls;
211 struct PutMessage *msg;
219 #if DEBUG_FS_MIGRATION
220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
221 "Failed to migrate content to another peer (disconnect)\n");
226 msize = ntohs (msg->header.size);
227 GNUNET_assert (msize <= buf_size);
228 memcpy (buf, msg, msize);
230 #if DEBUG_FS_MIGRATION
231 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n",
240 * Send the given block to the given peer.
242 * @param peer target peer
243 * @param block the block
244 * @return GNUNET_YES if the block was deleted (!)
247 transmit_content (struct MigrationReadyPeer *peer,
248 struct MigrationReadyBlock *block)
251 struct PutMessage *msg;
253 struct GSF_PeerPerformanceData *ppd;
256 ppd = GSF_get_peer_performance_data_ (peer->peer);
257 GNUNET_assert (NULL == peer->th);
258 msize = sizeof (struct PutMessage) + block->size;
259 msg = GNUNET_malloc (msize);
260 msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
261 msg->header.size = htons (msize);
262 msg->type = htonl (block->type);
263 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
264 memcpy (&msg[1], &block[1], block->size);
266 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
268 if (block->target_list[i] == 0)
270 block->target_list[i] = ppd->pid;
271 GNUNET_PEER_change_rc (block->target_list[i], 1);
275 if (MIGRATION_LIST_SIZE == i)
277 delete_migration_block (block);
284 #if DEBUG_FS_MIGRATION
285 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
286 "Asking for transmission of %u bytes for migration\n", msize);
288 peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ ,
289 GNUNET_TIME_UNIT_FOREVER_REL, msize,
290 &transmit_message, peer);
296 * Count the number of peers this block has
297 * already been forwarded to.
299 * @param block the block
300 * @return number of times block was forwarded
303 count_targets (struct MigrationReadyBlock *block)
307 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
308 if (block->target_list[i] == 0)
315 * Check if sending this block to this peer would
318 * @param peer target peer
319 * @param block the block
320 * @return score (>= 0: feasible, negative: infeasible)
323 score_content (struct MigrationReadyPeer *peer,
324 struct MigrationReadyBlock *block)
327 struct GSF_PeerPerformanceData *ppd;
328 struct GNUNET_PeerIdentity id;
331 ppd = GSF_get_peer_performance_data_ (peer->peer);
332 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
333 if (block->target_list[i] == ppd->pid)
335 GNUNET_assert (0 != ppd->pid);
336 GNUNET_PEER_resolve (ppd->pid, &id);
337 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey);
338 /* closer distance, higher score: */
339 return UINT32_MAX - dist;
344 * If the migration task is not currently running, consider
345 * (re)scheduling it with the appropriate delay.
348 consider_gathering (void);
352 * Find content for migration to this peer.
354 * @param mrp peer to find content for
357 find_content (struct MigrationReadyPeer *mrp)
359 struct MigrationReadyBlock *pos;
362 struct MigrationReadyBlock *best;
364 GNUNET_assert (NULL == mrp->th);
370 score = score_content (mrp, pos);
371 if (score > best_score)
380 if (mig_size < MAX_MIGRATION_QUEUE)
382 #if DEBUG_FS_MIGRATION
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "No content found for pushing, waiting for queue to fill\n");
386 return; /* will fill up eventually... */
388 #if DEBUG_FS_MIGRATION
389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390 "No suitable content found, purging content from full queue\n");
392 /* failed to find migration target AND
393 * queue is full, purge most-forwarded
394 * block from queue to make room for more */
398 score = count_targets (pos);
399 if (score >= best_score)
406 GNUNET_assert (NULL != best);
407 delete_migration_block (best);
408 consider_gathering ();
411 #if DEBUG_FS_MIGRATION
412 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413 "Preparing to push best content to peer\n");
415 transmit_content (mrp, best);
420 * Task that is run periodically to obtain blocks for content
424 * @param tc scheduler context (also unused)
427 gather_migration_blocks (void *cls,
428 const struct GNUNET_SCHEDULER_TaskContext *tc);
432 * If the migration task is not currently running, consider
433 * (re)scheduling it with the appropriate delay.
436 consider_gathering ()
438 struct GNUNET_TIME_Relative delay;
444 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
446 if (mig_size >= MAX_MIGRATION_QUEUE)
448 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
449 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
450 delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
451 #if DEBUG_FS_MIGRATION
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 "Scheduling gathering task (queue size: %u)\n", mig_size);
456 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
461 * Process content offered for migration.
464 * @param key key for the content
465 * @param size number of bytes in data
466 * @param data content stored
467 * @param type type of the content
468 * @param priority priority of the content
469 * @param anonymity anonymity-level for the content
470 * @param expiration expiration time for the content
471 * @param uid unique identifier for the datum;
472 * maybe 0 if no unique identifier is available
475 process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size,
476 const void *data, enum GNUNET_BLOCK_Type type,
477 uint32_t priority, uint32_t anonymity,
478 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
480 struct MigrationReadyBlock *mb;
481 struct MigrationReadyPeer *pos;
486 #if DEBUG_FS_MIGRATION
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n");
489 consider_gathering ();
492 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
493 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
495 /* content will expire soon, don't bother */
496 consider_gathering ();
499 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
502 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
503 anonymity, expiration, uid,
504 &process_migration_content, NULL))
505 consider_gathering ();
508 #if DEBUG_FS_MIGRATION
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
511 GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE);
513 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
515 mb->expiration = expiration;
518 memcpy (&mb[1], data, size);
519 GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb);
526 #if DEBUG_FS_MIGRATION
527 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528 "Preparing to push best content to peer\n");
530 if (GNUNET_YES == transmit_content (pos, mb))
531 break; /* 'mb' was freed! */
535 consider_gathering ();
540 * Task that is run periodically to obtain blocks for content
544 * @param tc scheduler context (also unused)
547 gather_migration_blocks (void *cls,
548 const struct GNUNET_SCHEDULER_TaskContext *tc)
550 mig_task = GNUNET_SCHEDULER_NO_TASK;
551 if (mig_size >= MAX_MIGRATION_QUEUE)
555 #if DEBUG_FS_MIGRATION
556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557 "Asking datastore for content for replication (queue size: %u)\n",
561 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
562 GNUNET_TIME_UNIT_FOREVER_REL,
563 &process_migration_content, NULL);
565 consider_gathering ();
571 * A peer connected to us. Start pushing content
574 * @param peer handle for the peer that connected
577 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
579 struct MigrationReadyPeer *mrp;
581 if (GNUNET_YES != enabled)
583 mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
586 GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp);
591 * A peer disconnected from us. Stop pushing content
594 * @param peer handle for the peer that disconnected
597 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
599 struct MigrationReadyPeer *pos;
604 if (pos->peer == peer)
606 GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos);
609 GSF_peer_transmit_cancel_ (pos->th);
612 if (NULL != pos->msg)
614 GNUNET_free (pos->msg);
632 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
633 if (GNUNET_YES != enabled)
637 GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
638 &min_migration_delay))
640 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
642 ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
643 "MIN_MIGRATION_DELAY", "fs");
646 consider_gathering ();
651 * Shutdown the module.
656 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
658 GNUNET_SCHEDULER_cancel (mig_task);
659 mig_task = GNUNET_SCHEDULER_NO_TASK;
663 GNUNET_DATASTORE_cancel (mig_qe);
666 while (NULL != mig_head)
667 delete_migration_block (mig_head);
668 GNUNET_assert (0 == mig_size);
671 /* end of gnunet-service-fs_push.c */