first batch of license fixes (boring)
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file fs/gnunet-service-fs_push.c
18  * @brief API to push content from our datastore to other peers
19  *            ('anonymous'-content P2P migration)
20  * @author Christian Grothoff
21  */
22 #include "platform.h"
23 #include "gnunet-service-fs.h"
24 #include "gnunet-service-fs_cp.h"
25 #include "gnunet-service-fs_indexing.h"
26 #include "gnunet-service-fs_push.h"
27
28
29 /**
30  * Maximum number of blocks we keep in memory for migration.
31  */
32 #define MAX_MIGRATION_QUEUE 8
33
34 /**
35  * Blocks are at most migrated to this number of peers
36  * plus one, each time they are fetched from the database.
37  */
38 #define MIGRATION_LIST_SIZE 2
39
40 /**
41  * How long must content remain valid for us to consider it for migration?
42  * If content will expire too soon, there is clearly no point in pushing
43  * it to other peers.  This value gives the threshold for migration.  Note
44  * that if this value is increased, the migration testcase may need to be
45  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
46  */
47 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
48
49
50 /**
51  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
52  */
53 struct MigrationReadyBlock
54 {
55
56   /**
57    * This is a doubly-linked list.
58    */
59   struct MigrationReadyBlock *next;
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *prev;
65
66   /**
67    * Query for the block.
68    */
69   struct GNUNET_HashCode query;
70
71   /**
72    * When does this block expire?
73    */
74   struct GNUNET_TIME_Absolute expiration;
75
76   /**
77    * Peers we already forwarded this
78    * block to.  Zero for empty entries.
79    */
80   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
81
82   /**
83    * Size of the block.
84    */
85   size_t size;
86
87   /**
88    *  Number of targets already used.
89    */
90   unsigned int used_targets;
91
92   /**
93    * Type of the block.
94    */
95   enum GNUNET_BLOCK_Type type;
96 };
97
98
99 /**
100  * Information about a peer waiting for migratable data.
101  */
102 struct MigrationReadyPeer
103 {
104   /**
105    * This is a doubly-linked list.
106    */
107   struct MigrationReadyPeer *next;
108
109   /**
110    * This is a doubly-linked list.
111    */
112   struct MigrationReadyPeer *prev;
113
114   /**
115    * Handle to peer.
116    */
117   struct GSF_ConnectedPeer *peer;
118
119   /**
120    * Envelope of the currently pushed message.
121    */
122   struct GNUNET_MQ_Envelope *env;
123 };
124
125
126 /**
127  * Head of linked list of blocks that can be migrated.
128  */
129 static struct MigrationReadyBlock *mig_head;
130
131 /**
132  * Tail of linked list of blocks that can be migrated.
133  */
134 static struct MigrationReadyBlock *mig_tail;
135
136 /**
137  * Head of linked list of peers.
138  */
139 static struct MigrationReadyPeer *peer_head;
140
141 /**
142  * Tail of linked list of peers.
143  */
144 static struct MigrationReadyPeer *peer_tail;
145
146 /**
147  * Request to datastore for migration (or NULL).
148  */
149 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
150
151 /**
152  * ID of task that collects blocks for migration.
153  */
154 static struct GNUNET_SCHEDULER_Task *mig_task;
155
156 /**
157  * What is the maximum frequency at which we are allowed to
158  * poll the datastore for migration content?
159  */
160 static struct GNUNET_TIME_Relative min_migration_delay;
161
162 /**
163  * Size of the doubly-linked list of migration blocks.
164  */
165 static unsigned int mig_size;
166
167 /**
168  * Is this module enabled?
169  */
170 static int enabled;
171
172 /**
173  * Did we find anything in the datastore?
174  */
175 static int value_found;
176
177
178 /**
179  * Delete the given migration block.
180  *
181  * @param mb block to delete
182  */
183 static void
184 delete_migration_block (struct MigrationReadyBlock *mb)
185 {
186   GNUNET_CONTAINER_DLL_remove (mig_head,
187                                mig_tail,
188                                mb);
189   GNUNET_PEER_decrement_rcs (mb->target_list,
190                              MIGRATION_LIST_SIZE);
191   mig_size--;
192   GNUNET_free (mb);
193 }
194
195
196 /**
197  * Find content for migration to this peer.
198  *
199  * @param cls a `struct MigrationReadyPeer *`
200  */
201 static void
202 find_content (void *cls);
203
204
205 /**
206  * Send the given block to the given peer.
207  *
208  * @param peer target peer
209  * @param block the block
210  * @return #GNUNET_YES if the block was deleted (!)
211  */
212 static int
213 transmit_content (struct MigrationReadyPeer *mrp,
214                   struct MigrationReadyBlock *block)
215 {
216   struct PutMessage *msg;
217   unsigned int i;
218   struct GSF_PeerPerformanceData *ppd;
219   int ret;
220
221   ppd = GSF_get_peer_performance_data_ (mrp->peer);
222   GNUNET_assert (NULL == mrp->env);
223   mrp->env = GNUNET_MQ_msg_extra (msg,
224                                   block->size,
225                                   GNUNET_MESSAGE_TYPE_FS_PUT);
226   msg->type = htonl (block->type);
227   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
228   GNUNET_memcpy (&msg[1],
229                  &block[1],
230                  block->size);
231   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
232   {
233     if (block->target_list[i] == 0)
234     {
235       block->target_list[i] = ppd->pid;
236       GNUNET_PEER_change_rc (block->target_list[i],
237                              1);
238       break;
239     }
240   }
241   if (MIGRATION_LIST_SIZE == i)
242   {
243     delete_migration_block (block);
244     ret = GNUNET_YES;
245   }
246   else
247   {
248     ret = GNUNET_NO;
249   }
250   GNUNET_MQ_notify_sent (mrp->env,
251                          &find_content,
252                          mrp);
253   GSF_peer_transmit_ (mrp->peer,
254                       GNUNET_NO,
255                       0 /* priority */ ,
256                       mrp->env);
257   return ret;
258 }
259
260
261 /**
262  * Count the number of peers this block has
263  * already been forwarded to.
264  *
265  * @param block the block
266  * @return number of times block was forwarded
267  */
268 static unsigned int
269 count_targets (struct MigrationReadyBlock *block)
270 {
271   unsigned int i;
272
273   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
274     if (block->target_list[i] == 0)
275       return i;
276   return i;
277 }
278
279
280 /**
281  * Check if sending this block to this peer would
282  * be a good idea.
283  *
284  * @param mrp target peer
285  * @param block the block
286  * @return score (>= 0: feasible, negative: infeasible)
287  */
288 static long
289 score_content (struct MigrationReadyPeer *mrp,
290                struct MigrationReadyBlock *block)
291 {
292   unsigned int i;
293   struct GSF_PeerPerformanceData *ppd;
294   struct GNUNET_PeerIdentity id;
295   struct GNUNET_HashCode hc;
296   uint32_t dist;
297
298   ppd = GSF_get_peer_performance_data_ (mrp->peer);
299   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
300     if (block->target_list[i] == ppd->pid)
301       return -1;
302   GNUNET_assert (0 != ppd->pid);
303   GNUNET_PEER_resolve (ppd->pid,
304                        &id);
305   GNUNET_CRYPTO_hash (&id,
306                       sizeof (struct GNUNET_PeerIdentity),
307                       &hc);
308   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
309                                           &hc);
310   /* closer distance, higher score: */
311   return UINT32_MAX - dist;
312 }
313
314
315 /**
316  * If the migration task is not currently running, consider
317  * (re)scheduling it with the appropriate delay.
318  */
319 static void
320 consider_gathering (void);
321
322
323 /**
324  * Find content for migration to this peer.
325  *
326  * @param cls peer to find content for
327  */
328 static void
329 find_content (void *cls)
330 {
331   struct MigrationReadyPeer *mrp = cls;
332   struct MigrationReadyBlock *pos;
333   long score;
334   long best_score;
335   struct MigrationReadyBlock *best;
336
337   mrp->env = NULL;
338   best = NULL;
339   best_score = -1;
340   pos = mig_head;
341   while (NULL != pos)
342   {
343     score = score_content (mrp, pos);
344     if (score > best_score)
345     {
346       best_score = score;
347       best = pos;
348     }
349     pos = pos->next;
350   }
351   if (NULL == best)
352   {
353     if (mig_size < MAX_MIGRATION_QUEUE)
354     {
355       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356                   "No content found for pushing, waiting for queue to fill\n");
357       return;                   /* will fill up eventually... */
358     }
359     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360                 "No suitable content found, purging content from full queue\n");
361     /* failed to find migration target AND
362      * queue is full, purge most-forwarded
363      * block from queue to make room for more */
364     pos = mig_head;
365     while (NULL != pos)
366     {
367       score = count_targets (pos);
368       if (score >= best_score)
369       {
370         best_score = score;
371         best = pos;
372       }
373       pos = pos->next;
374     }
375     GNUNET_assert (NULL != best);
376     delete_migration_block (best);
377     consider_gathering ();
378     return;
379   }
380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381               "Preparing to push best content to peer\n");
382   transmit_content (mrp,
383                     best);
384 }
385
386
387 /**
388  * Task that is run periodically to obtain blocks for content
389  * migration
390  *
391  * @param cls unused
392  */
393 static void
394 gather_migration_blocks (void *cls);
395
396
397 /**
398  * If the migration task is not currently running, consider
399  * (re)scheduling it with the appropriate delay.
400  */
401 static void
402 consider_gathering ()
403 {
404   struct GNUNET_TIME_Relative delay;
405
406   if (NULL == GSF_dsh)
407     return;
408   if (NULL != mig_qe)
409     return;
410   if (NULL != mig_task)
411     return;
412   if (mig_size >= MAX_MIGRATION_QUEUE)
413     return;
414   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
415                                          mig_size);
416   delay = GNUNET_TIME_relative_divide (delay,
417                                        MAX_MIGRATION_QUEUE);
418   delay = GNUNET_TIME_relative_max (delay,
419                                     min_migration_delay);
420   if (GNUNET_NO == value_found)
421   {
422     /* wait at least 5s if the datastore is empty */
423     delay = GNUNET_TIME_relative_max (delay,
424                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
425                                                                      5));
426   }
427   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
428               "Scheduling gathering task (queue size: %u)\n",
429               mig_size);
430   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
431                                            &gather_migration_blocks,
432                                            NULL);
433 }
434
435
436 /**
437  * Process content offered for migration.
438  *
439  * @param cls closure
440  * @param key key for the content
441  * @param size number of bytes in data
442  * @param data content stored
443  * @param type type of the content
444  * @param priority priority of the content
445  * @param anonymity anonymity-level for the content
446  * @param replication replication-level for the content
447  * @param expiration expiration time for the content
448  * @param uid unique identifier for the datum;
449  *        maybe 0 if no unique identifier is available
450  */
451 static void
452 process_migration_content (void *cls,
453                            const struct GNUNET_HashCode *key,
454                            size_t size,
455                            const void *data,
456                            enum GNUNET_BLOCK_Type type,
457                            uint32_t priority,
458                            uint32_t anonymity,
459                            uint32_t replication,
460                            struct GNUNET_TIME_Absolute expiration,
461                            uint64_t uid)
462 {
463   struct MigrationReadyBlock *mb;
464   struct MigrationReadyPeer *pos;
465
466   mig_qe = NULL;
467   if (NULL == key)
468   {
469     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470                 "No content found for migration...\n");
471     consider_gathering ();
472     return;
473   }
474   value_found = GNUNET_YES;
475   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
476       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
477   {
478     /* content will expire soon, don't bother */
479     consider_gathering ();
480     return;
481   }
482   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
483   {
484     if (GNUNET_OK !=
485         GNUNET_FS_handle_on_demand_block (key,
486                                           size,
487                                           data,
488                                           type,
489                                           priority,
490                                           anonymity,
491                                           replication,
492                                           expiration,
493                                           uid,
494                                           &process_migration_content,
495                                           NULL))
496       consider_gathering ();
497     return;
498   }
499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
501               GNUNET_h2s (key),
502               type, mig_size + 1,
503               MAX_MIGRATION_QUEUE);
504   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
505   mb->query = *key;
506   mb->expiration = expiration;
507   mb->size = size;
508   mb->type = type;
509   GNUNET_memcpy (&mb[1], data, size);
510   GNUNET_CONTAINER_DLL_insert_after (mig_head,
511                                      mig_tail,
512                                      mig_tail,
513                                      mb);
514   mig_size++;
515   for (pos = peer_head; NULL != pos; pos = pos->next)
516   {
517     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518                 "Preparing to push best content to peer %s\n",
519                 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
520     if ( (NULL == pos->env) &&
521          (GNUNET_YES == transmit_content (pos,
522                                           mb)) ) {
523       break;                  /* 'mb' was freed! */
524     }
525   }
526   consider_gathering ();
527 }
528
529
530 /**
531  * Task that is run periodically to obtain blocks for content
532  * migration
533  *
534  * @param cls unused
535  */
536 static void
537 gather_migration_blocks (void *cls)
538 {
539   mig_task = NULL;
540   if (mig_size >= MAX_MIGRATION_QUEUE)
541     return;
542   if (NULL == GSF_dsh)
543     return;
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545               "Asking datastore for content for replication (queue size: %u)\n",
546               mig_size);
547   value_found = GNUNET_NO;
548   mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
549                                                  0,
550                                                  UINT_MAX,
551                                                  &process_migration_content,
552                                                  NULL);
553   if (NULL == mig_qe)
554     consider_gathering ();
555 }
556
557
558 /**
559  * A peer connected to us.  Start pushing content
560  * to this peer.
561  *
562  * @param peer handle for the peer that connected
563  */
564 void
565 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
566 {
567   struct MigrationReadyPeer *mrp;
568
569   if (GNUNET_YES != enabled)
570     return;
571   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
572     if (mrp->peer == peer)
573       break;
574   if (NULL != mrp)
575   {
576     /* same peer added twice, must not happen */
577     GNUNET_break (0);
578     return;
579   }
580
581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582               "Adding peer %s to list for pushing\n",
583               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
584
585   mrp = GNUNET_new (struct MigrationReadyPeer);
586   mrp->peer = peer;
587   find_content (mrp);
588   GNUNET_CONTAINER_DLL_insert (peer_head,
589                                peer_tail,
590                                mrp);
591 }
592
593
594 /**
595  * A peer disconnected from us.  Stop pushing content
596  * to this peer.
597  *
598  * @param peer handle for the peer that disconnected
599  */
600 void
601 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
602 {
603   struct MigrationReadyPeer *pos;
604
605   for (pos = peer_head; NULL != pos; pos = pos->next)
606     if (pos->peer == peer)
607       break;
608   if (NULL == pos)
609     return;
610   if (NULL != pos->env)
611     GNUNET_MQ_send_cancel (pos->env);
612   GNUNET_CONTAINER_DLL_remove (peer_head,
613                                peer_tail,
614                                pos);
615   GNUNET_free (pos);
616 }
617
618
619 /**
620  * Setup the module.
621  */
622 void
623 GSF_push_init_ ()
624 {
625   enabled =
626     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
627                                           "FS",
628                                           "CONTENT_PUSHING");
629   if (GNUNET_YES != enabled)
630     return;
631
632   if (GNUNET_OK !=
633       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
634                                            "fs",
635                                            "MIN_MIGRATION_DELAY",
636                                            &min_migration_delay))
637   {
638     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
639                                "fs",
640                                "MIN_MIGRATION_DELAY",
641                                _("time required, content pushing disabled"));
642     return;
643   }
644   consider_gathering ();
645 }
646
647
648 /**
649  * Shutdown the module.
650  */
651 void
652 GSF_push_done_ ()
653 {
654   if (NULL != mig_task)
655   {
656     GNUNET_SCHEDULER_cancel (mig_task);
657     mig_task = NULL;
658   }
659   if (NULL != mig_qe)
660   {
661     GNUNET_DATASTORE_cancel (mig_qe);
662     mig_qe = NULL;
663   }
664   while (NULL != mig_head)
665     delete_migration_block (mig_head);
666   GNUNET_assert (0 == mig_size);
667 }
668
669 /* end of gnunet-service-fs_push.c */