2 This file is part of GNUnet.
3 Copyright (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
35 * Maximum number of blocks we keep in memory for migration.
/* Compared against mig_size in find_content, consider_gathering and
 * gather_migration_blocks to decide whether the in-memory queue is full. */
37 #define MAX_MIGRATION_QUEUE 8
40 * Blocks are at most migrated to this number of peers
41 * plus one, each time they are fetched from the database.
/* Dimension of MigrationReadyBlock.target_list and the bound of every loop
 * in this file that scans that array. */
43 #define MIGRATION_LIST_SIZE 2
46 * How long must content remain valid for us to consider it for migration?
47 * If content will expire too soon, there is clearly no point in pushing
48 * it to other peers. This value gives the threshold for migration. Note
49 * that if this value is increased, the migration testcase may need to be
50 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
/* process_migration_content compares the remaining lifetime of each
 * datastore result (in microseconds) against this threshold. */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
56 * Block that is ready for migration to other peers. Actual data is at the end of the block.
/* One queued block awaiting migration. The payload is stored directly
 * behind this header (callers address it as &block[1]).
 * NOTE(review): this listing looks incomplete -- the braces of the struct
 * and at least one member are not visible. transmit_content reads
 * `block->size`, yet no `size` member appears below; confirm against the
 * original file. */
58 struct MigrationReadyBlock
62 * This is a doubly-linked list.
64 struct MigrationReadyBlock *next;
67 * This is a doubly-linked list.
69 struct MigrationReadyBlock *prev;
72 * Query for the block.
/* score_content measures the hash distance between this query and the hash
 * of a candidate peer's identity. */
74 struct GNUNET_HashCode query;
77 * When does this block expire?
/* Copied into the outgoing PUT message by transmit_content. */
79 struct GNUNET_TIME_Absolute expiration;
82 * Peers we already forwarded this
83 * block to. Zero for empty entries.
/* transmit_content claims the first zero slot and takes a reference on the
 * peer id; delete_migration_block releases the references again. */
85 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
93 * Number of targets already used.
/* NOTE(review): no code visible in this listing reads or writes this field;
 * count_targets scans target_list instead. */
95 unsigned int used_targets;
/* Block type; transmit_content writes it to the PUT message in network
 * byte order. */
100 enum GNUNET_BLOCK_Type type;
105 * Information about a peer waiting for content to be pushed to it.
/* Per-peer push state, kept in the peer_head/peer_tail list.
 * NOTE(review): the braces of the struct are not visible in this listing. */
108 struct MigrationReadyPeer
111 * This is a doubly-linked list.
113 struct MigrationReadyPeer *next;
116 * This is a doubly-linked list.
118 struct MigrationReadyPeer *prev;
/* Connected-peer handle; used to look up the peer's performance data and
 * identity, and matched against the argument of GSF_push_stop_. */
123 struct GSF_ConnectedPeer *peer;
126 * Handle for current transmission request,
/* Asserted to be NULL by transmit_content and find_content before a new
 * request is issued; set from GSF_peer_transmit_ and cancelled in
 * GSF_push_stop_. */
129 struct GSF_PeerTransmitHandle *th;
132 * Message we are trying to push right now (or NULL)
/* Freed by GSF_push_stop_ if still set when the peer disconnects. */
134 struct PutMessage *msg;
139 * Head of linked list of blocks that can be migrated.
/* File-scope state of the push module. */
141 static struct MigrationReadyBlock *mig_head;
144 * Tail of linked list of blocks that can be migrated.
146 static struct MigrationReadyBlock *mig_tail;
149 * Head of linked list of peers.
151 static struct MigrationReadyPeer *peer_head;
154 * Tail of linked list of peers.
156 static struct MigrationReadyPeer *peer_tail;
159 * Request to datastore for migration (or NULL).
/* Passed to GNUNET_DATASTORE_cancel by the shutdown code. */
161 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
164 * ID of task that collects blocks for migration.
/* Tested for non-NULL in consider_gathering and cancelled by the shutdown
 * code. */
166 static struct GNUNET_SCHEDULER_Task * mig_task;
169 * What is the maximum frequency at which we are allowed to
170 * poll the datastore for migration content?
/* Loaded from the "MIN_MIGRATION_DELAY" configuration option; used as a
 * lower bound for the gathering delay in consider_gathering. */
172 static struct GNUNET_TIME_Relative min_migration_delay;
175 * Size of the doubly-linked list of migration blocks.
/* Asserted to be 0 once the shutdown code has deleted every block. */
177 static unsigned int mig_size;
180 * Is this module enabled?
/* NOTE(review): the declaration documented by the line above is not visible
 * in this listing, although `enabled` is tested in GSF_push_start_ and in
 * the initialisation code; confirm against the original file. */
185 * Did we find anything in the datastore?
/* Reset to GNUNET_NO before each datastore request and set to GNUNET_YES in
 * process_migration_content; consider_gathering raises its delay while the
 * value is GNUNET_NO. */
187 static int value_found;
191 * Delete the given migration block.
193 * @param mb block to delete
/* Unlinks mb from the mig_head/mig_tail list and releases the peer
 * reference counts recorded in its target_list.
 * NOTE(review): the return type, the braces and any further statements are
 * not visible in this listing. A caller comment ("'mb' was freed!") and the
 * shutdown assertion on mig_size suggest that the block is also freed and
 * the queue counter adjusted here -- confirm against the original file. */
196 delete_migration_block (struct MigrationReadyBlock *mb)
198 GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
199 GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
206 * Find content for migration to this peer.
/* Forward declaration; the definition appears further down in this file.
 * NOTE(review): the return type line is not visible in this listing. */
209 find_content (struct MigrationReadyPeer *mrp);
213 * Transmit the message currently scheduled for transmission.
215 * @param cls the `struct MigrationReadyPeer`
216 * @param buf_size number of bytes available in @a buf
217 * @param buf where to copy the message, NULL on error (peer disconnect)
218 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
/* Transmit callback: copies the pending PUT message into the buffer offered
 * for this peer. transmit_content registers it through GSF_peer_transmit_
 * with the MigrationReadyPeer as closure.
 * NOTE(review): this listing is incomplete -- the return type, the buf_size
 * and buf parameters, the declaration of msize, the statement that loads
 * `msg`, the condition guarding the failure branch and every return
 * statement are not visible; confirm against the original file. */
221 transmit_message (void *cls,
225 struct MigrationReadyPeer *peer = cls;
226 struct PutMessage *msg;
/* Failure path: per the parameter documentation a NULL buffer signals a
 * peer disconnect (the test itself is not visible here). */
234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235 "Failed to migrate content to another peer (disconnect)\n");
/* The header stores the total message size in network byte order; the
 * message must fit into the offered buffer before it is copied. */
239 msize = ntohs (msg->header.size);
240 GNUNET_assert (msize <= buf_size);
241 memcpy (buf, msg, msize);
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Pushing %u bytes to %s\n",
246 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
253 * Send the given block to the given peer.
255 * @param peer target peer
256 * @param block the block
257 * @return #GNUNET_YES if the block was deleted (!)
/* Builds a PUT message for `block`, records the peer in the block's target
 * list and asks the peer layer for a transmission slot.
 * NOTE(review): this listing is incomplete -- the return type, braces, the
 * declarations of `i` and `msize`, the statement that hands `msg` over to
 * the peer, and the return statements are not visible; confirm against the
 * original file. */
260 transmit_content (struct MigrationReadyPeer *peer,
261 struct MigrationReadyBlock *block)
264 struct PutMessage *msg;
266 struct GSF_PeerPerformanceData *ppd;
269 ppd = GSF_get_peer_performance_data_ (peer->peer);
/* Only one transmission request per peer may be outstanding. */
270 GNUNET_assert (NULL == peer->th);
/* Message = PutMessage header followed by the block payload. The size
 * field and the type are converted to network byte order. */
271 msize = sizeof (struct PutMessage) + block->size;
272 msg = GNUNET_malloc (msize);
273 msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
274 msg->header.size = htons (msize);
275 msg->type = htonl (block->type);
276 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
/* The payload lives directly behind the MigrationReadyBlock header. */
277 memcpy (&msg[1], &block[1], block->size);
/* Record this peer in a free (zero) slot of the target list and take a
 * reference on its peer id. Presumably the loop stops after claiming a
 * slot (no `break` is visible here), so that i == MIGRATION_LIST_SIZE
 * below means no slot was free -- confirm. */
279 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
281 if (block->target_list[i] == 0)
283 block->target_list[i] = ppd->pid;
284 GNUNET_PEER_change_rc (block->target_list[i], 1);
288 if (MIGRATION_LIST_SIZE == i)
290 delete_migration_block (block);
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298 "Asking for transmission of %u bytes to %s for migration\n",
300 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
/* Request transmission with priority 0 and no timeout; transmit_message is
 * invoked with `peer` as its closure. One argument line of this call is
 * not visible in the listing. */
301 peer->th = GSF_peer_transmit_ (peer->peer,
302 GNUNET_NO, 0 /* priority */ ,
303 GNUNET_TIME_UNIT_FOREVER_REL,
305 &transmit_message, peer);
311 * Count the number of peers this block has
312 * already been forwarded to.
314 * @param block the block
315 * @return number of times block was forwarded
/* Scans target_list for a zero (empty) entry.
 * NOTE(review): the return type, the declaration of `i` and the return
 * statements are not visible in this listing; presumably the index of the
 * first empty slot is returned as the number of used targets -- confirm. */
318 count_targets (struct MigrationReadyBlock *block)
322 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
323 if (block->target_list[i] == 0)
330 * Check if sending this block to this peer would be worthwhile.
333 * @param peer target peer
334 * @param block the block
335 * @return score (>= 0: feasible, negative: infeasible)
/* Rates how well `block` fits `peer` by the hash distance between the block
 * query and the hash of the peer identity.
 * NOTE(review): the return type, braces, the declarations of `i` and `dist`
 * and the statement executed when the peer is already in the target list
 * are not visible in this listing; per the contract above that case should
 * yield a negative (infeasible) score -- confirm. */
338 score_content (struct MigrationReadyPeer *peer,
339 struct MigrationReadyBlock *block)
342 struct GSF_PeerPerformanceData *ppd;
343 struct GNUNET_PeerIdentity id;
344 struct GNUNET_HashCode hc;
347 ppd = GSF_get_peer_performance_data_ (peer->peer);
/* Was the block already forwarded to this peer? */
348 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
349 if (block->target_list[i] == ppd->pid)
/* Zero marks an empty target_list slot, so a real peer id must be non-zero. */
351 GNUNET_assert (0 != ppd->pid);
/* Resolve the interned id to the full identity and hash it so it can be
 * compared with the block's query hash. */
352 GNUNET_PEER_resolve (ppd->pid, &id);
353 GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc);
354 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc);
355 /* closer distance, higher score: */
356 return UINT32_MAX - dist;
361 * If the migration task is not currently running, consider
362 * (re)scheduling it with the appropriate delay.
/* Forward declaration; find_content calls this before the definition
 * further down. NOTE(review): the return type line is not visible here. */
