2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
34 #define DEBUG_FS_MIGRATION GNUNET_EXTRA_LOGGING
37 * Maximum number of blocks we keep in memory for migration.
39 #define MAX_MIGRATION_QUEUE 8
42 * Blocks are at most migrated to this number of peers
43 * plus one, each time they are fetched from the database.
45 #define MIGRATION_LIST_SIZE 2
48 * How long must content remain valid for us to consider it for migration?
49 * If content will expire too soon, there is clearly no point in pushing
50 * it to other peers. This value gives the threshold for migration. Note
51 * that if this value is increased, the migration testcase may need to be
52 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
54 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
58 * Block that is ready for migration to other peers. Actual data is at the end of the block.
/* One queued block awaiting migration. Nodes are linked through next/prev
 * into the list anchored at mig_head/mig_tail.
 * NOTE(review): transmit_content() reads block->size and copies from
 * &block[1], but no `size` member is visible below -- this listing appears
 * to have dropped some short lines; confirm against the real file. */
60 struct MigrationReadyBlock
64 * This is a doubly-linked list.
66 struct MigrationReadyBlock *next;
69 * This is a doubly-linked list.
71 struct MigrationReadyBlock *prev;
74 * Query for the block.
/* score_content() measures the hash distance between this query and the
 * candidate peer's public-key hash. */
76 GNUNET_HashCode query;
79 * When does this block expire?
/* Converted with GNUNET_TIME_absolute_hton() into the outgoing message. */
81 struct GNUNET_TIME_Absolute expiration;
84 * Peers we already forwarded this
85 * block to. Zero for empty entries.
/* transmit_content() stores the peer id in a zero slot and calls
 * GNUNET_PEER_change_rc (id, 1); delete_migration_block() passes the whole
 * array to GNUNET_PEER_decrement_rcs(). */
87 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
95 * Number of targets already used.
/* NOTE(review): no visible line in this listing reads or writes
 * used_targets -- confirm whether it is still needed. */
97 unsigned int used_targets;
/* Block type; written into the outgoing message via htonl(). */
102 enum GNUNET_BLOCK_Type type;
107 * Information about a peer waiting for content to be migrated to it.
/* Per-peer migration state. Entries are kept in the list anchored at
 * peer_head/peer_tail (inserted by GSF_push_start_(), removed by
 * GSF_push_stop_()). */
110 struct MigrationReadyPeer
113 * This is a doubly-linked list.
115 struct MigrationReadyPeer *next;
118 * This is a doubly-linked list.
120 struct MigrationReadyPeer *prev;
/* Connected-peer handle; GSF_push_stop_() compares entries against it. */
125 struct GSF_ConnectedPeer *peer;
128 * Handle for current transmission request,
/* Assigned from GSF_peer_transmit_() in transmit_content(), which asserts
 * it is NULL beforehand; passed to GSF_peer_transmit_cancel_() in
 * GSF_push_stop_(). */
131 struct GSF_PeerTransmitHandle *th;
134 * Message we are trying to push right now (or NULL)
/* Freed by GSF_push_stop_() when non-NULL. */
136 struct PutMessage *msg;
141 * Head of linked list of blocks that can be migrated.
/* The shutdown code deletes blocks from the head until the list is empty. */
143 static struct MigrationReadyBlock *mig_head;
146 * Tail of linked list of blocks that can be migrated.
/* process_migration_content() appends new blocks after this element. */
148 static struct MigrationReadyBlock *mig_tail;
151 * Head of linked list of peers.
153 static struct MigrationReadyPeer *peer_head;
156 * Tail of linked list of peers.
158 static struct MigrationReadyPeer *peer_tail;
161 * Request to datastore for migration (or NULL).
/* Passed to GNUNET_DATASTORE_cancel() by the shutdown code. */
163 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
166 * ID of task that collects blocks for migration.
/* Reset to GNUNET_SCHEDULER_NO_TASK when gather_migration_blocks() runs;
 * cancelled by the shutdown code if still pending. */
