-avoid calling memcpy() with NULL argument, even if len is 0
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for
106  * migratable data.
107  */
108 struct MigrationReadyPeer
109 {
110   /**
111    * This is a doubly-linked list.
112    */
113   struct MigrationReadyPeer *next;
114
115   /**
116    * This is a doubly-linked list.
117    */
118   struct MigrationReadyPeer *prev;
119
120   /**
121    * Handle to peer.
122    */
123   struct GSF_ConnectedPeer *peer;
124
125   /**
126    * Handle for current transmission request,
127    * or NULL for none.
128    */
129   struct GSF_PeerTransmitHandle *th;
130
131   /**
132    * Message we are trying to push right now (or NULL)
133    */
134   struct PutMessage *msg;
135 };
136
137
138 /**
139  * Head of linked list of blocks that can be migrated.
140  */
141 static struct MigrationReadyBlock *mig_head;
142
143 /**
144  * Tail of linked list of blocks that can be migrated.
145  */
146 static struct MigrationReadyBlock *mig_tail;
147
148 /**
149  * Head of linked list of peers.
150  */
151 static struct MigrationReadyPeer *peer_head;
152
153 /**
154  * Tail of linked list of peers.
155  */
156 static struct MigrationReadyPeer *peer_tail;
157
158 /**
159  * Request to datastore for migration (or NULL).
160  */
161 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
162
163 /**
164  * ID of task that collects blocks for migration.
165  */
166 static struct GNUNET_SCHEDULER_Task * mig_task;
167
168 /**
169  * What is the maximum frequency at which we are allowed to
170  * poll the datastore for migration content?
171  */
172 static struct GNUNET_TIME_Relative min_migration_delay;
173
174 /**
175  * Size of the doubly-linked list of migration blocks.
176  */
177 static unsigned int mig_size;
178
179 /**
180  * Is this module enabled?
181  */
182 static int enabled;
183
184 /**
185  * Did we find anything in the datastore?
186  */
187 static int value_found;
188
189
190 /**
191  * Delete the given migration block.
192  *
193  * @param mb block to delete
194  */
195 static void
196 delete_migration_block (struct MigrationReadyBlock *mb)
197 {
198   GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
199   GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
200   mig_size--;
201   GNUNET_free (mb);
202 }
203
204
205 /**
206  * Find content for migration to this peer.
207  */
208 static void
209 find_content (struct MigrationReadyPeer *mrp);
210
211
212 /**
213  * Transmit the message currently scheduled for transmission.
214  *
215  * @param cls the `struct MigrationReadyPeer`
216  * @param buf_size number of bytes available in @a buf
217  * @param buf where to copy the message, NULL on error (peer disconnect)
218  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
219  */
220 static size_t
221 transmit_message (void *cls,
222                   size_t buf_size,
223                   void *buf)
224 {
225   struct MigrationReadyPeer *peer = cls;
226   struct PutMessage *msg;
227   uint16_t msize;
228
229   peer->th = NULL;
230   msg = peer->msg;
231   peer->msg = NULL;
232   if (NULL == buf)
233   {
234     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235                 "Failed to migrate content to another peer (disconnect)\n");
236     GNUNET_free (msg);
237     return 0;
238   }
239   msize = ntohs (msg->header.size);
240   GNUNET_assert (msize <= buf_size);
241   GNUNET_memcpy (buf, msg, msize);
242   GNUNET_free (msg);
243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244               "Pushing %u bytes to %s\n",
245               msize,
246               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
247   find_content (peer);
248   return msize;
249 }
250
251
252 /**
253  * Send the given block to the given peer.
254  *
255  * @param peer target peer
256  * @param block the block
257  * @return #GNUNET_YES if the block was deleted (!)
258  */
259 static int
260 transmit_content (struct MigrationReadyPeer *peer,
261                   struct MigrationReadyBlock *block)
262 {
263   size_t msize;
264   struct PutMessage *msg;
265   unsigned int i;
266   struct GSF_PeerPerformanceData *ppd;
267   int ret;
268
269   ppd = GSF_get_peer_performance_data_ (peer->peer);
270   GNUNET_assert (NULL == peer->th);
271   msize = sizeof (struct PutMessage) + block->size;
272   msg = GNUNET_malloc (msize);
273   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
274   msg->header.size = htons (msize);
275   msg->type = htonl (block->type);
276   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
277   GNUNET_memcpy (&msg[1], &block[1], block->size);
278   peer->msg = msg;
279   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
280   {
281     if (block->target_list[i] == 0)
282     {
283       block->target_list[i] = ppd->pid;
284       GNUNET_PEER_change_rc (block->target_list[i], 1);
285       break;
286     }
287   }
288   if (MIGRATION_LIST_SIZE == i)
289   {
290     delete_migration_block (block);
291     ret = GNUNET_YES;
292   }
293   else
294   {
295     ret = GNUNET_NO;
296   }
297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298               "Asking for transmission of %u bytes to %s for migration\n",
299               (unsigned int) msize,
300               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
301   peer->th = GSF_peer_transmit_ (peer->peer,
302                                  GNUNET_NO, 0 /* priority */ ,
303                                  GNUNET_TIME_UNIT_FOREVER_REL,
304                                  msize,
305                                  &transmit_message, peer);
306   return ret;
307 }
308
309
310 /**
311  * Count the number of peers this block has
312  * already been forwarded to.
313  *
314  * @param block the block
315  * @return number of times block was forwarded
316  */
317 static unsigned int
318 count_targets (struct MigrationReadyBlock *block)
319 {
320   unsigned int i;
321
322   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
323     if (block->target_list[i] == 0)
324       return i;
325   return i;
326 }
327
328
329 /**
330  * Check if sending this block to this peer would
331  * be a good idea.
332  *
333  * @param peer target peer
334  * @param block the block
335  * @return score (>= 0: feasible, negative: infeasible)
336  */
337 static long
338 score_content (struct MigrationReadyPeer *peer,
339                struct MigrationReadyBlock *block)
340 {
341   unsigned int i;
342   struct GSF_PeerPerformanceData *ppd;
343   struct GNUNET_PeerIdentity id;
344   struct GNUNET_HashCode hc;
345   uint32_t dist;
346
347   ppd = GSF_get_peer_performance_data_ (peer->peer);
348   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
349     if (block->target_list[i] == ppd->pid)
350       return -1;
351   GNUNET_assert (0 != ppd->pid);
352   GNUNET_PEER_resolve (ppd->pid, &id);
353   GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc);
354   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc);
355   /* closer distance, higher score: */
356   return UINT32_MAX - dist;
357 }
358
359
360 /**
361  * If the migration task is not currently running, consider
362  * (re)scheduling it with the appropriate delay.
363  */
364 static void
365 consider_gathering (void);
366
367
368 /**
369  * Find content for migration to this peer.
370  *
371  * @param mrp peer to find content for
372  */
373 static void
374 find_content (struct MigrationReadyPeer *mrp)
375 {
376   struct MigrationReadyBlock *pos;
377   long score;
378   long best_score;
379   struct MigrationReadyBlock *best;
380
381   GNUNET_assert (NULL == mrp->th);
382   best = NULL;
383   best_score = -1;
384   pos = mig_head;
385   while (NULL != pos)
386   {
387     score = score_content (mrp, pos);
388     if (score > best_score)
389     {
390       best_score = score;
391       best = pos;
392     }
393     pos = pos->next;
394   }
395   if (NULL == best)
396   {
397     if (mig_size < MAX_MIGRATION_QUEUE)
398     {
399       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400                   "No content found for pushing, waiting for queue to fill\n");
401       return;                   /* will fill up eventually... */
402     }
403     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404                 "No suitable content found, purging content from full queue\n");
405     /* failed to find migration target AND
406      * queue is full, purge most-forwarded
407      * block from queue to make room for more */
408     pos = mig_head;
409     while (NULL != pos)
410     {
411       score = count_targets (pos);
412       if (score >= best_score)
413       {
414         best_score = score;
415         best = pos;
416       }
417       pos = pos->next;
418     }
419     GNUNET_assert (NULL != best);
420     delete_migration_block (best);
421     consider_gathering ();
422     return;
423   }
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Preparing to push best content to peer\n");
426   transmit_content (mrp, best);
427 }
428
429
430 /**
431  * Task that is run periodically to obtain blocks for content
432  * migration
433  *
434  * @param cls unused
435  */
436 static void
437 gather_migration_blocks (void *cls);
438
439
440 /**
441  * If the migration task is not currently running, consider
442  * (re)scheduling it with the appropriate delay.
443  */
444 static void
445 consider_gathering ()
446 {
447   struct GNUNET_TIME_Relative delay;
448
449   if (NULL == GSF_dsh)
450     return;
451   if (NULL != mig_qe)
452     return;
453   if (NULL != mig_task)
454     return;
455   if (mig_size >= MAX_MIGRATION_QUEUE)
456     return;
457   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
458   delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
459   delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
460   if (GNUNET_NO == value_found)
461   {
462     /* wait at least 5s if the datastore is empty */
463     delay = GNUNET_TIME_relative_max (delay,
464                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
465                                                                      5));
466   }
467   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
468               "Scheduling gathering task (queue size: %u)\n",
469               mig_size);
470   mig_task =
471       GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
472 }
473
474
475 /**
476  * Process content offered for migration.
477  *
478  * @param cls closure
479  * @param key key for the content
480  * @param size number of bytes in data
481  * @param data content stored
482  * @param type type of the content
483  * @param priority priority of the content
484  * @param anonymity anonymity-level for the content
485  * @param expiration expiration time for the content
486  * @param uid unique identifier for the datum;
487  *        maybe 0 if no unique identifier is available
488  */
489 static void
490 process_migration_content (void *cls,
491                            const struct GNUNET_HashCode *key,
492                            size_t size,
493                            const void *data,
494                            enum GNUNET_BLOCK_Type type,
495                            uint32_t priority,
496                            uint32_t anonymity,
497                            struct GNUNET_TIME_Absolute expiration,
498                            uint64_t uid)
499 {
500   struct MigrationReadyBlock *mb;
501   struct MigrationReadyPeer *pos;
502
503   mig_qe = NULL;
504   if (NULL == key)
505   {
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                 "No content found for migration...\n");
508     consider_gathering ();
509     return;
510   }
511   value_found = GNUNET_YES;
512   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
513       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
514   {
515     /* content will expire soon, don't bother */
516     consider_gathering ();
517     return;
518   }
519   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
520   {
521     if (GNUNET_OK !=
522         GNUNET_FS_handle_on_demand_block (key,
523                                           size,
524                                           data,
525                                           type,
526                                           priority,
527                                           anonymity,
528                                           expiration,
529                                           uid,
530                                           &process_migration_content, NULL))
531       consider_gathering ();
532     return;
533   }
534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
536               GNUNET_h2s (key),
537               type, mig_size + 1,
538               MAX_MIGRATION_QUEUE);
539   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
540   mb->query = *key;
541   mb->expiration = expiration;
542   mb->size = size;
543   mb->type = type;
544   GNUNET_memcpy (&mb[1], data, size);
545   GNUNET_CONTAINER_DLL_insert_after (mig_head,
546                                      mig_tail,
547                                      mig_tail,
548                                      mb);
549   mig_size++;
550   for (pos = peer_head; NULL != pos; pos = pos->next)
551   {
552     if (NULL == pos->th)
553     {
554       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555                   "Preparing to push best content to peer %s\n",
556                   GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
557       if (GNUNET_YES == transmit_content (pos, mb))
558         break;                  /* 'mb' was freed! */
559     }
560   }
561   consider_gathering ();
562 }
563
564
565 /**
566  * Task that is run periodically to obtain blocks for content
567  * migration
568  *
569  * @param cls unused
570  */
571 static void
572 gather_migration_blocks (void *cls)
573 {
574   mig_task = NULL;
575   if (mig_size >= MAX_MIGRATION_QUEUE)
576     return;
577   if (NULL == GSF_dsh)
578     return;
579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
580               "Asking datastore for content for replication (queue size: %u)\n",
581               mig_size);
582   value_found = GNUNET_NO;
583   mig_qe =
584     GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
585                                           &process_migration_content, NULL);
586   if (NULL == mig_qe)
587     consider_gathering ();
588 }
589
590
591 /**
592  * A peer connected to us.  Start pushing content
593  * to this peer.
594  *
595  * @param peer handle for the peer that connected
596  */
597 void
598 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
599 {
600   struct MigrationReadyPeer *mrp;
601
602   if (GNUNET_YES != enabled)
603     return;
604   for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
605     if (mrp->peer == peer)
606       break;
607   if (NULL != mrp)
608   {
609     /* same peer added twice, must not happen */
610     GNUNET_break (0);
611     return;
612   }
613
614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615               "Adding peer %s to list for pushing\n",
616               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
617
618   mrp = GNUNET_new (struct MigrationReadyPeer);
619   mrp->peer = peer;
620   find_content (mrp);
621   GNUNET_CONTAINER_DLL_insert (peer_head,
622                                peer_tail,
623                                mrp);
624 }
625
626
627 /**
628  * A peer disconnected from us.  Stop pushing content
629  * to this peer.
630  *
631  * @param peer handle for the peer that disconnected
632  */
633 void
634 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
635 {
636   struct MigrationReadyPeer *pos;
637
638   for (pos = peer_head; NULL != pos; pos = pos->next)
639     if (pos->peer == peer)
640       break;
641   if (NULL == pos)
642     return;
643   GNUNET_CONTAINER_DLL_remove (peer_head,
644                                peer_tail,
645                                pos);
646   if (NULL != pos->th)
647   {
648     GSF_peer_transmit_cancel_ (pos->th);
649     pos->th = NULL;
650   }
651   if (NULL != pos->msg)
652   {
653     GNUNET_free (pos->msg);
654     pos->msg = NULL;
655   }
656   GNUNET_free (pos);
657 }
658
659
660 /**
661  * Setup the module.
662  */
663 void
664 GSF_push_init_ ()
665 {
666   enabled =
667       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
668   if (GNUNET_YES != enabled)
669     return;
670
671   if (GNUNET_OK !=
672       GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
673                                            &min_migration_delay))
674   {
675     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
676                                "fs", "MIN_MIGRATION_DELAY",
677                                _("time required, content pushing disabled"));
678     return;
679   }
680   consider_gathering ();
681 }
682
683
684 /**
685  * Shutdown the module.
686  */
687 void
688 GSF_push_done_ ()
689 {
690   if (NULL != mig_task)
691   {
692     GNUNET_SCHEDULER_cancel (mig_task);
693     mig_task = NULL;
694   }
695   if (NULL != mig_qe)
696   {
697     GNUNET_DATASTORE_cancel (mig_qe);
698     mig_qe = NULL;
699   }
700   while (NULL != mig_head)
701     delete_migration_block (mig_head);
702   GNUNET_assert (0 == mig_size);
703 }
704
705 /* end of gnunet-service-fs_push.c */