365 consider_gathering (void);
369 * Find content for migration to this peer.
371 * @param mrp peer to find content for
/* Picks the queued block with the highest score for `mrp` and starts pushing
 * it. When nothing suitable is queued and the queue is full, one block is
 * purged to make room instead.
 * NOTE(review): this listing is incomplete -- the return type, braces, the
 * declarations of `score` and `best_score`, both loop headers, the updates
 * of `best`/`best_score` and the test separating the "nothing found" branch
 * from the final push are not visible; confirm against the original file. */
374 find_content (struct MigrationReadyPeer *mrp)
376 struct MigrationReadyBlock *pos;
379 struct MigrationReadyBlock *best;
/* The peer must not already have a transmission request pending. */
381 GNUNET_assert (NULL == mrp->th);
/* First pass: score each candidate block for this peer. */
387 score = score_content (mrp, pos);
388 if (score > best_score)
/* Nothing suitable and the queue is not yet full: just wait. */
397 if (mig_size < MAX_MIGRATION_QUEUE)
399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400 "No content found for pushing, waiting for queue to fill\n");
401 return; /* will fill up eventually... */
403 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404 "No suitable content found, purging content from full queue\n");
405 /* failed to find migration target AND
406 * queue is full, purge most-forwarded
407 * block from queue to make room for more */
/* Second pass: rank blocks by how many peers they were forwarded to. */
411 score = count_targets (pos);
412 if (score >= best_score)
419 GNUNET_assert (NULL != best);
420 delete_migration_block (best);
/* A slot was freed, so consider fetching more content. */
421 consider_gathering ();
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425 "Preparing to push best content to peer\n");
426 transmit_content (mrp, best);
431 * Task that is run periodically to obtain blocks for content migration.
435 * @param tc scheduler context (also unused)
/* Forward declaration; consider_gathering passes this function to the
 * scheduler before its definition further down.
 * NOTE(review): the return type line is not visible in this listing. */