168 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
171 * What is the maximum frequency at which we are allowed to
172 * poll the datastore for migration content?
/* Loaded from option MIN_MIGRATION_DELAY in section "fs";
 * consider_gathering() uses it as the lower bound of its delay. */
174 static struct GNUNET_TIME_Relative min_migration_delay;
177 * Size of the doubly-linked list of migration blocks.
/* Compared against MAX_MIGRATION_QUEUE before fetching more blocks; the
 * shutdown code asserts it is 0 once the list has been emptied. */
179 static unsigned int mig_size;
182 * Is this module enabled?
188 * Delete the given migration block.
190 * @param mb block to delete
/* Unlinks mb from the mig_head/mig_tail list and hands its target_list to
 * GNUNET_PEER_decrement_rcs().
 * NOTE(review): the shutdown code asserts mig_size == 0 after calling this
 * in a loop, and callers treat the block as freed afterwards, so the
 * counter update and the release of mb presumably sit on lines missing
 * from this listing -- confirm. */
193 delete_migration_block (struct MigrationReadyBlock *mb)
195 GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
196 GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
203 * Find content for migration to this peer.
206 find_content (struct MigrationReadyPeer *mrp);
210 * Transmit the message currently scheduled for transmission.
213 * @param cls the 'struct MigrationReadyPeer'
214 * @param buf_size number of bytes available in buf
215 * @param buf where to copy the message, NULL on error (peer disconnect)
216 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
/* Transmission callback registered by transmit_content(); cls is the
 * MigrationReadyPeer that was passed as closure.
 * NOTE(review): several lines of this function (declarations, the NULL-buf
 * test, cleanup and return statements) are missing from this listing;
 * the comments below cover the visible statements only. */
219 transmit_message (void *cls, size_t buf_size, void *buf)
221 struct MigrationReadyPeer *peer = cls;
222 struct PutMessage *msg;
/* Failure path: per the parameter documentation, buf is NULL when the
 * peer disconnected (the guarding condition itself is not visible). */
230 #if DEBUG_FS_MIGRATION
231 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
232 "Failed to migrate content to another peer (disconnect)\n");
/* Success path: the length is taken from the message header (stored in
 * network byte order) and must fit into the buffer offered by the caller
 * before the message is copied out. */
237 msize = ntohs (msg->header.size);
238 GNUNET_assert (msize <= buf_size);
239 memcpy (buf, msg, msize);
241 #if DEBUG_FS_MIGRATION
242 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n",
251 * Send the given block to the given peer.
253 * @param peer target peer
254 * @param block the block
255 * @return GNUNET_YES if the block was deleted (!)
/* Builds a PUT message carrying `block`, records `peer` in the block's
 * target list and requests transmission of the message to that peer.
 * NOTE(review): this listing has lost short lines (braces, declarations of
 * msize/i, the return statements); comments cover visible statements only. */
258 transmit_content (struct MigrationReadyPeer *peer,
259 struct MigrationReadyBlock *block)
262 struct PutMessage *msg;
264 struct GSF_PeerPerformanceData *ppd;
267 ppd = GSF_get_peer_performance_data_ (peer->peer);
/* Only one outstanding transmission request per peer is permitted. */
268 GNUNET_assert (NULL == peer->th);
/* Message layout: struct PutMessage immediately followed by the payload.
 * NOTE(review): header.size is written with htons(), i.e. as a 16-bit
 * value; nothing visible here bounds msize -- confirm that block->size is
 * limited by the caller/datastore. */
269 msize = sizeof (struct PutMessage) + block->size;
270 msg = GNUNET_malloc (msize);
271 msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
272 msg->header.size = htons (msize);
273 msg->type = htonl (block->type);
274 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
/* The payload is stored directly behind each struct (&msg[1], &block[1]). */
275 memcpy (&msg[1], &block[1], block->size);
/* Remember this peer in a free (zero) slot of the target list and take a
 * reference on its peer id. */
