converting FS to new MQ-based core API
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for migratable data.
106  */
107 struct MigrationReadyPeer
108 {
109   /**
110    * This is a doubly-linked list.
111    */
112   struct MigrationReadyPeer *next;
113
114   /**
115    * This is a doubly-linked list.
116    */
117   struct MigrationReadyPeer *prev;
118
119   /**
120    * Handle to peer.
121    */
122   struct GSF_ConnectedPeer *peer;
123
124   /**
125    * Envelope of the currently pushed message.
126    */
127   struct GNUNET_MQ_Envelope *env;
128 };
129
130
131 /**
132  * Head of linked list of blocks that can be migrated.
133  */
134 static struct MigrationReadyBlock *mig_head;
135
136 /**
137  * Tail of linked list of blocks that can be migrated.
138  */
139 static struct MigrationReadyBlock *mig_tail;
140
141 /**
142  * Head of linked list of peers.
143  */
144 static struct MigrationReadyPeer *peer_head;
145
146 /**
147  * Tail of linked list of peers.
148  */
149 static struct MigrationReadyPeer *peer_tail;
150
151 /**
152  * Request to datastore for migration (or NULL).
153  */
154 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
155
156 /**
157  * ID of task that collects blocks for migration.
158  */
159 static struct GNUNET_SCHEDULER_Task *mig_task;
160
161 /**
162  * What is the maximum frequency at which we are allowed to
163  * poll the datastore for migration content?
164  */
165 static struct GNUNET_TIME_Relative min_migration_delay;
166
167 /**
168  * Size of the doubly-linked list of migration blocks.
169  */
170 static unsigned int mig_size;
171
172 /**
173  * Is this module enabled?
174  */
175 static int enabled;
176
177 /**
178  * Did we find anything in the datastore?
179  */
180 static int value_found;
181
182
183 /**
184  * Delete the given migration block.
185  *
186  * @param mb block to delete
187  */
188 static void
189 delete_migration_block (struct MigrationReadyBlock *mb)
190 {
191   GNUNET_CONTAINER_DLL_remove (mig_head,
192                                mig_tail,
193                                mb);
194   GNUNET_PEER_decrement_rcs (mb->target_list,
195                              MIGRATION_LIST_SIZE);
196   mig_size--;
197   GNUNET_free (mb);
198 }
199
200
201 /**
202  * Find content for migration to this peer.
203  *
204  * @param cls a `struct MigrationReadyPeer *`
205  */
206 static void
207 find_content (void *cls);
208
209
210 /**
211  * Send the given block to the given peer.
212  *
213  * @param peer target peer
214  * @param block the block
215  * @return #GNUNET_YES if the block was deleted (!)
216  */
217 static int
218 transmit_content (struct MigrationReadyPeer *mrp,
219                   struct MigrationReadyBlock *block)
220 {
221   struct PutMessage *msg;
222   unsigned int i;
223   struct GSF_PeerPerformanceData *ppd;
224   int ret;
225
226   ppd = GSF_get_peer_performance_data_ (mrp->peer);
227   mrp->env = GNUNET_MQ_msg_extra (msg,
228                                   block->size,
229                                   GNUNET_MESSAGE_TYPE_FS_PUT);
230   msg->type = htonl (block->type);
231   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
232   GNUNET_memcpy (&msg[1],
233                  &block[1],
234                  block->size);
235   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
236   {
237     if (block->target_list[i] == 0)
238     {
239       block->target_list[i] = ppd->pid;
240       GNUNET_PEER_change_rc (block->target_list[i],
241                              1);
242       break;
243     }
244   }
245   if (MIGRATION_LIST_SIZE == i)
246   {
247     delete_migration_block (block);
248     ret = GNUNET_YES;
249   }
250   else
251   {
252     ret = GNUNET_NO;
253   }
254   GNUNET_MQ_notify_sent (mrp->env,
255                          &find_content,
256                          mrp);
257   GSF_peer_transmit_ (mrp->peer,
258                       GNUNET_NO,
259                       0 /* priority */ ,
260                       mrp->env);
261   return ret;
262 }
263
264
265 /**
266  * Count the number of peers this block has
267  * already been forwarded to.
268  *
269  * @param block the block
270  * @return number of times block was forwarded
271  */
272 static unsigned int
273 count_targets (struct MigrationReadyBlock *block)
274 {
275   unsigned int i;
276
277   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
278     if (block->target_list[i] == 0)
279       return i;
280   return i;
281 }
282
283
284 /**
285  * Check if sending this block to this peer would
286  * be a good idea.
287  *
288  * @param mrp target peer
289  * @param block the block
290  * @return score (>= 0: feasible, negative: infeasible)
291  */
292 static long
293 score_content (struct MigrationReadyPeer *mrp,
294                struct MigrationReadyBlock *block)
295 {
296   unsigned int i;
297   struct GSF_PeerPerformanceData *ppd;
298   struct GNUNET_PeerIdentity id;
299   struct GNUNET_HashCode hc;
300   uint32_t dist;
301
302   ppd = GSF_get_peer_performance_data_ (mrp->peer);
303   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
304     if (block->target_list[i] == ppd->pid)
305       return -1;
306   GNUNET_assert (0 != ppd->pid);
307   GNUNET_PEER_resolve (ppd->pid,
308                        &id);
309   GNUNET_CRYPTO_hash (&id,
310                       sizeof (struct GNUNET_PeerIdentity),
311                       &hc);
312   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
313                                           &hc);
314   /* closer distance, higher score: */
315   return UINT32_MAX - dist;
316 }
317
318
319 /**
320  * If the migration task is not currently running, consider
321  * (re)scheduling it with the appropriate delay.
322  */
323 static void
324 consider_gathering (void);
325
326
327 /**
328  * Find content for migration to this peer.
329  *
330  * @param cls peer to find content for
331  */
332 static void
333 find_content (void *cls)
334 {
335   struct MigrationReadyPeer *mrp = cls;
336   struct MigrationReadyBlock *pos;
337   long score;
338   long best_score;
339   struct MigrationReadyBlock *best;
340
341   mrp->env = NULL;
342   best = NULL;
343   best_score = -1;
344   pos = mig_head;
345   while (NULL != pos)
346   {
347     score = score_content (mrp, pos);
348     if (score > best_score)
349     {
350       best_score = score;
351       best = pos;
352     }
353     pos = pos->next;
354   }
355   if (NULL == best)
356   {
357     if (mig_size < MAX_MIGRATION_QUEUE)
358     {
359       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360                   "No content found for pushing, waiting for queue to fill\n");
361       return;                   /* will fill up eventually... */
362     }
363     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
364                 "No suitable content found, purging content from full queue\n");
365     /* failed to find migration target AND
366      * queue is full, purge most-forwarded
367      * block from queue to make room for more */
368     pos = mig_head;
369     while (NULL != pos)
370     {
371       score = count_targets (pos);
372       if (score >= best_score)
373       {
374         best_score = score;
375         best = pos;
376       }
377       pos = pos->next;
378     }
379     GNUNET_assert (NULL != best);
380     delete_migration_block (best);
381     consider_gathering ();
382     return;
383   }
384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385               "Preparing to push best content to peer\n");
386   transmit_content (mrp,
387                     best);
388 }
389
390
391 /**
392  * Task that is run periodically to obtain blocks for content
393  * migration
394  *
395  * @param cls unused
396  */
397 static void
398 gather_migration_blocks (void *cls);
399
400
401 /**
402  * If the migration task is not currently running, consider
403  * (re)scheduling it with the appropriate delay.
404  */
405 static void
406 consider_gathering ()
407 {
408   struct GNUNET_TIME_Relative delay;
409
410   if (NULL == GSF_dsh)
411     return;
412   if (NULL != mig_qe)
413     return;
414   if (NULL != mig_task)
415     return;
416   if (mig_size >= MAX_MIGRATION_QUEUE)
417     return;
418   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
419                                          mig_size);
420   delay = GNUNET_TIME_relative_divide (delay,
421                                        MAX_MIGRATION_QUEUE);
422   delay = GNUNET_TIME_relative_max (delay,
423                                     min_migration_delay);
424   if (GNUNET_NO == value_found)
425   {
426     /* wait at least 5s if the datastore is empty */
427     delay = GNUNET_TIME_relative_max (delay,
428                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
429                                                                      5));
430   }
431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432               "Scheduling gathering task (queue size: %u)\n",
433               mig_size);
434   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
435                                            &gather_migration_blocks,
436                                            NULL);
437 }
438
439
440 /**
441  * Process content offered for migration.
442  *
443  * @param cls closure
444  * @param key key for the content
445  * @param size number of bytes in data
446  * @param data content stored
447  * @param type type of the content
448  * @param priority priority of the content
449  * @param anonymity anonymity-level for the content
450  * @param expiration expiration time for the content
451  * @param uid unique identifier for the datum;
452  *        maybe 0 if no unique identifier is available
453  */
454 static void
455 process_migration_content (void *cls,
456                            const struct GNUNET_HashCode *key,
457                            size_t size,
458                            const void *data,
459                            enum GNUNET_BLOCK_Type type,
460                            uint32_t priority,
461                            uint32_t anonymity,
462                            struct GNUNET_TIME_Absolute expiration,
463                            uint64_t uid)
464 {
465   struct MigrationReadyBlock *mb;
466   struct MigrationReadyPeer *pos;
467
468   mig_qe = NULL;
469   if (NULL == key)
470   {
471     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
472                 "No content found for migration...\n");
473     consider_gathering ();
474     return;
475   }
476   value_found = GNUNET_YES;
477   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
478       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
479   {
480     /* content will expire soon, don't bother */
481     consider_gathering ();
482     return;
483   }
484   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
485   {
486     if (GNUNET_OK !=
487         GNUNET_FS_handle_on_demand_block (key,
488                                           size,
489                                           data,
490                                           type,
491                                           priority,
492                                           anonymity,
493                                           expiration,
494                                           uid,
495                                           &process_migration_content, NULL))
496       consider_gathering ();
497     return;
498   }
499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
501               GNUNET_h2s (key),
502               type, mig_size + 1,
503               MAX_MIGRATION_QUEUE);
504   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
505   mb->query = *key;
506   mb->expiration = expiration;
507   mb->size = size;
508   mb->type = type;
509   GNUNET_memcpy (&mb[1], data, size);
510   GNUNET_CONTAINER_DLL_insert_after (mig_head,
511                                      mig_tail,
512                                      mig_tail,
513                                      mb);
514   mig_size++;
515   for (pos = peer_head; NULL != pos; pos = pos->next)
516   {
517     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518                 "Preparing to push best content to peer %s\n",
519                 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
520     if (GNUNET_YES == transmit_content (pos,
521                                         mb))
522       break;                  /* 'mb' was freed! */
523   }
524   consider_gathering ();
525 }
526
527
528 /**
529  * Task that is run periodically to obtain blocks for content
530  * migration
531  *
532  * @param cls unused
533  */
534 static void
535 gather_migration_blocks (void *cls)
536 {
537   mig_task = NULL;
538   if (mig_size >= MAX_MIGRATION_QUEUE)
539     return;
540   if (NULL == GSF_dsh)
541     return;
542   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543               "Asking datastore for content for replication (queue size: %u)\n",
544               mig_size);
545   value_found = GNUNET_NO;
546   mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
547                                                  0,
548                                                  UINT_MAX,
549                                                  &process_migration_content,
550                                                  NULL);
551   if (NULL == mig_qe)
552     consider_gathering ();
553 }
554
555
556 /**
557  * A peer connected to us.  Start pushing content
558  * to this peer.
559  *
560  * @param peer handle for the peer that connected
561  */
562 void
563 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
564 {
565   struct MigrationReadyPeer *mrp;
566
567   if (GNUNET_YES != enabled)
568     return;
569   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
570     if (mrp->peer == peer)
571       break;
572   if (NULL != mrp)
573   {
574     /* same peer added twice, must not happen */
575     GNUNET_break (0);
576     return;
577   }
578
579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
580               "Adding peer %s to list for pushing\n",
581               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
582
583   mrp = GNUNET_new (struct MigrationReadyPeer);
584   mrp->peer = peer;
585   find_content (mrp);
586   GNUNET_CONTAINER_DLL_insert (peer_head,
587                                peer_tail,
588                                mrp);
589 }
590
591
592 /**
593  * A peer disconnected from us.  Stop pushing content
594  * to this peer.
595  *
596  * @param peer handle for the peer that disconnected
597  */
598 void
599 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
600 {
601   struct MigrationReadyPeer *pos;
602
603   for (pos = peer_head; NULL != pos; pos = pos->next)
604     if (pos->peer == peer)
605       break;
606   if (NULL == pos)
607     return;
608   if (NULL != pos->env)
609     GNUNET_MQ_send_cancel (pos->env);
610   GNUNET_CONTAINER_DLL_remove (peer_head,
611                                peer_tail,
612                                pos);
613   GNUNET_free (pos);
614 }
615
616
617 /**
618  * Setup the module.
619  */
620 void
621 GSF_push_init_ ()
622 {
623   enabled =
624     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
625                                           "FS",
626                                           "CONTENT_PUSHING");
627   if (GNUNET_YES != enabled)
628     return;
629
630   if (GNUNET_OK !=
631       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
632                                            "fs",
633                                            "MIN_MIGRATION_DELAY",
634                                            &min_migration_delay))
635   {
636     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
637                                "fs",
638                                "MIN_MIGRATION_DELAY",
639                                _("time required, content pushing disabled"));
640     return;
641   }
642   consider_gathering ();
643 }
644
645
646 /**
647  * Shutdown the module.
648  */
649 void
650 GSF_push_done_ ()
651 {
652   if (NULL != mig_task)
653   {
654     GNUNET_SCHEDULER_cancel (mig_task);
655     mig_task = NULL;
656   }
657   if (NULL != mig_qe)
658   {
659     GNUNET_DATASTORE_cancel (mig_qe);
660     mig_qe = NULL;
661   }
662   while (NULL != mig_head)
663     delete_migration_block (mig_head);
664   GNUNET_assert (0 == mig_size);
665 }
666
667 /* end of gnunet-service-fs_push.c */