438 gather_migration_blocks (void *cls,
439 const struct GNUNET_SCHEDULER_TaskContext *tc);
443 * If the migration task is not currently running, consider
444 * (re)scheduling it with the appropriate delay.
/* Schedules gather_migration_blocks after a delay derived from the current
 * queue size.
 * NOTE(review): this listing is incomplete -- the return type, braces, the
 * bodies of the two guard conditions (presumably early returns), part of
 * the "empty datastore" delay expression, the log argument and the target
 * of the scheduler call's result are not visible; confirm against the
 * original file. */
447 consider_gathering ()
449 struct GNUNET_TIME_Relative delay;
/* Guards: a task is already scheduled, or the queue is already full. */
455 if (NULL != mig_task)
457 if (mig_size >= MAX_MIGRATION_QUEUE)
/* delay = mig_size seconds / MAX_MIGRATION_QUEUE, so a fuller queue polls
 * less often; it is never shorter than the configured minimum delay. */
459 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
460 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
461 delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
462 if (GNUNET_NO == value_found)
464 /* wait at least 5s if the datastore is empty */
465 delay = GNUNET_TIME_relative_max (delay,
466 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470 "Scheduling gathering task (queue size: %u)\n",
473 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
478 * Process content offered for migration.
481 * @param key key for the content
482 * @param size number of bytes in data
483 * @param data content stored
484 * @param type type of the content
485 * @param priority priority of the content
486 * @param anonymity anonymity-level for the content
487 * @param expiration expiration time for the content
488 * @param uid unique identifier for the datum;
489 * maybe 0 if no unique identifier is available
/* Datastore callback: validates one result, queues it as a
 * MigrationReadyBlock and offers it to the known peers.
 * NOTE(review): this listing is incomplete -- the return type, braces,
 * several parameters (size, data, priority, anonymity, uid per the
 * documentation above), the condition guarding the "no content" branch,
 * most early returns, several field assignments on `mb`, the queue-size
 * update and any per-peer filter inside the final loop are not visible;
 * confirm against the original file. */
492 process_migration_content (void *cls,
493 const struct GNUNET_HashCode *key,
496 enum GNUNET_BLOCK_Type type,
499 struct GNUNET_TIME_Absolute expiration,
502 struct MigrationReadyBlock *mb;
503 struct MigrationReadyPeer *pos;
/* Branch for an empty result: log it and re-arm the gathering task. */
508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 "No content found for migration...\n");
510 consider_gathering ();
/* The datastore returned something; consider_gathering reads this flag. */
513 value_found = GNUNET_YES;
514 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
515 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
517 /* content will expire soon, don't bother */
518 consider_gathering ();
/* On-demand blocks are handed to GNUNET_FS_handle_on_demand_block, with
 * this same function passed as the callback argument. */
521 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
524 GNUNET_FS_handle_on_demand_block (key,
532 &process_migration_content, NULL))
533 consider_gathering ();
536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
540 MAX_MIGRATION_QUEUE);
/* Header and payload are allocated as one chunk; the payload is copied
 * directly behind the header. */