277 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279 if (block->target_list[i] == 0)
281 block->target_list[i] = ppd->pid;
282 GNUNET_PEER_change_rc (block->target_list[i], 1);
/* Loop index ran to the end of the list: the block is removed from the
 * queue. Presumably this means no free slot was found; the statement that
 * leaves the loop early is not visible in this listing. */
286 if (MIGRATION_LIST_SIZE == i)
288 delete_migration_block (block);
295 #if DEBUG_FS_MIGRATION
296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
297 "Asking for transmission of %u bytes for migration\n", msize);
/* Request msize bytes with priority 0 and GNUNET_TIME_UNIT_FOREVER_REL as
 * timeout; transmit_message() will be called with `peer` as its closure. */
299 peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ ,
300 GNUNET_TIME_UNIT_FOREVER_REL, msize,
301 &transmit_message, peer);
307 * Count the number of peers this block has
308 * already been forwarded to.
310 * @param block the block
311 * @return number of times block was forwarded
/* Walks target_list looking for a zero ("empty") entry.
 * NOTE(review): the statements executed on a match and at the end of the
 * loop are missing from this listing; per the header comment the result is
 * the number of peers the block was already forwarded to. */
314 count_targets (struct MigrationReadyBlock *block)
318 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
319 if (block->target_list[i] == 0)
326 * Check if sending this block to this peer would be a good idea.
329 * @param peer target peer
330 * @param block the block
331 * @return score (>= 0: feasible, negative: infeasible)
/* Rates how well `block` fits `peer`; find_content() keeps the block with
 * the highest result. */
334 score_content (struct MigrationReadyPeer *peer,
335 struct MigrationReadyBlock *block)
338 struct GSF_PeerPerformanceData *ppd;
339 struct GNUNET_PeerIdentity id;
342 ppd = GSF_get_peer_performance_data_ (peer->peer);
/* Check whether this peer is already recorded as a target of the block.
 * The statement taken on a match is not visible in this listing;
 * presumably it returns a negative ("infeasible") score -- confirm. */
343 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
344 if (block->target_list[i] == ppd->pid)
/* Zero marks an empty target_list slot, so a real peer must not use it. */
346 GNUNET_assert (0 != ppd->pid);
/* Resolve the peer id to its identity and measure the distance between
 * the block's query hash and the peer's public-key hash. */
347 GNUNET_PEER_resolve (ppd->pid, &id);
348 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey);
349 /* closer distance, higher score: */
350 return UINT32_MAX - dist;
355 * If the migration task is not currently running, consider
356 * (re)scheduling it with the appropriate delay.
/* Forward declaration: find_content() calls consider_gathering() before
 * its definition appears further down. */
359 consider_gathering (void);
363 * Find content for migration to this peer.
365 * @param mrp peer to find content for
/* Chooses a queued block for `mrp` and starts its transmission. If no
 * block qualifies, either waits for the queue to fill or evicts one block
 * so that new content can be gathered.
 * NOTE(review): loop headers, braces and the initialisation of
 * best/best_score are missing from this listing. */
368 find_content (struct MigrationReadyPeer *mrp)
370 struct MigrationReadyBlock *pos;
373 struct MigrationReadyBlock *best;
/* The peer must not already have a transmission request pending. */
375 GNUNET_assert (NULL == mrp->th);
/* Selection: a block replaces the current candidate only when its score
 * is strictly higher. */
381 score = score_content (mrp, pos);
382 if (score > best_score)
/* Nothing suitable was found (the enclosing condition is not visible):
 * while the queue still has room, just wait for more content. */
