2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
35 * How long must content remain valid for us to consider it for migration?
36 * If content will expire too soon, there is clearly no point in pushing
37 * it to other peers. This value gives the threshold for migration. Note
38 * that if this value is increased, the migration testcase may need to be
39 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
41 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
45 * Block that is ready for migration to other peers. Actual data is at the end of the block.
47 struct MigrationReadyBlock
51 * This is a doubly-linked list.
53 struct MigrationReadyBlock *next;
56 * This is a doubly-linked list.
58 struct MigrationReadyBlock *prev;
61 * Query for the block.
63 GNUNET_HashCode query;
66 * When does this block expire?
68 struct GNUNET_TIME_Absolute expiration;
71 * Peers we already forwarded this
72 * block to. Zero for empty entries.
74 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
82 * Number of targets already used.
84 unsigned int used_targets;
89 enum GNUNET_BLOCK_Type type;
94 * Information about a peer waiting for
97 struct MigrationReadyPeer
100 * This is a doubly-linked list.
102 struct MigrationReadyPeer *next;
105 * This is a doubly-linked list.
107 struct MigrationReadyPeer *prev;
112 struct GSF_ConnectedPeer *peer;
115 * Handle for current transmission request,
118 struct GSF_PeerTransmitHandle *th;
121 * Message we are trying to push right now (or NULL)
123 struct PutMessage *msg;
128 * Head of linked list of blocks that can be migrated.
130 static struct MigrationReadyBlock *mig_head;
133 * Tail of linked list of blocks that can be migrated.
135 static struct MigrationReadyBlock *mig_tail;
138 * Head of linked list of peers.
140 static struct MigrationReadyPeer *peer_head;
143 * Tail of linked list of peers.
145 static struct MigrationReadyPeer *peer_tail;
148 * Request to datastore for migration (or NULL).
150 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
153 * ID of task that collects blocks for migration.
155 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
158 * What is the maximum frequency at which we are allowed to
159 * poll the datastore for migration content?
161 static struct GNUNET_TIME_Relative min_migration_delay;
164 * Size of the doubly-linked list of migration blocks.
166 static unsigned int mig_size;
169 * Is this module enabled?
175 * Delete the given migration block.
177 * @param mb block to delete
180 delete_migration_block (struct MigrationReadyBlock *mb)
182 GNUNET_CONTAINER_DLL_remove (mig_head,
185 GNUNET_PEER_decrement_rcs (mb->target_list,
186 MIGRATION_LIST_SIZE);
193 * Find content for migration to this peer.
196 find_content (struct MigrationReadyPeer *mrp);
200 * Transmit the message currently scheduled for
203 * @param cls the 'struct MigrationReadyPeer'
204 * @param buf_size number of bytes available in buf
205 * @param buf where to copy the message, NULL on error (peer disconnect)
206 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
209 transmit_message (void *cls,
213 struct MigrationReadyPeer *peer = cls;
214 struct PutMessage *msg;
225 msize = ntohs (msg->header.size);
226 GNUNET_assert (msize <= buf_size);
227 memcpy (buf, msg, msize);
230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
231 "Pushing %u bytes to another peer\n",
240 * Send the given block to the given peer.
242 * @param peer target peer
243 * @param block the block
244 * @return GNUNET_YES if the block was deleted (!)
247 transmit_content (struct MigrationReadyPeer *peer,
248 struct MigrationReadyBlock *block)
251 struct PutMessage *msg;
253 struct GSF_PeerPerformanceData *ppd;
256 ppd = GSF_get_peer_performance_data_ (peer->peer);
257 GNUNET_assert (NULL == peer->th);
258 msize = sizeof (struct PutMessage) + block->size;
259 msg = GNUNET_malloc (msize);
260 msg->header.type = htons (42);
261 msg->header.size = htons (msize);
267 for (i=0;i<MIGRATION_LIST_SIZE;i++)
269 if (block->target_list[i] == 0)
271 block->target_list[i] = ppd->pid;
272 GNUNET_PEER_change_rc (block->target_list[i], 1);
276 if (MIGRATION_LIST_SIZE == i)
278 delete_migration_block (block);
285 peer->th = GSF_peer_transmit_ (peer->peer,
288 GNUNET_TIME_UNIT_FOREVER_REL,
297 * Count the number of peers this block has
298 * already been forwarded to.
300 * @param block the block
301 * @return number of times block was forwarded
304 count_targets (struct MigrationReadyBlock *block)
308 for (i=0;i<MIGRATION_LIST_SIZE;i++)
309 if (block->target_list[i] == 0)
316 * Check if sending this block to this peer would
319 * @param peer target peer
320 * @param block the block
321 * @return score (>= 0: feasible, negative: infeasible)
324 score_content (struct MigrationReadyPeer *peer,
325 struct MigrationReadyBlock *block)
328 struct GSF_PeerPerformanceData *ppd;
329 struct GNUNET_PeerIdentity id;
332 ppd = GSF_get_peer_performance_data_ (peer->peer);
333 for (i=0;i<MIGRATION_LIST_SIZE;i++)
334 if (block->target_list[i] == ppd->pid)
336 GNUNET_PEER_resolve (ppd->pid,
338 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
340 /* closer distance, higher score: */
341 return UINT32_MAX - dist;
346 * If the migration task is not currently running, consider
347 * (re)scheduling it with the appropriate delay.
350 consider_gathering (void);
354 * Find content for migration to this peer.
356 * @param mrp peer to find content for
359 find_content (struct MigrationReadyPeer *mrp)
361 struct MigrationReadyBlock *pos;
364 struct MigrationReadyBlock *best;
366 GNUNET_assert (NULL == mrp->th);
372 score = score_content (mrp, pos);
373 if (score > best_score)
382 if (mig_size < MAX_MIGRATION_QUEUE)
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386 "No content found for pushing, waiting for queue to fill\n");
388 return; /* will fill up eventually... */
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "No suitable content found, purging content from full queue\n");
394 /* failed to find migration target AND
395 queue is full, purge most-forwarded
396 block from queue to make room for more */
401 score = count_targets (pos);
402 if (score >= best_score)
409 GNUNET_assert (NULL != best);
410 delete_migration_block (best);
411 consider_gathering ();
415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
416 "Preparing to push best content to peer\n");
418 transmit_content (mrp, best);
423 * Task that is run periodically to obtain blocks for content
427 * @param tc scheduler context (also unused)
430 gather_migration_blocks (void *cls,
431 const struct GNUNET_SCHEDULER_TaskContext *tc);
435 * If the migration task is not currently running, consider
436 * (re)scheduling it with the appropriate delay.
439 consider_gathering ()
441 struct GNUNET_TIME_Relative delay;
447 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
449 if (mig_size >= MAX_MIGRATION_QUEUE)
451 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
453 delay = GNUNET_TIME_relative_divide (delay,
454 MAX_MIGRATION_QUEUE);
455 delay = GNUNET_TIME_relative_max (delay,
456 min_migration_delay);
457 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
458 &gather_migration_blocks,
464 * Process content offered for migration.
467 * @param key key for the content
468 * @param size number of bytes in data
469 * @param data content stored
470 * @param type type of the content
471 * @param priority priority of the content
472 * @param anonymity anonymity-level for the content
473 * @param expiration expiration time for the content
474 * @param uid unique identifier for the datum;
475 * maybe 0 if no unique identifier is available
478 process_migration_content (void *cls,
479 const GNUNET_HashCode * key,
482 enum GNUNET_BLOCK_Type type,
485 struct GNUNET_TIME_Absolute
486 expiration, uint64_t uid)
488 struct MigrationReadyBlock *mb;
489 struct MigrationReadyPeer *pos;
494 consider_gathering ();
497 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
498 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
500 /* content will expire soon, don't bother */
501 GNUNET_DATASTORE_get_next (GSF_dsh);
504 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
507 GNUNET_FS_handle_on_demand_block (key, size, data,
508 type, priority, anonymity,
510 &process_migration_content,
513 GNUNET_DATASTORE_get_next (GSF_dsh);
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "Retrieved block `%s' of type %u for migration\n",
523 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
525 mb->expiration = expiration;
528 memcpy (&mb[1], data, size);
529 GNUNET_CONTAINER_DLL_insert_after (mig_head,
540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541 "Preparing to push best content to peer\n");
543 if (GNUNET_YES == transmit_content (pos, mb))
544 break; /* 'mb' was freed! */
548 GNUNET_DATASTORE_get_next (GSF_dsh);
553 * Task that is run periodically to obtain blocks for content
557 * @param tc scheduler context (also unused)
560 gather_migration_blocks (void *cls,
561 const struct GNUNET_SCHEDULER_TaskContext *tc)
563 mig_task = GNUNET_SCHEDULER_NO_TASK;
566 mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh,
568 GNUNET_TIME_UNIT_FOREVER_REL,
569 &process_migration_content, NULL);
570 GNUNET_assert (mig_qe != NULL);
576 * A peer connected to us. Start pushing content
579 * @param peer handle for the peer that connected
582 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
584 struct MigrationReadyPeer *mrp;
586 if (GNUNET_YES != enabled)
588 mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
591 GNUNET_CONTAINER_DLL_insert (peer_head,
598 * A peer disconnected from us. Stop pushing content
601 * @param peer handle for the peer that disconnected
604 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
606 struct MigrationReadyPeer *pos;
611 if (pos->peer == peer)
613 GNUNET_CONTAINER_DLL_remove (peer_head,
617 GSF_peer_transmit_cancel_ (pos->th);
632 enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
635 if (GNUNET_YES != enabled)
639 GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
641 "MIN_MIGRATION_DELAY",
642 &min_migration_delay))
644 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
645 _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
646 "MIN_MIGRATION_DELAY",
650 consider_gathering ();
655 * Shutdown the module.
660 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
662 GNUNET_SCHEDULER_cancel (mig_task);
663 mig_task = GNUNET_SCHEDULER_NO_TASK;
667 GNUNET_DATASTORE_cancel (mig_qe);
670 while (NULL != mig_head)
671 delete_migration_block (mig_head);
672 GNUNET_assert (0 == mig_size);
675 /* end of gnunet-service-fs_push.c */