541 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
543 mb->expiration = expiration;
546 memcpy (&mb[1], data, size);
547 GNUNET_CONTAINER_DLL_insert_after (mig_head,
/* Offer the new block to the known peers; stop as soon as
 * transmit_content reports that the block was deleted. */
552 for (pos = peer_head; NULL != pos; pos = pos->next)
556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557 "Preparing to push best content to peer %s\n",
558 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
559 if (GNUNET_YES == transmit_content (pos, mb))
560 break; /* 'mb' was freed! */
563 consider_gathering ();
568 * Task that is run periodically to obtain blocks for content migration.
572 * @param tc scheduler context (also unused)
/* Scheduler task: asks the datastore for one more item to migrate, with
 * process_migration_content as the result callback.
 * NOTE(review): this listing is incomplete -- the return type, braces, the
 * body of the queue-full guard, the log argument, the variable receiving
 * the datastore request handle (presumably mig_qe) and the condition under
 * which consider_gathering is called at the end are not visible; confirm
 * against the original file. */
575 gather_migration_blocks (void *cls,
576 const struct GNUNET_SCHEDULER_TaskContext *tc)
579 if (mig_size >= MAX_MIGRATION_QUEUE)
583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584 "Asking datastore for content for replication (queue size: %u)\n",
/* Reset before the request; the callback sets it again when a result
 * arrives. */
586 value_found = GNUNET_NO;
588 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
589 GNUNET_TIME_UNIT_FOREVER_REL,
590 &process_migration_content, NULL);
592 consider_gathering ();
597 * A peer connected to us. Start pushing content to this peer.
600 * @param peer handle for the peer that connected
/* Registers a newly connected peer in the peer_head list so that content
 * can be pushed to it; the `enabled` test below guards the module-disabled
 * case.
 * NOTE(review): this listing is incomplete -- the return type, braces, the
 * body of the `enabled` guard, the initialisation of the new entry's fields,
 * the remaining arguments of the list insertion and any follow-up call are
 * not visible; confirm against the original file. */