391 if (mig_size < MAX_MIGRATION_QUEUE)
393 #if DEBUG_FS_MIGRATION
394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395 "No content found for pushing, waiting for queue to fill\n");
397 return; /* will fill up eventually... */
399 #if DEBUG_FS_MIGRATION
400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401 "No suitable content found, purging content from full queue\n");
403 /* failed to find migration target AND
404 * queue is full, purge most-forwarded
405 * block from queue to make room for more */
/* Eviction: here ties also replace the current candidate (>=). */
409 score = count_targets (pos);
410 if (score >= best_score)
417 GNUNET_assert (NULL != best);
418 delete_migration_block (best);
/* A block was removed, so gathering of new content may be scheduled. */
419 consider_gathering ();
422 #if DEBUG_FS_MIGRATION
423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
424 "Preparing to push best content to peer\n");
/* The result of transmit_content() is not used here. */
426 transmit_content (mrp, best);
431 * Task that is run periodically to obtain blocks for content migration.
435 * @param tc scheduler context (also unused)
/* Forward declaration: consider_gathering() passes this function to the
 * scheduler before its definition appears further down. */
438 gather_migration_blocks (void *cls,
439 const struct GNUNET_SCHEDULER_TaskContext *tc);
443 * If the migration task is not currently running, consider
444 * (re)scheduling it with the appropriate delay.
/* Schedules gather_migration_blocks() after a delay that depends on how
 * full the migration queue is. */
447 consider_gathering ()
449 struct GNUNET_TIME_Relative delay;
/* Guard conditions: a gathering task is already pending, or the queue is
 * full. The statements taken on these conditions are not visible in this
 * listing; presumably early returns -- confirm. */
455 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
457 if (mig_size >= MAX_MIGRATION_QUEUE)
/* delay = mig_size / MAX_MIGRATION_QUEUE seconds, so it grows as the queue
 * fills, but is never shorter than min_migration_delay. */
459 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
460 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
461 delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
462 #if DEBUG_FS_MIGRATION
463 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464 "Scheduling gathering task (queue size: %u)\n", mig_size);
/* NOTE(review): the left-hand side receiving this call's result is not
 * visible; since gather_migration_blocks() resets mig_task, the returned
 * task id is presumably stored there -- confirm. */
467 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
472 * Process content offered for migration.
475 * @param key key for the content
476 * @param size number of bytes in data
477 * @param data content stored
478 * @param type type of the content
479 * @param priority priority of the content
480 * @param anonymity anonymity-level for the content
481 * @param expiration expiration time for the content
482 * @param uid unique identifier for the datum;
483 * maybe 0 if no unique identifier is available
/* Callback given to GNUNET_DATASTORE_get_for_replication(): filters the
 * offered content, queues a copy for migration and offers it to peers.
 * NOTE(review): several conditions, braces and assignments are missing
 * from this listing; comments cover the visible statements only. */
486 process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size,
487 const void *data, enum GNUNET_BLOCK_Type type,
488 uint32_t priority, uint32_t anonymity,
489 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
491 struct MigrationReadyBlock *mb;
492 struct MigrationReadyPeer *pos;
/* No-result path (its triggering condition is not visible): log and let
 * consider_gathering() schedule another attempt. */
497 #if DEBUG_FS_MIGRATION
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n");
500 consider_gathering ();
/* Skip content whose remaining lifetime is shorter than
 * MIN_MIGRATION_CONTENT_LIFETIME. */
503 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
504 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
506 /* content will expire soon, don't bother */
507 consider_gathering ();
/* On-demand blocks are handed to GNUNET_FS_handle_on_demand_block() with
 * this function as its callback. The comparison applied to its result is
 * not visible; consider_gathering() runs when that test succeeds. */
510 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
513 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
514 anonymity, expiration, uid,
515 &process_migration_content, NULL))
516 consider_gathering ();
519 #if DEBUG_FS_MIGRATION
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
522 GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE);
/* One allocation holds the bookkeeping struct and, directly behind it,
 * a copy of the payload. */
524 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
526 mb->expiration = expiration;
529 memcpy (&mb[1], data, size);
/* Append the new block at the tail of the migration queue. */
530 GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb);
537 #if DEBUG_FS_MIGRATION
538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539 "Preparing to push best content to peer\n");
/* Stop offering the block as soon as transmit_content() reports that it
 * deleted it: mb must not be touched afterwards. */
