psycstore: postgres: remove size modifier from BYTEA fields
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for migratable data.
106  */
107 struct MigrationReadyPeer
108 {
109   /**
110    * This is a doubly-linked list.
111    */
112   struct MigrationReadyPeer *next;
113
114   /**
115    * This is a doubly-linked list.
116    */
117   struct MigrationReadyPeer *prev;
118
119   /**
120    * Handle to peer.
121    */
122   struct GSF_ConnectedPeer *peer;
123
124   /**
125    * Envelope of the currently pushed message.
126    */
127   struct GNUNET_MQ_Envelope *env;
128 };
129
130
131 /**
132  * Head of linked list of blocks that can be migrated.
133  */
134 static struct MigrationReadyBlock *mig_head;
135
136 /**
137  * Tail of linked list of blocks that can be migrated.
138  */
139 static struct MigrationReadyBlock *mig_tail;
140
141 /**
142  * Head of linked list of peers.
143  */
144 static struct MigrationReadyPeer *peer_head;
145
146 /**
147  * Tail of linked list of peers.
148  */
149 static struct MigrationReadyPeer *peer_tail;
150
151 /**
152  * Request to datastore for migration (or NULL).
153  */
154 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
155
156 /**
157  * ID of task that collects blocks for migration.
158  */
159 static struct GNUNET_SCHEDULER_Task *mig_task;
160
161 /**
162  * What is the maximum frequency at which we are allowed to
163  * poll the datastore for migration content?
164  */
165 static struct GNUNET_TIME_Relative min_migration_delay;
166
167 /**
168  * Size of the doubly-linked list of migration blocks.
169  */
170 static unsigned int mig_size;
171
172 /**
173  * Is this module enabled?
174  */
175 static int enabled;
176
177 /**
178  * Did we find anything in the datastore?
179  */
180 static int value_found;
181
182
183 /**
184  * Delete the given migration block.
185  *
186  * @param mb block to delete
187  */
188 static void
189 delete_migration_block (struct MigrationReadyBlock *mb)
190 {
191   GNUNET_CONTAINER_DLL_remove (mig_head,
192                                mig_tail,
193                                mb);
194   GNUNET_PEER_decrement_rcs (mb->target_list,
195                              MIGRATION_LIST_SIZE);
196   mig_size--;
197   GNUNET_free (mb);
198 }
199
200
201 /**
202  * Find content for migration to this peer.
203  *
204  * @param cls a `struct MigrationReadyPeer *`
205  */
206 static void
207 find_content (void *cls);
208
209
210 /**
211  * Send the given block to the given peer.
212  *
213  * @param peer target peer
214  * @param block the block
215  * @return #GNUNET_YES if the block was deleted (!)
216  */
217 static int
218 transmit_content (struct MigrationReadyPeer *mrp,
219                   struct MigrationReadyBlock *block)
220 {
221   struct PutMessage *msg;
222   unsigned int i;
223   struct GSF_PeerPerformanceData *ppd;
224   int ret;
225
226   ppd = GSF_get_peer_performance_data_ (mrp->peer);
227   GNUNET_assert (NULL == mrp->env);
228   mrp->env = GNUNET_MQ_msg_extra (msg,
229                                   block->size,
230                                   GNUNET_MESSAGE_TYPE_FS_PUT);
231   msg->type = htonl (block->type);
232   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
233   GNUNET_memcpy (&msg[1],
234                  &block[1],
235                  block->size);
236   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
237   {
238     if (block->target_list[i] == 0)
239     {
240       block->target_list[i] = ppd->pid;
241       GNUNET_PEER_change_rc (block->target_list[i],
242                              1);
243       break;
244     }
245   }
246   if (MIGRATION_LIST_SIZE == i)
247   {
248     delete_migration_block (block);
249     ret = GNUNET_YES;
250   }
251   else
252   {
253     ret = GNUNET_NO;
254   }
255   GNUNET_MQ_notify_sent (mrp->env,
256                          &find_content,
257                          mrp);
258   GSF_peer_transmit_ (mrp->peer,
259                       GNUNET_NO,
260                       0 /* priority */ ,
261                       mrp->env);
262   return ret;
263 }
264
265
266 /**
267  * Count the number of peers this block has
268  * already been forwarded to.
269  *
270  * @param block the block
271  * @return number of times block was forwarded
272  */
273 static unsigned int
274 count_targets (struct MigrationReadyBlock *block)
275 {
276   unsigned int i;
277
278   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279     if (block->target_list[i] == 0)
280       return i;
281   return i;
282 }
283
284
285 /**
286  * Check if sending this block to this peer would
287  * be a good idea.
288  *
289  * @param mrp target peer
290  * @param block the block
291  * @return score (>= 0: feasible, negative: infeasible)
292  */
293 static long
294 score_content (struct MigrationReadyPeer *mrp,
295                struct MigrationReadyBlock *block)
296 {
297   unsigned int i;
298   struct GSF_PeerPerformanceData *ppd;
299   struct GNUNET_PeerIdentity id;
300   struct GNUNET_HashCode hc;
301   uint32_t dist;
302
303   ppd = GSF_get_peer_performance_data_ (mrp->peer);
304   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
305     if (block->target_list[i] == ppd->pid)
306       return -1;
307   GNUNET_assert (0 != ppd->pid);
308   GNUNET_PEER_resolve (ppd->pid,
309                        &id);
310   GNUNET_CRYPTO_hash (&id,
311                       sizeof (struct GNUNET_PeerIdentity),
312                       &hc);
313   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
314                                           &hc);
315   /* closer distance, higher score: */
316   return UINT32_MAX - dist;
317 }
318
319
320 /**
321  * If the migration task is not currently running, consider
322  * (re)scheduling it with the appropriate delay.
323  */
324 static void
325 consider_gathering (void);
326
327
328 /**
329  * Find content for migration to this peer.
330  *
331  * @param cls peer to find content for
332  */
333 static void
334 find_content (void *cls)
335 {
336   struct MigrationReadyPeer *mrp = cls;
337   struct MigrationReadyBlock *pos;
338   long score;
339   long best_score;
340   struct MigrationReadyBlock *best;
341
342   mrp->env = NULL;
343   best = NULL;
344   best_score = -1;
345   pos = mig_head;
346   while (NULL != pos)
347   {
348     score = score_content (mrp, pos);
349     if (score > best_score)
350     {
351       best_score = score;
352       best = pos;
353     }
354     pos = pos->next;
355   }
356   if (NULL == best)
357   {
358     if (mig_size < MAX_MIGRATION_QUEUE)
359     {
360       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361                   "No content found for pushing, waiting for queue to fill\n");
362       return;                   /* will fill up eventually... */
363     }
364     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365                 "No suitable content found, purging content from full queue\n");
366     /* failed to find migration target AND
367      * queue is full, purge most-forwarded
368      * block from queue to make room for more */
369     pos = mig_head;
370     while (NULL != pos)
371     {
372       score = count_targets (pos);
373       if (score >= best_score)
374       {
375         best_score = score;
376         best = pos;
377       }
378       pos = pos->next;
379     }
380     GNUNET_assert (NULL != best);
381     delete_migration_block (best);
382     consider_gathering ();
383     return;
384   }
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "Preparing to push best content to peer\n");
387   transmit_content (mrp,
388                     best);
389 }
390
391
392 /**
393  * Task that is run periodically to obtain blocks for content
394  * migration
395  *
396  * @param cls unused
397  */
398 static void
399 gather_migration_blocks (void *cls);
400
401
402 /**
403  * If the migration task is not currently running, consider
404  * (re)scheduling it with the appropriate delay.
405  */
406 static void
407 consider_gathering ()
408 {
409   struct GNUNET_TIME_Relative delay;
410
411   if (NULL == GSF_dsh)
412     return;
413   if (NULL != mig_qe)
414     return;
415   if (NULL != mig_task)
416     return;
417   if (mig_size >= MAX_MIGRATION_QUEUE)
418     return;
419   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
420                                          mig_size);
421   delay = GNUNET_TIME_relative_divide (delay,
422                                        MAX_MIGRATION_QUEUE);
423   delay = GNUNET_TIME_relative_max (delay,
424                                     min_migration_delay);
425   if (GNUNET_NO == value_found)
426   {
427     /* wait at least 5s if the datastore is empty */
428     delay = GNUNET_TIME_relative_max (delay,
429                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
430                                                                      5));
431   }
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "Scheduling gathering task (queue size: %u)\n",
434               mig_size);
435   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
436                                            &gather_migration_blocks,
437                                            NULL);
438 }
439
440
441 /**
442  * Process content offered for migration.
443  *
444  * @param cls closure
445  * @param key key for the content
446  * @param size number of bytes in data
447  * @param data content stored
448  * @param type type of the content
449  * @param priority priority of the content
450  * @param anonymity anonymity-level for the content
451  * @param expiration expiration time for the content
452  * @param uid unique identifier for the datum;
453  *        maybe 0 if no unique identifier is available
454  */
455 static void
456 process_migration_content (void *cls,
457                            const struct GNUNET_HashCode *key,
458                            size_t size,
459                            const void *data,
460                            enum GNUNET_BLOCK_Type type,
461                            uint32_t priority,
462                            uint32_t anonymity,
463                            struct GNUNET_TIME_Absolute expiration,
464                            uint64_t uid)
465 {
466   struct MigrationReadyBlock *mb;
467   struct MigrationReadyPeer *pos;
468
469   mig_qe = NULL;
470   if (NULL == key)
471   {
472     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
473                 "No content found for migration...\n");
474     consider_gathering ();
475     return;
476   }
477   value_found = GNUNET_YES;
478   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
479       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
480   {
481     /* content will expire soon, don't bother */
482     consider_gathering ();
483     return;
484   }
485   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
486   {
487     if (GNUNET_OK !=
488         GNUNET_FS_handle_on_demand_block (key,
489                                           size,
490                                           data,
491                                           type,
492                                           priority,
493                                           anonymity,
494                                           expiration,
495                                           uid,
496                                           &process_migration_content, NULL))
497       consider_gathering ();
498     return;
499   }
500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
502               GNUNET_h2s (key),
503               type, mig_size + 1,
504               MAX_MIGRATION_QUEUE);
505   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
506   mb->query = *key;
507   mb->expiration = expiration;
508   mb->size = size;
509   mb->type = type;
510   GNUNET_memcpy (&mb[1], data, size);
511   GNUNET_CONTAINER_DLL_insert_after (mig_head,
512                                      mig_tail,
513                                      mig_tail,
514                                      mb);
515   mig_size++;
516   for (pos = peer_head; NULL != pos; pos = pos->next)
517   {
518     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519                 "Preparing to push best content to peer %s\n",
520                 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
521     if ( (NULL == pos->env) &&
522          (GNUNET_YES == transmit_content (pos,
523                                           mb)) ) {
524       break;                  /* 'mb' was freed! */
525     }
526   }
527   consider_gathering ();
528 }
529
530
531 /**
532  * Task that is run periodically to obtain blocks for content
533  * migration
534  *
535  * @param cls unused
536  */
537 static void
538 gather_migration_blocks (void *cls)
539 {
540   mig_task = NULL;
541   if (mig_size >= MAX_MIGRATION_QUEUE)
542     return;
543   if (NULL == GSF_dsh)
544     return;
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546               "Asking datastore for content for replication (queue size: %u)\n",
547               mig_size);
548   value_found = GNUNET_NO;
549   mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
550                                                  0,
551                                                  UINT_MAX,
552                                                  &process_migration_content,
553                                                  NULL);
554   if (NULL == mig_qe)
555     consider_gathering ();
556 }
557
558
559 /**
560  * A peer connected to us.  Start pushing content
561  * to this peer.
562  *
563  * @param peer handle for the peer that connected
564  */
565 void
566 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
567 {
568   struct MigrationReadyPeer *mrp;
569
570   if (GNUNET_YES != enabled)
571     return;
572   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
573     if (mrp->peer == peer)
574       break;
575   if (NULL != mrp)
576   {
577     /* same peer added twice, must not happen */
578     GNUNET_break (0);
579     return;
580   }
581
582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583               "Adding peer %s to list for pushing\n",
584               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
585
586   mrp = GNUNET_new (struct MigrationReadyPeer);
587   mrp->peer = peer;
588   find_content (mrp);
589   GNUNET_CONTAINER_DLL_insert (peer_head,
590                                peer_tail,
591                                mrp);
592 }
593
594
595 /**
596  * A peer disconnected from us.  Stop pushing content
597  * to this peer.
598  *
599  * @param peer handle for the peer that disconnected
600  */
601 void
602 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
603 {
604   struct MigrationReadyPeer *pos;
605
606   for (pos = peer_head; NULL != pos; pos = pos->next)
607     if (pos->peer == peer)
608       break;
609   if (NULL == pos)
610     return;
611   if (NULL != pos->env)
612     GNUNET_MQ_send_cancel (pos->env);
613   GNUNET_CONTAINER_DLL_remove (peer_head,
614                                peer_tail,
615                                pos);
616   GNUNET_free (pos);
617 }
618
619
620 /**
621  * Setup the module.
622  */
623 void
624 GSF_push_init_ ()
625 {
626   enabled =
627     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
628                                           "FS",
629                                           "CONTENT_PUSHING");
630   if (GNUNET_YES != enabled)
631     return;
632
633   if (GNUNET_OK !=
634       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
635                                            "fs",
636                                            "MIN_MIGRATION_DELAY",
637                                            &min_migration_delay))
638   {
639     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
640                                "fs",
641                                "MIN_MIGRATION_DELAY",
642                                _("time required, content pushing disabled"));
643     return;
644   }
645   consider_gathering ();
646 }
647
648
649 /**
650  * Shutdown the module.
651  */
652 void
653 GSF_push_done_ ()
654 {
655   if (NULL != mig_task)
656   {
657     GNUNET_SCHEDULER_cancel (mig_task);
658     mig_task = NULL;
659   }
660   if (NULL != mig_qe)
661   {
662     GNUNET_DATASTORE_cancel (mig_qe);
663     mig_qe = NULL;
664   }
665   while (NULL != mig_head)
666     delete_migration_block (mig_head);
667   GNUNET_assert (0 == mig_size);
668 }
669
670 /* end of gnunet-service-fs_push.c */