no includes on top level please
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs_push.h"
29
30
31 /* FIXME: below are only old code fragments to use... */
32
33 /**
34  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
35  */
36 struct MigrationReadyBlock
37 {
38
39   /**
40    * This is a doubly-linked list.
41    */
42   struct MigrationReadyBlock *next;
43
44   /**
45    * This is a doubly-linked list.
46    */
47   struct MigrationReadyBlock *prev;
48
49   /**
50    * Query for the block.
51    */
52   GNUNET_HashCode query;
53
54   /**
55    * When does this block expire? 
56    */
57   struct GNUNET_TIME_Absolute expiration;
58
59   /**
60    * Peers we would consider forwarding this
61    * block to.  Zero for empty entries.
62    */
63   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
64
65   /**
66    * Size of the block.
67    */
68   size_t size;
69
70   /**
71    *  Number of targets already used.
72    */
73   unsigned int used_targets;
74
75   /**
76    * Type of the block.
77    */
78   enum GNUNET_BLOCK_Type type;
79 };
80
81
82 /**
83  * Head of linked list of blocks that can be migrated.
84  */
85 static struct MigrationReadyBlock *mig_head;
86
87 /**
88  * Tail of linked list of blocks that can be migrated.
89  */
90 static struct MigrationReadyBlock *mig_tail;
91
92 /**
93  * Request to datastore for migration (or NULL).
94  */
95 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
96
97 /**
98  * ID of task that collects blocks for migration.
99  */
100 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
101
102 /**
103  * What is the maximum frequency at which we are allowed to
104  * poll the datastore for migration content?
105  */
106 static struct GNUNET_TIME_Relative min_migration_delay;
107
108 /**
109  * Are we allowed to push out content from this peer.
110  */
111 static int active_from_migration;
112
113 /**
114  * Size of the doubly-linked list of migration blocks.
115  */
116 static unsigned int mig_size;
117
118
119 /**
120  * Delete the given migration block.
121  *
122  * @param mb block to delete
123  */
124 static void
125 delete_migration_block (struct MigrationReadyBlock *mb)
126 {
127   GNUNET_CONTAINER_DLL_remove (mig_head,
128                                mig_tail,
129                                mb);
130   GNUNET_PEER_decrement_rcs (mb->target_list,
131                              MIGRATION_LIST_SIZE);
132   mig_size--;
133   GNUNET_free (mb);
134 }
135
136
137 /**
138  * Compare the distance of two peers to a key.
139  *
140  * @param key key
141  * @param p1 first peer
142  * @param p2 second peer
143  * @return GNUNET_YES if P1 is closer to key than P2
144  */
145 static int
146 is_closer (const GNUNET_HashCode *key,
147            const struct GNUNET_PeerIdentity *p1,
148            const struct GNUNET_PeerIdentity *p2)
149 {
150   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
151                                     &p2->hashPubKey,
152                                     key);
153 }
154
155
156 /**
157  * Consider migrating content to a given peer.
158  *
159  * @param cls 'struct MigrationReadyBlock*' to select
160  *            targets for (or NULL for none)
161  * @param key ID of the peer 
162  * @param value 'struct ConnectedPeer' of the peer
163  * @return GNUNET_YES (always continue iteration)
164  */
165 static int
166 consider_migration (void *cls,
167                     const GNUNET_HashCode *key,
168                     void *value)
169 {
170   struct MigrationReadyBlock *mb = cls;
171   struct ConnectedPeer *cp = value;
172   struct MigrationReadyBlock *pos;
173   struct GNUNET_PeerIdentity cppid;
174   struct GNUNET_PeerIdentity otherpid;
175   struct GNUNET_PeerIdentity worstpid;
176   size_t msize;
177   unsigned int i;
178   unsigned int repl;
179   
180   /* consider 'cp' as a migration target for mb */
181   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
182     return GNUNET_YES; /* peer has requested no migration! */
183   if (mb != NULL)
184     {
185       GNUNET_PEER_resolve (cp->pid,
186                            &cppid);
187       repl = MIGRATION_LIST_SIZE;
188       for (i=0;i<MIGRATION_LIST_SIZE;i++)
189         {
190           if (mb->target_list[i] == 0)
191             {
192               mb->target_list[i] = cp->pid;
193               GNUNET_PEER_change_rc (mb->target_list[i], 1);
194               repl = MIGRATION_LIST_SIZE;
195               break;
196             }
197           GNUNET_PEER_resolve (mb->target_list[i],
198                                &otherpid);
199           if ( (repl == MIGRATION_LIST_SIZE) &&
200                is_closer (&mb->query,
201                           &cppid,
202                           &otherpid)) 
203             {
204               repl = i;
205               worstpid = otherpid;
206             }
207           else if ( (repl != MIGRATION_LIST_SIZE) &&
208                     (is_closer (&mb->query,
209                                 &worstpid,
210                                 &otherpid) ) )
211             {
212               repl = i;
213               worstpid = otherpid;
214             }       
215         }
216       if (repl != MIGRATION_LIST_SIZE) 
217         {
218           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
219           mb->target_list[repl] = cp->pid;
220           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
221         }
222     }
223
224   /* consider scheduling transmission to cp for content migration */
225   if (cp->cth != NULL)        
226     return GNUNET_YES; 
227   msize = 0;
228   pos = mig_head;
229   while (pos != NULL)
230     {
231       for (i=0;i<MIGRATION_LIST_SIZE;i++)
232         {
233           if (cp->pid == pos->target_list[i])
234             {
235               if (msize == 0)
236                 msize = pos->size;
237               else
238                 msize = GNUNET_MIN (msize,
239                                     pos->size);
240               break;
241             }
242         }
243       pos = pos->next;
244     }
245   if (msize == 0)
246     return GNUNET_YES; /* no content available */
247 #if DEBUG_FS
248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249               "Trying to migrate at least %u bytes to peer `%s'\n",
250               msize,
251               GNUNET_h2s (key));
252 #endif
253   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
254     {
255       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
256       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
257     }
258   cp->cth 
259     = GNUNET_CORE_notify_transmit_ready (core,
260                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
261                                          (const struct GNUNET_PeerIdentity*) key,
262                                          msize + sizeof (struct PutMessage),
263                                          &transmit_to_peer,
264                                          cp);
265   return GNUNET_YES;
266 }
267
268
269 /**
270  * Task that is run periodically to obtain blocks for content
271  * migration
272  * 
273  * @param cls unused
274  * @param tc scheduler context (also unused)
275  */
276 static void
277 gather_migration_blocks (void *cls,
278                          const struct GNUNET_SCHEDULER_TaskContext *tc);
279
280
281
282
283 /**
284  * If the migration task is not currently running, consider
285  * (re)scheduling it with the appropriate delay.
286  */
287 static void
288 consider_migration_gathering ()
289 {
290   struct GNUNET_TIME_Relative delay;
291
292   if (dsh == NULL)
293     return;
294   if (mig_qe != NULL)
295     return;
296   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
297     return;
298   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
299                                          mig_size);
300   delay = GNUNET_TIME_relative_divide (delay,
301                                        MAX_MIGRATION_QUEUE);
302   delay = GNUNET_TIME_relative_max (delay,
303                                     min_migration_delay);
304   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
305                                            &gather_migration_blocks,
306                                            NULL);
307 }
308
309
310
311
312 /**
313  * Process content offered for migration.
314  *
315  * @param cls closure
316  * @param key key for the content
317  * @param size number of bytes in data
318  * @param data content stored
319  * @param type type of the content
320  * @param priority priority of the content
321  * @param anonymity anonymity-level for the content
322  * @param expiration expiration time for the content
323  * @param uid unique identifier for the datum;
324  *        maybe 0 if no unique identifier is available
325  */
326 static void
327 process_migration_content (void *cls,
328                            const GNUNET_HashCode * key,
329                            size_t size,
330                            const void *data,
331                            enum GNUNET_BLOCK_Type type,
332                            uint32_t priority,
333                            uint32_t anonymity,
334                            struct GNUNET_TIME_Absolute
335                            expiration, uint64_t uid)
336 {
337   struct MigrationReadyBlock *mb;
338   
339   if (key == NULL)
340     {
341       mig_qe = NULL;
342       if (mig_size < MAX_MIGRATION_QUEUE)  
343         consider_migration_gathering ();
344       return;
345     }
346   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
347       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
348     {
349       /* content will expire soon, don't bother */
350       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
351       return;
352     }
353   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
354     {
355       if (GNUNET_OK !=
356           GNUNET_FS_handle_on_demand_block (key, size, data,
357                                             type, priority, anonymity,
358                                             expiration, uid, 
359                                             &process_migration_content,
360                                             NULL))
361         {
362           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
363         }
364       return;
365     }
366 #if DEBUG_FS
367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368               "Retrieved block `%s' of type %u for migration\n",
369               GNUNET_h2s (key),
370               type);
371 #endif
372   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
373   mb->query = *key;
374   mb->expiration = expiration;
375   mb->size = size;
376   mb->type = type;
377   memcpy (&mb[1], data, size);
378   GNUNET_CONTAINER_DLL_insert_after (mig_head,
379                                      mig_tail,
380                                      mig_tail,
381                                      mb);
382   mig_size++;
383   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
384                                          &consider_migration,
385                                          mb);
386   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
387 }
388
389
390
391 /**
392  * Task that is run periodically to obtain blocks for content
393  * migration
394  * 
395  * @param cls unused
396  * @param tc scheduler context (also unused)
397  */
398 static void
399 gather_migration_blocks (void *cls,
400                          const struct GNUNET_SCHEDULER_TaskContext *tc)
401 {
402   mig_task = GNUNET_SCHEDULER_NO_TASK;
403   if (dsh != NULL)
404     {
405       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
406                                             GNUNET_TIME_UNIT_FOREVER_REL,
407                                             &process_migration_content, NULL);
408       GNUNET_assert (mig_qe != NULL);
409     }
410 }
411
412
413
414 size_t
415 API_ (void *cls,
416       size_t size, void *buf)
417 {
418     next = mig_head;
419       while (NULL != (mb = next))
420         {
421           next = mb->next;
422           for (i=0;i<MIGRATION_LIST_SIZE;i++)
423             {
424               if ( (cp->pid == mb->target_list[i]) &&
425                    (mb->size + sizeof (migm) <= size) )
426                 {
427                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
428                   mb->target_list[i] = 0;
429                   mb->used_targets++;
430                   memset (&migm, 0, sizeof (migm));
431                   migm.header.size = htons (sizeof (migm) + mb->size);
432                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
433                   migm.type = htonl (mb->type);
434                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
435                   memcpy (&cbuf[msize], &migm, sizeof (migm));
436                   msize += sizeof (migm);
437                   size -= sizeof (migm);
438                   memcpy (&cbuf[msize], &mb[1], mb->size);
439                   msize += mb->size;
440                   size -= mb->size;
441 #if DEBUG_FS
442                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
444                               GNUNET_h2s (&mb->query),
445                               (unsigned int) mb->size,
446                               GNUNET_i2s (&pid));
447 #endif    
448                   break;
449                 }
450               else
451                 {
452 #if DEBUG_FS
453                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
455                               GNUNET_h2s (&mb->query),
456                               (unsigned int) mb->size,
457                               GNUNET_i2s (&pid));
458 #endif    
459                 }
460             }
461           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
462                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
463             {
464               delete_migration_block (mb);
465               consider_migration_gathering ();
466             }
467         }
468       consider_migration (NULL, 
469                           &pid.hashPubKey,
470                           cp);
471
472 }
473
474
475