541 if (GNUNET_YES == transmit_content (pos, mb))
542 break; /* 'mb' was freed! */
546 consider_gathering ();
551 * Task that is run periodically to obtain blocks for content migration.
555 * @param tc scheduler context (also unused)
/* Scheduler task: asks the datastore for content to replicate, with
 * process_migration_content() as the result callback. */
558 gather_migration_blocks (void *cls,
559 const struct GNUNET_SCHEDULER_TaskContext *tc)
/* This task is now running, so its id is cleared; consider_gathering()
 * tests mig_task against this value before scheduling. */
561 mig_task = GNUNET_SCHEDULER_NO_TASK;
/* Queue-full check; the statement taken is not visible in this listing
 * (presumably an early return -- confirm). */
562 if (mig_size >= MAX_MIGRATION_QUEUE)
566 #if DEBUG_FS_MIGRATION
567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568 "Asking datastore for content for replication (queue size: %u)\n",
/* NOTE(review): the variable receiving this call's result is not visible;
 * presumably mig_qe, which the shutdown code cancels -- confirm. */
572 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
573 GNUNET_TIME_UNIT_FOREVER_REL,
574 &process_migration_content, NULL);
/* NOTE(review): the condition guarding this call is not visible;
 * presumably it runs only when the datastore request could not be
 * queued -- confirm. */
576 consider_gathering ();
582 * A peer connected to us. Start pushing content to this peer.
585 * @param peer handle for the peer that connected
/* Creates the per-peer migration record for a newly connected peer and
 * adds it to the peer list.
 * NOTE(review): the field initialisation and any follow-up call are on
 * lines missing from this listing. */
588 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
590 struct MigrationReadyPeer *mrp;
/* Pushing disabled: the statement taken here is not visible (presumably
 * an early return -- confirm). */
592 if (GNUNET_YES != enabled)
594 mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
597 GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp);
602 * A peer disconnected from us. Stop pushing content to this peer.
605 * @param peer handle for the peer that disconnected
/* Looks up the migration record belonging to `peer`, unlinks it from the
 * peer list and releases what it holds.
 * NOTE(review): the list traversal, the guard around the cancel call and
 * the release of the record itself are on lines missing from this
 * listing. */
608 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
610 struct MigrationReadyPeer *pos;
615 if (pos->peer == peer)
617 GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos);
/* Abort a transmission request that is still pending for this peer. */
620 GSF_peer_transmit_cancel_ (pos->th);
/* Drop the message that was being pushed, if any. */
623 if (NULL != pos->msg)
625 GNUNET_free (pos->msg);
/* NOTE(review): the enclosing function's signature and the assignment
 * receiving this result are missing from this listing; the test that
 * follows suggests the result is stored in `enabled`. This fragment reads
 * the module's configuration and starts gathering. */
/* NOTE(review): the section is spelled "FS" here but "fs" for
 * MIN_MIGRATION_DELAY below -- confirm the lookup is case-insensitive. */
643 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
644 if (GNUNET_YES != enabled)
/* Load the minimum polling delay into min_migration_delay; the warning
 * below is the failure path (the comparison itself is not visible). */
648 GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
649 &min_migration_delay))
651 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
653 ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
654 "MIN_MIGRATION_DELAY", "fs");
/* Configuration accepted: schedule the first gathering round. */
657 consider_gathering ();
662 * Shutdown the module.
/* NOTE(review): the enclosing function's signature is missing from this
 * listing. This fragment stops the gathering task, cancels the datastore
 * request and empties the migration queue. */
667 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
669 GNUNET_SCHEDULER_cancel (mig_task);
670 mig_task = GNUNET_SCHEDULER_NO_TASK;
/* NOTE(review): the guard around this cancel is not visible; presumably
 * it runs only when mig_qe is non-NULL -- confirm. */
674 GNUNET_DATASTORE_cancel (mig_qe);
/* Delete blocks from the head until the queue is empty; the block counter
 * must then be back at zero. */
677 while (NULL != mig_head)
678 delete_migration_block (mig_head);
679 GNUNET_assert (0 == mig_size);
682 /* end of gnunet-service-fs_push.c */