2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
28 #include "gnunet-service-fs_push.h"
31 /* FIXME: below are only old code fragments to use... */
34 * Block that is ready for migration to other peers. Actual data is at the end of the block.
36 struct MigrationReadyBlock
40 * This is a doubly-linked list.
42 struct MigrationReadyBlock *next;
45 * This is a doubly-linked list.
47 struct MigrationReadyBlock *prev;
50 * Query for the block.
52 GNUNET_HashCode query;
55 * When does this block expire?
57 struct GNUNET_TIME_Absolute expiration;
60 * Peers we would consider forwarding this
61 * block to. Zero for empty entries.
63 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
71 * Number of targets already used.
73 unsigned int used_targets;
78 enum GNUNET_BLOCK_Type type;
83 * Head of linked list of blocks that can be migrated.
85 static struct MigrationReadyBlock *mig_head;
88 * Tail of linked list of blocks that can be migrated.
90 static struct MigrationReadyBlock *mig_tail;
93 * Request to datastore for migration (or NULL).
95 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
98 * ID of task that collects blocks for migration.
100 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
103 * What is the maximum frequency at which we are allowed to
104 * poll the datastore for migration content?
106 static struct GNUNET_TIME_Relative min_migration_delay;
109 * Are we allowed to push out content from this peer.
111 static int active_from_migration;
114 * Size of the doubly-linked list of migration blocks.
116 static unsigned int mig_size;
120 * Delete the given migration block.
122 * @param mb block to delete
125 delete_migration_block (struct MigrationReadyBlock *mb)
127 GNUNET_CONTAINER_DLL_remove (mig_head,
130 GNUNET_PEER_decrement_rcs (mb->target_list,
131 MIGRATION_LIST_SIZE);
138 * Compare the distance of two peers to a key.
141 * @param p1 first peer
142 * @param p2 second peer
143 * @return GNUNET_YES if P1 is closer to key than P2
146 is_closer (const GNUNET_HashCode *key,
147 const struct GNUNET_PeerIdentity *p1,
148 const struct GNUNET_PeerIdentity *p2)
150 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
157 * Consider migrating content to a given peer.
159 * @param cls 'struct MigrationReadyBlock*' to select
160 * targets for (or NULL for none)
161 * @param key ID of the peer
162 * @param value 'struct ConnectedPeer' of the peer
163 * @return GNUNET_YES (always continue iteration)
166 consider_migration (void *cls,
167 const GNUNET_HashCode *key,
170 struct MigrationReadyBlock *mb = cls;
171 struct ConnectedPeer *cp = value;
172 struct MigrationReadyBlock *pos;
173 struct GNUNET_PeerIdentity cppid;
174 struct GNUNET_PeerIdentity otherpid;
175 struct GNUNET_PeerIdentity worstpid;
180 /* consider 'cp' as a migration target for mb */
181 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
182 return GNUNET_YES; /* peer has requested no migration! */
185 GNUNET_PEER_resolve (cp->pid,
187 repl = MIGRATION_LIST_SIZE;
188 for (i=0;i<MIGRATION_LIST_SIZE;i++)
190 if (mb->target_list[i] == 0)
192 mb->target_list[i] = cp->pid;
193 GNUNET_PEER_change_rc (mb->target_list[i], 1);
194 repl = MIGRATION_LIST_SIZE;
197 GNUNET_PEER_resolve (mb->target_list[i],
199 if ( (repl == MIGRATION_LIST_SIZE) &&
200 is_closer (&mb->query,
207 else if ( (repl != MIGRATION_LIST_SIZE) &&
208 (is_closer (&mb->query,
216 if (repl != MIGRATION_LIST_SIZE)
218 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
219 mb->target_list[repl] = cp->pid;
220 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
224 /* consider scheduling transmission to cp for content migration */
231 for (i=0;i<MIGRATION_LIST_SIZE;i++)
233 if (cp->pid == pos->target_list[i])
238 msize = GNUNET_MIN (msize,
246 return GNUNET_YES; /* no content available */
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Trying to migrate at least %u bytes to peer `%s'\n",
253 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
255 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
256 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
259 = GNUNET_CORE_notify_transmit_ready (core,
260 0, GNUNET_TIME_UNIT_FOREVER_REL,
261 (const struct GNUNET_PeerIdentity*) key,
262 msize + sizeof (struct PutMessage),
270 * Task that is run periodically to obtain blocks for content
274 * @param tc scheduler context (also unused)
277 gather_migration_blocks (void *cls,
278 const struct GNUNET_SCHEDULER_TaskContext *tc);
284 * If the migration task is not currently running, consider
285 * (re)scheduling it with the appropriate delay.
288 consider_migration_gathering ()
290 struct GNUNET_TIME_Relative delay;
296 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
298 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
300 delay = GNUNET_TIME_relative_divide (delay,
301 MAX_MIGRATION_QUEUE);
302 delay = GNUNET_TIME_relative_max (delay,
303 min_migration_delay);
304 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
305 &gather_migration_blocks,
313 * Process content offered for migration.
316 * @param key key for the content
317 * @param size number of bytes in data
318 * @param data content stored
319 * @param type type of the content
320 * @param priority priority of the content
321 * @param anonymity anonymity-level for the content
322 * @param expiration expiration time for the content
323 * @param uid unique identifier for the datum;
324 * maybe 0 if no unique identifier is available
327 process_migration_content (void *cls,
328 const GNUNET_HashCode * key,
331 enum GNUNET_BLOCK_Type type,
334 struct GNUNET_TIME_Absolute
335 expiration, uint64_t uid)
337 struct MigrationReadyBlock *mb;
342 if (mig_size < MAX_MIGRATION_QUEUE)
343 consider_migration_gathering ();
346 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
347 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
349 /* content will expire soon, don't bother */
350 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
353 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
356 GNUNET_FS_handle_on_demand_block (key, size, data,
357 type, priority, anonymity,
359 &process_migration_content,
362 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Retrieved block `%s' of type %u for migration\n",
372 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
374 mb->expiration = expiration;
377 memcpy (&mb[1], data, size);
378 GNUNET_CONTAINER_DLL_insert_after (mig_head,
383 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
386 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
392 * Task that is run periodically to obtain blocks for content
396 * @param tc scheduler context (also unused)
399 gather_migration_blocks (void *cls,
400 const struct GNUNET_SCHEDULER_TaskContext *tc)
402 mig_task = GNUNET_SCHEDULER_NO_TASK;
405 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
406 GNUNET_TIME_UNIT_FOREVER_REL,
407 &process_migration_content, NULL);
408 GNUNET_assert (mig_qe != NULL);
416 size_t size, void *buf)
419 while (NULL != (mb = next))
422 for (i=0;i<MIGRATION_LIST_SIZE;i++)
424 if ( (cp->pid == mb->target_list[i]) &&
425 (mb->size + sizeof (migm) <= size) )
427 GNUNET_PEER_change_rc (mb->target_list[i], -1);
428 mb->target_list[i] = 0;
430 memset (&migm, 0, sizeof (migm));
431 migm.header.size = htons (sizeof (migm) + mb->size);
432 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
433 migm.type = htonl (mb->type);
434 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
435 memcpy (&cbuf[msize], &migm, sizeof (migm));
436 msize += sizeof (migm);
437 size -= sizeof (migm);
438 memcpy (&cbuf[msize], &mb[1], mb->size);
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Pushing migration block `%s' (%u bytes) to `%s'\n",
444 GNUNET_h2s (&mb->query),
445 (unsigned int) mb->size,
453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
455 GNUNET_h2s (&mb->query),
456 (unsigned int) mb->size,
461 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
462 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
464 delete_migration_block (mb);
465 consider_migration_gathering ();
468 consider_migration (NULL,