Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file fs/gnunet-service-fs_push.c
21  * @brief API to push content from our datastore to other peers
22  *            ('anonymous'-content P2P migration)
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet-service-fs.h"
27 #include "gnunet-service-fs_cp.h"
28 #include "gnunet-service-fs_indexing.h"
29 #include "gnunet-service-fs_push.h"
30
31
32 /**
33  * Maximum number of blocks we keep in memory for migration.
34  */
35 #define MAX_MIGRATION_QUEUE 8
36
37 /**
38  * Blocks are at most migrated to this number of peers
39  * plus one, each time they are fetched from the database.
40  */
41 #define MIGRATION_LIST_SIZE 2
42
43 /**
44  * How long must content remain valid for us to consider it for migration?
45  * If content will expire too soon, there is clearly no point in pushing
46  * it to other peers.  This value gives the threshold for migration.  Note
47  * that if this value is increased, the migration testcase may need to be
48  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
49  */
50 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
51
52
53 /**
54  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
55  */
56 struct MigrationReadyBlock
57 {
58
59   /**
60    * This is a doubly-linked list.
61    */
62   struct MigrationReadyBlock *next;
63
64   /**
65    * This is a doubly-linked list.
66    */
67   struct MigrationReadyBlock *prev;
68
69   /**
70    * Query for the block.
71    */
72   struct GNUNET_HashCode query;
73
74   /**
75    * When does this block expire?
76    */
77   struct GNUNET_TIME_Absolute expiration;
78
79   /**
80    * Peers we already forwarded this
81    * block to.  Zero for empty entries.
82    */
83   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
84
85   /**
86    * Size of the block.
87    */
88   size_t size;
89
90   /**
91    *  Number of targets already used.
92    */
93   unsigned int used_targets;
94
95   /**
96    * Type of the block.
97    */
98   enum GNUNET_BLOCK_Type type;
99 };
100
101
102 /**
103  * Information about a peer waiting for migratable data.
104  */
105 struct MigrationReadyPeer
106 {
107   /**
108    * This is a doubly-linked list.
109    */
110   struct MigrationReadyPeer *next;
111
112   /**
113    * This is a doubly-linked list.
114    */
115   struct MigrationReadyPeer *prev;
116
117   /**
118    * Handle to peer.
119    */
120   struct GSF_ConnectedPeer *peer;
121
122   /**
123    * Envelope of the currently pushed message.
124    */
125   struct GNUNET_MQ_Envelope *env;
126 };
127
128
129 /**
130  * Head of linked list of blocks that can be migrated.
131  */
132 static struct MigrationReadyBlock *mig_head;
133
134 /**
135  * Tail of linked list of blocks that can be migrated.
136  */
137 static struct MigrationReadyBlock *mig_tail;
138
139 /**
140  * Head of linked list of peers.
141  */
142 static struct MigrationReadyPeer *peer_head;
143
144 /**
145  * Tail of linked list of peers.
146  */
147 static struct MigrationReadyPeer *peer_tail;
148
149 /**
150  * Request to datastore for migration (or NULL).
151  */
152 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
153
154 /**
155  * ID of task that collects blocks for migration.
156  */
157 static struct GNUNET_SCHEDULER_Task *mig_task;
158
159 /**
160  * What is the maximum frequency at which we are allowed to
161  * poll the datastore for migration content?
162  */
163 static struct GNUNET_TIME_Relative min_migration_delay;
164
165 /**
166  * Size of the doubly-linked list of migration blocks.
167  */
168 static unsigned int mig_size;
169
170 /**
171  * Is this module enabled?
172  */
173 static int enabled;
174
175 /**
176  * Did we find anything in the datastore?
177  */
178 static int value_found;
179
180
181 /**
182  * Delete the given migration block.
183  *
184  * @param mb block to delete
185  */
186 static void
187 delete_migration_block (struct MigrationReadyBlock *mb)
188 {
189   GNUNET_CONTAINER_DLL_remove (mig_head,
190                                mig_tail,
191                                mb);
192   GNUNET_PEER_decrement_rcs (mb->target_list,
193                              MIGRATION_LIST_SIZE);
194   mig_size--;
195   GNUNET_free (mb);
196 }
197
198
199 /**
200  * Find content for migration to this peer.
201  *
202  * @param cls a `struct MigrationReadyPeer *`
203  */
204 static void
205 find_content (void *cls);
206
207
208 /**
209  * Send the given block to the given peer.
210  *
211  * @param peer target peer
212  * @param block the block
213  * @return #GNUNET_YES if the block was deleted (!)
214  */
215 static int
216 transmit_content (struct MigrationReadyPeer *mrp,
217                   struct MigrationReadyBlock *block)
218 {
219   struct PutMessage *msg;
220   unsigned int i;
221   struct GSF_PeerPerformanceData *ppd;
222   int ret;
223
224   ppd = GSF_get_peer_performance_data_ (mrp->peer);
225   GNUNET_assert (NULL == mrp->env);
226   mrp->env = GNUNET_MQ_msg_extra (msg,
227                                   block->size,
228                                   GNUNET_MESSAGE_TYPE_FS_PUT);
229   msg->type = htonl (block->type);
230   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
231   GNUNET_memcpy (&msg[1],
232                  &block[1],
233                  block->size);
234   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
235   {
236     if (block->target_list[i] == 0)
237     {
238       block->target_list[i] = ppd->pid;
239       GNUNET_PEER_change_rc (block->target_list[i],
240                              1);
241       break;
242     }
243   }
244   if (MIGRATION_LIST_SIZE == i)
245   {
246     delete_migration_block (block);
247     ret = GNUNET_YES;
248   }
249   else
250   {
251     ret = GNUNET_NO;
252   }
253   GNUNET_MQ_notify_sent (mrp->env,
254                          &find_content,
255                          mrp);
256   GSF_peer_transmit_ (mrp->peer,
257                       GNUNET_NO,
258                       0 /* priority */ ,
259                       mrp->env);
260   return ret;
261 }
262
263
264 /**
265  * Count the number of peers this block has
266  * already been forwarded to.
267  *
268  * @param block the block
269  * @return number of times block was forwarded
270  */
271 static unsigned int
272 count_targets (struct MigrationReadyBlock *block)
273 {
274   unsigned int i;
275
276   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
277     if (block->target_list[i] == 0)
278       return i;
279   return i;
280 }
281
282
283 /**
284  * Check if sending this block to this peer would
285  * be a good idea.
286  *
287  * @param mrp target peer
288  * @param block the block
289  * @return score (>= 0: feasible, negative: infeasible)
290  */
291 static long
292 score_content (struct MigrationReadyPeer *mrp,
293                struct MigrationReadyBlock *block)
294 {
295   unsigned int i;
296   struct GSF_PeerPerformanceData *ppd;
297   struct GNUNET_PeerIdentity id;
298   struct GNUNET_HashCode hc;
299   uint32_t dist;
300
301   ppd = GSF_get_peer_performance_data_ (mrp->peer);
302   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
303     if (block->target_list[i] == ppd->pid)
304       return -1;
305   GNUNET_assert (0 != ppd->pid);
306   GNUNET_PEER_resolve (ppd->pid,
307                        &id);
308   GNUNET_CRYPTO_hash (&id,
309                       sizeof (struct GNUNET_PeerIdentity),
310                       &hc);
311   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
312                                           &hc);
313   /* closer distance, higher score: */
314   return UINT32_MAX - dist;
315 }
316
317
318 /**
319  * If the migration task is not currently running, consider
320  * (re)scheduling it with the appropriate delay.
321  */
322 static void
323 consider_gathering (void);
324
325
326 /**
327  * Find content for migration to this peer.
328  *
329  * @param cls peer to find content for
330  */
331 static void
332 find_content (void *cls)
333 {
334   struct MigrationReadyPeer *mrp = cls;
335   struct MigrationReadyBlock *pos;
336   long score;
337   long best_score;
338   struct MigrationReadyBlock *best;
339
340   mrp->env = NULL;
341   best = NULL;
342   best_score = -1;
343   pos = mig_head;
344   while (NULL != pos)
345   {
346     score = score_content (mrp, pos);
347     if (score > best_score)
348     {
349       best_score = score;
350       best = pos;
351     }
352     pos = pos->next;
353   }
354   if (NULL == best)
355   {
356     if (mig_size < MAX_MIGRATION_QUEUE)
357     {
358       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359                   "No content found for pushing, waiting for queue to fill\n");
360       return;                   /* will fill up eventually... */
361     }
362     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
363                 "No suitable content found, purging content from full queue\n");
364     /* failed to find migration target AND
365      * queue is full, purge most-forwarded
366      * block from queue to make room for more */
367     pos = mig_head;
368     while (NULL != pos)
369     {
370       score = count_targets (pos);
371       if (score >= best_score)
372       {
373         best_score = score;
374         best = pos;
375       }
376       pos = pos->next;
377     }
378     GNUNET_assert (NULL != best);
379     delete_migration_block (best);
380     consider_gathering ();
381     return;
382   }
383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384               "Preparing to push best content to peer\n");
385   transmit_content (mrp,
386                     best);
387 }
388
389
390 /**
391  * Task that is run periodically to obtain blocks for content
392  * migration
393  *
394  * @param cls unused
395  */
396 static void
397 gather_migration_blocks (void *cls);
398
399
400 /**
401  * If the migration task is not currently running, consider
402  * (re)scheduling it with the appropriate delay.
403  */
404 static void
405 consider_gathering ()
406 {
407   struct GNUNET_TIME_Relative delay;
408
409   if (NULL == GSF_dsh)
410     return;
411   if (NULL != mig_qe)
412     return;
413   if (NULL != mig_task)
414     return;
415   if (mig_size >= MAX_MIGRATION_QUEUE)
416     return;
417   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
418                                          mig_size);
419   delay = GNUNET_TIME_relative_divide (delay,
420                                        MAX_MIGRATION_QUEUE);
421   delay = GNUNET_TIME_relative_max (delay,
422                                     min_migration_delay);
423   if (GNUNET_NO == value_found)
424   {
425     /* wait at least 5s if the datastore is empty */
426     delay = GNUNET_TIME_relative_max (delay,
427                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
428                                                                      5));
429   }
430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
431               "Scheduling gathering task (queue size: %u)\n",
432               mig_size);
433   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
434                                            &gather_migration_blocks,
435                                            NULL);
436 }
437
438
439 /**
440  * Process content offered for migration.
441  *
442  * @param cls closure
443  * @param key key for the content
444  * @param size number of bytes in data
445  * @param data content stored
446  * @param type type of the content
447  * @param priority priority of the content
448  * @param anonymity anonymity-level for the content
449  * @param replication replication-level for the content
450  * @param expiration expiration time for the content
451  * @param uid unique identifier for the datum;
452  *        maybe 0 if no unique identifier is available
453  */
454 static void
455 process_migration_content (void *cls,
456                            const struct GNUNET_HashCode *key,
457                            size_t size,
458                            const void *data,
459                            enum GNUNET_BLOCK_Type type,
460                            uint32_t priority,
461                            uint32_t anonymity,
462                            uint32_t replication,
463                            struct GNUNET_TIME_Absolute expiration,
464                            uint64_t uid)
465 {
466   struct MigrationReadyBlock *mb;
467   struct MigrationReadyPeer *pos;
468
469   mig_qe = NULL;
470   if (NULL == key)
471   {
472     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
473                 "No content found for migration...\n");
474     consider_gathering ();
475     return;
476   }
477   value_found = GNUNET_YES;
478   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
479       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
480   {
481     /* content will expire soon, don't bother */
482     consider_gathering ();
483     return;
484   }
485   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
486   {
487     if (GNUNET_OK !=
488         GNUNET_FS_handle_on_demand_block (key,
489                                           size,
490                                           data,
491                                           type,
492                                           priority,
493                                           anonymity,
494                                           replication,
495                                           expiration,
496                                           uid,
497                                           &process_migration_content,
498                                           NULL))
499       consider_gathering ();
500     return;
501   }
502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
504               GNUNET_h2s (key),
505               type, mig_size + 1,
506               MAX_MIGRATION_QUEUE);
507   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
508   mb->query = *key;
509   mb->expiration = expiration;
510   mb->size = size;
511   mb->type = type;
512   GNUNET_memcpy (&mb[1], data, size);
513   GNUNET_CONTAINER_DLL_insert_after (mig_head,
514                                      mig_tail,
515                                      mig_tail,
516                                      mb);
517   mig_size++;
518   for (pos = peer_head; NULL != pos; pos = pos->next)
519   {
520     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521                 "Preparing to push best content to peer %s\n",
522                 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
523     if ( (NULL == pos->env) &&
524          (GNUNET_YES == transmit_content (pos,
525                                           mb)) ) {
526       break;                  /* 'mb' was freed! */
527     }
528   }
529   consider_gathering ();
530 }
531
532
533 /**
534  * Task that is run periodically to obtain blocks for content
535  * migration
536  *
537  * @param cls unused
538  */
539 static void
540 gather_migration_blocks (void *cls)
541 {
542   mig_task = NULL;
543   if (mig_size >= MAX_MIGRATION_QUEUE)
544     return;
545   if (NULL == GSF_dsh)
546     return;
547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548               "Asking datastore for content for replication (queue size: %u)\n",
549               mig_size);
550   value_found = GNUNET_NO;
551   mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
552                                                  0,
553                                                  UINT_MAX,
554                                                  &process_migration_content,
555                                                  NULL);
556   if (NULL == mig_qe)
557     consider_gathering ();
558 }
559
560
561 /**
562  * A peer connected to us.  Start pushing content
563  * to this peer.
564  *
565  * @param peer handle for the peer that connected
566  */
567 void
568 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
569 {
570   struct MigrationReadyPeer *mrp;
571
572   if (GNUNET_YES != enabled)
573     return;
574   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
575     if (mrp->peer == peer)
576       break;
577   if (NULL != mrp)
578   {
579     /* same peer added twice, must not happen */
580     GNUNET_break (0);
581     return;
582   }
583
584   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
585               "Adding peer %s to list for pushing\n",
586               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
587
588   mrp = GNUNET_new (struct MigrationReadyPeer);
589   mrp->peer = peer;
590   find_content (mrp);
591   GNUNET_CONTAINER_DLL_insert (peer_head,
592                                peer_tail,
593                                mrp);
594 }
595
596
597 /**
598  * A peer disconnected from us.  Stop pushing content
599  * to this peer.
600  *
601  * @param peer handle for the peer that disconnected
602  */
603 void
604 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
605 {
606   struct MigrationReadyPeer *pos;
607
608   for (pos = peer_head; NULL != pos; pos = pos->next)
609     if (pos->peer == peer)
610       break;
611   if (NULL == pos)
612     return;
613   if (NULL != pos->env)
614     GNUNET_MQ_send_cancel (pos->env);
615   GNUNET_CONTAINER_DLL_remove (peer_head,
616                                peer_tail,
617                                pos);
618   GNUNET_free (pos);
619 }
620
621
622 /**
623  * Setup the module.
624  */
625 void
626 GSF_push_init_ ()
627 {
628   enabled =
629     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
630                                           "FS",
631                                           "CONTENT_PUSHING");
632   if (GNUNET_YES != enabled)
633     return;
634
635   if (GNUNET_OK !=
636       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
637                                            "fs",
638                                            "MIN_MIGRATION_DELAY",
639                                            &min_migration_delay))
640   {
641     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
642                                "fs",
643                                "MIN_MIGRATION_DELAY",
644                                _("time required, content pushing disabled"));
645     return;
646   }
647   consider_gathering ();
648 }
649
650
651 /**
652  * Shutdown the module.
653  */
654 void
655 GSF_push_done_ ()
656 {
657   if (NULL != mig_task)
658   {
659     GNUNET_SCHEDULER_cancel (mig_task);
660     mig_task = NULL;
661   }
662   if (NULL != mig_qe)
663   {
664     GNUNET_DATASTORE_cancel (mig_qe);
665     mig_qe = NULL;
666   }
667   while (NULL != mig_head)
668     delete_migration_block (mig_head);
669   GNUNET_assert (0 == mig_size);
670 }
671
672 /* end of gnunet-service-fs_push.c */