603 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
605 struct MigrationReadyPeer *mrp;
607 if (GNUNET_YES != enabled)
609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610 "Adding peer %s to list for pushing\n",
611 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
613 mrp = GNUNET_new (struct MigrationReadyPeer);
616 GNUNET_CONTAINER_DLL_insert (peer_head,
623 * A peer disconnected from us. Stop pushing content to this peer.
626 * @param peer handle for the peer that disconnected
/* Looks up the list entry for a disconnected peer, unlinks it and releases
 * its pending transmission request and message.
 * NOTE(review): this listing is incomplete -- the return type, braces, the
 * statement executed on a match (presumably `break`), the handling of a
 * peer that is not in the list, the remaining arguments of the list
 * removal, the guard around the cancel call and the release of `pos`
 * itself are not visible; confirm against the original file. */
629 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
631 struct MigrationReadyPeer *pos;
/* Linear search for the entry that wraps this connected peer. */
633 for (pos = peer_head; NULL != pos; pos = pos->next)
634 if (pos->peer == peer)
638 GNUNET_CONTAINER_DLL_remove (peer_head,
/* Abort the outstanding transmission request for this peer. */
643 GSF_peer_transmit_cancel_ (pos->th);
/* Drop a message that was built but never handed to the transport. */
646 if (NULL != pos->msg)
648 GNUNET_free (pos->msg);
/* Module start-up code: reads the "CONTENT_PUSHING" switch and the
 * "MIN_MIGRATION_DELAY" option, then arms the gathering task.
 * NOTE(review): the documentation, signature and braces of the enclosing
 * function are not visible in this listing (presumably the module's init
 * entry point), nor are the assignment target of the yes/no lookup
 * (presumably `enabled`), the bodies of the guards and the start of the
 * second condition; confirm against the original file.
 * NOTE(review): the section name is spelled "FS" in the first lookup and
 * "fs" in the others -- confirm that section lookup is case-insensitive. */
662 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
663 if (GNUNET_YES != enabled)
667 GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
668 &min_migration_delay))
/* Configuration problem: warn that a time value is required. */
670 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
671 "fs", "MIN_MIGRATION_DELAY",
672 _("time required, content pushing disabled"));
675 consider_gathering ();
680 * Shutdown the module.
/* Module shutdown code: cancels the gathering task and the pending
 * datastore request, then drains the migration queue.
 * NOTE(review): the signature and braces of the enclosing function are not
 * visible in this listing, nor are the resets of mig_task/mig_qe and the
 * guard around the datastore cancel; confirm against the original file. */
685 if (NULL != mig_task)
687 GNUNET_SCHEDULER_cancel (mig_task);
692 GNUNET_DATASTORE_cancel (mig_qe);
/* Each call unlinks the current head, so the loop ends when the list is
 * empty. */
695 while (NULL != mig_head)
696 delete_migration_block (mig_head);
/* After the queue is drained the counter must be back at zero. */
697 GNUNET_assert (0 == mig_size);
700 /* end of gnunet-service-fs_push.c */