-eliminate printing warning on MAGIC missmatch (#2138)
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 #define DEBUG_FS_MIGRATION GNUNET_EXTRA_LOGGING
35
36 /**
37  * Maximum number of blocks we keep in memory for migration.
38  */
39 #define MAX_MIGRATION_QUEUE 8
40
41 /**
42  * Blocks are at most migrated to this number of peers
43  * plus one, each time they are fetched from the database.
44  */
45 #define MIGRATION_LIST_SIZE 2
46
47 /**
48  * How long must content remain valid for us to consider it for migration?
49  * If content will expire too soon, there is clearly no point in pushing
50  * it to other peers.  This value gives the threshold for migration.  Note
51  * that if this value is increased, the migration testcase may need to be
52  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
53  */
54 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
55
56
57 /**
58  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
59  */
60 struct MigrationReadyBlock
61 {
62
63   /**
64    * This is a doubly-linked list.
65    */
66   struct MigrationReadyBlock *next;
67
68   /**
69    * This is a doubly-linked list.
70    */
71   struct MigrationReadyBlock *prev;
72
73   /**
74    * Query for the block.
75    */
76   GNUNET_HashCode query;
77
78   /**
79    * When does this block expire?
80    */
81   struct GNUNET_TIME_Absolute expiration;
82
83   /**
84    * Peers we already forwarded this
85    * block to.  Zero for empty entries.
86    */
87   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
88
89   /**
90    * Size of the block.
91    */
92   size_t size;
93
94   /**
95    *  Number of targets already used.
96    */
97   unsigned int used_targets;
98
99   /**
100    * Type of the block.
101    */
102   enum GNUNET_BLOCK_Type type;
103 };
104
105
106 /**
107  * Information about a peer waiting for
108  * migratable data.
109  */
110 struct MigrationReadyPeer
111 {
112   /**
113    * This is a doubly-linked list.
114    */
115   struct MigrationReadyPeer *next;
116
117   /**
118    * This is a doubly-linked list.
119    */
120   struct MigrationReadyPeer *prev;
121
122   /**
123    * Handle to peer.
124    */
125   struct GSF_ConnectedPeer *peer;
126
127   /**
128    * Handle for current transmission request,
129    * or NULL for none.
130    */
131   struct GSF_PeerTransmitHandle *th;
132
133   /**
134    * Message we are trying to push right now (or NULL)
135    */
136   struct PutMessage *msg;
137 };
138
139
140 /**
141  * Head of linked list of blocks that can be migrated.
142  */
143 static struct MigrationReadyBlock *mig_head;
144
145 /**
146  * Tail of linked list of blocks that can be migrated.
147  */
148 static struct MigrationReadyBlock *mig_tail;
149
150 /**
151  * Head of linked list of peers.
152  */
153 static struct MigrationReadyPeer *peer_head;
154
155 /**
156  * Tail of linked list of peers.
157  */
158 static struct MigrationReadyPeer *peer_tail;
159
160 /**
161  * Request to datastore for migration (or NULL).
162  */
163 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
164
165 /**
166  * ID of task that collects blocks for migration.
167  */
168 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
169
170 /**
171  * What is the maximum frequency at which we are allowed to
172  * poll the datastore for migration content?
173  */
174 static struct GNUNET_TIME_Relative min_migration_delay;
175
176 /**
177  * Size of the doubly-linked list of migration blocks.
178  */
179 static unsigned int mig_size;
180
181 /**
182  * Is this module enabled?
183  */
184 static int enabled;
185
186
187 /**
188  * Delete the given migration block.
189  *
190  * @param mb block to delete
191  */
192 static void
193 delete_migration_block (struct MigrationReadyBlock *mb)
194 {
195   GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
196   GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
197   mig_size--;
198   GNUNET_free (mb);
199 }
200
201
202 /**
203  * Find content for migration to this peer.
204  */
205 static void
206 find_content (struct MigrationReadyPeer *mrp);
207
208
209 /**
210  * Transmit the message currently scheduled for
211  * transmission.
212  *
213  * @param cls the 'struct MigrationReadyPeer'
214  * @param buf_size number of bytes available in buf
215  * @param buf where to copy the message, NULL on error (peer disconnect)
216  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
217  */
218 static size_t
219 transmit_message (void *cls, size_t buf_size, void *buf)
220 {
221   struct MigrationReadyPeer *peer = cls;
222   struct PutMessage *msg;
223   uint16_t msize;
224
225   peer->th = NULL;
226   msg = peer->msg;
227   peer->msg = NULL;
228   if (buf == NULL)
229   {
230 #if DEBUG_FS_MIGRATION
231     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
232                 "Failed to migrate content to another peer (disconnect)\n");
233 #endif
234     GNUNET_free (msg);
235     return 0;
236   }
237   msize = ntohs (msg->header.size);
238   GNUNET_assert (msize <= buf_size);
239   memcpy (buf, msg, msize);
240   GNUNET_free (msg);
241 #if DEBUG_FS_MIGRATION
242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n",
243               msize);
244 #endif
245   find_content (peer);
246   return msize;
247 }
248
249
250 /**
251  * Send the given block to the given peer.
252  *
253  * @param peer target peer
254  * @param block the block
255  * @return GNUNET_YES if the block was deleted (!)
256  */
257 static int
258 transmit_content (struct MigrationReadyPeer *peer,
259                   struct MigrationReadyBlock *block)
260 {
261   size_t msize;
262   struct PutMessage *msg;
263   unsigned int i;
264   struct GSF_PeerPerformanceData *ppd;
265   int ret;
266
267   ppd = GSF_get_peer_performance_data_ (peer->peer);
268   GNUNET_assert (NULL == peer->th);
269   msize = sizeof (struct PutMessage) + block->size;
270   msg = GNUNET_malloc (msize);
271   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
272   msg->header.size = htons (msize);
273   msg->type = htonl (block->type);
274   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
275   memcpy (&msg[1], &block[1], block->size);
276   peer->msg = msg;
277   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
278   {
279     if (block->target_list[i] == 0)
280     {
281       block->target_list[i] = ppd->pid;
282       GNUNET_PEER_change_rc (block->target_list[i], 1);
283       break;
284     }
285   }
286   if (MIGRATION_LIST_SIZE == i)
287   {
288     delete_migration_block (block);
289     ret = GNUNET_YES;
290   }
291   else
292   {
293     ret = GNUNET_NO;
294   }
295 #if DEBUG_FS_MIGRATION
296   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
297               "Asking for transmission of %u bytes for migration\n", msize);
298 #endif
299   peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ ,
300                                  GNUNET_TIME_UNIT_FOREVER_REL, msize,
301                                  &transmit_message, peer);
302   return ret;
303 }
304
305
306 /**
307  * Count the number of peers this block has
308  * already been forwarded to.
309  *
310  * @param block the block
311  * @return number of times block was forwarded
312  */
313 static unsigned int
314 count_targets (struct MigrationReadyBlock *block)
315 {
316   unsigned int i;
317
318   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
319     if (block->target_list[i] == 0)
320       return i;
321   return i;
322 }
323
324
325 /**
326  * Check if sending this block to this peer would
327  * be a good idea.
328  *
329  * @param peer target peer
330  * @param block the block
331  * @return score (>= 0: feasible, negative: infeasible)
332  */
333 static long
334 score_content (struct MigrationReadyPeer *peer,
335                struct MigrationReadyBlock *block)
336 {
337   unsigned int i;
338   struct GSF_PeerPerformanceData *ppd;
339   struct GNUNET_PeerIdentity id;
340   uint32_t dist;
341
342   ppd = GSF_get_peer_performance_data_ (peer->peer);
343   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
344     if (block->target_list[i] == ppd->pid)
345       return -1;
346   GNUNET_assert (0 != ppd->pid);
347   GNUNET_PEER_resolve (ppd->pid, &id);
348   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey);
349   /* closer distance, higher score: */
350   return UINT32_MAX - dist;
351 }
352
353
354 /**
355  * If the migration task is not currently running, consider
356  * (re)scheduling it with the appropriate delay.
357  */
358 static void
359 consider_gathering (void);
360
361
362 /**
363  * Find content for migration to this peer.
364  *
365  * @param mrp peer to find content for
366  */
367 static void
368 find_content (struct MigrationReadyPeer *mrp)
369 {
370   struct MigrationReadyBlock *pos;
371   long score;
372   long best_score;
373   struct MigrationReadyBlock *best;
374
375   GNUNET_assert (NULL == mrp->th);
376   best = NULL;
377   best_score = -1;
378   pos = mig_head;
379   while (NULL != pos)
380   {
381     score = score_content (mrp, pos);
382     if (score > best_score)
383     {
384       best_score = score;
385       best = pos;
386     }
387     pos = pos->next;
388   }
389   if (NULL == best)
390   {
391     if (mig_size < MAX_MIGRATION_QUEUE)
392     {
393 #if DEBUG_FS_MIGRATION
394       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395                   "No content found for pushing, waiting for queue to fill\n");
396 #endif
397       return;                   /* will fill up eventually... */
398     }
399 #if DEBUG_FS_MIGRATION
400     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401                 "No suitable content found, purging content from full queue\n");
402 #endif
403     /* failed to find migration target AND
404      * queue is full, purge most-forwarded
405      * block from queue to make room for more */
406     pos = mig_head;
407     while (NULL != pos)
408     {
409       score = count_targets (pos);
410       if (score >= best_score)
411       {
412         best_score = score;
413         best = pos;
414       }
415       pos = pos->next;
416     }
417     GNUNET_assert (NULL != best);
418     delete_migration_block (best);
419     consider_gathering ();
420     return;
421   }
422 #if DEBUG_FS_MIGRATION
423   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
424               "Preparing to push best content to peer\n");
425 #endif
426   transmit_content (mrp, best);
427 }
428
429
430 /**
431  * Task that is run periodically to obtain blocks for content
432  * migration
433  *
434  * @param cls unused
435  * @param tc scheduler context (also unused)
436  */
437 static void
438 gather_migration_blocks (void *cls,
439                          const struct GNUNET_SCHEDULER_TaskContext *tc);
440
441
442 /**
443  * If the migration task is not currently running, consider
444  * (re)scheduling it with the appropriate delay.
445  */
446 static void
447 consider_gathering ()
448 {
449   struct GNUNET_TIME_Relative delay;
450
451   if (GSF_dsh == NULL)
452     return;
453   if (mig_qe != NULL)
454     return;
455   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
456     return;
457   if (mig_size >= MAX_MIGRATION_QUEUE)
458     return;
459   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
460   delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
461   delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
462 #if DEBUG_FS_MIGRATION
463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464               "Scheduling gathering task (queue size: %u)\n", mig_size);
465 #endif
466   mig_task =
467       GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
468 }
469
470
471 /**
472  * Process content offered for migration.
473  *
474  * @param cls closure
475  * @param key key for the content
476  * @param size number of bytes in data
477  * @param data content stored
478  * @param type type of the content
479  * @param priority priority of the content
480  * @param anonymity anonymity-level for the content
481  * @param expiration expiration time for the content
482  * @param uid unique identifier for the datum;
483  *        maybe 0 if no unique identifier is available
484  */
485 static void
486 process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size,
487                            const void *data, enum GNUNET_BLOCK_Type type,
488                            uint32_t priority, uint32_t anonymity,
489                            struct GNUNET_TIME_Absolute expiration, uint64_t uid)
490 {
491   struct MigrationReadyBlock *mb;
492   struct MigrationReadyPeer *pos;
493
494   mig_qe = NULL;
495   if (key == NULL)
496   {
497 #if DEBUG_FS_MIGRATION
498     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n");
499 #endif
500     consider_gathering ();
501     return;
502   }
503   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
504       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
505   {
506     /* content will expire soon, don't bother */
507     consider_gathering ();
508     return;
509   }
510   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
511   {
512     if (GNUNET_OK !=
513         GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
514                                           anonymity, expiration, uid,
515                                           &process_migration_content, NULL))
516       consider_gathering ();
517     return;
518   }
519 #if DEBUG_FS_MIGRATION
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
522               GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE);
523 #endif
524   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
525   mb->query = *key;
526   mb->expiration = expiration;
527   mb->size = size;
528   mb->type = type;
529   memcpy (&mb[1], data, size);
530   GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb);
531   mig_size++;
532   pos = peer_head;
533   while (pos != NULL)
534   {
535     if (NULL == pos->th)
536     {
537 #if DEBUG_FS_MIGRATION
538       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539                   "Preparing to push best content to peer\n");
540 #endif
541       if (GNUNET_YES == transmit_content (pos, mb))
542         break;                  /* 'mb' was freed! */
543     }
544     pos = pos->next;
545   }
546   consider_gathering ();
547 }
548
549
550 /**
551  * Task that is run periodically to obtain blocks for content
552  * migration
553  *
554  * @param cls unused
555  * @param tc scheduler context (also unused)
556  */
557 static void
558 gather_migration_blocks (void *cls,
559                          const struct GNUNET_SCHEDULER_TaskContext *tc)
560 {
561   mig_task = GNUNET_SCHEDULER_NO_TASK;
562   if (mig_size >= MAX_MIGRATION_QUEUE)
563     return;
564   if (GSF_dsh != NULL)
565   {
566 #if DEBUG_FS_MIGRATION
567     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568                 "Asking datastore for content for replication (queue size: %u)\n",
569                 mig_size);
570 #endif
571     mig_qe =
572         GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
573                                               GNUNET_TIME_UNIT_FOREVER_REL,
574                                               &process_migration_content, NULL);
575     if (NULL == mig_qe)
576       consider_gathering ();
577   }
578 }
579
580
581 /**
582  * A peer connected to us.  Start pushing content
583  * to this peer.
584  *
585  * @param peer handle for the peer that connected
586  */
587 void
588 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
589 {
590   struct MigrationReadyPeer *mrp;
591
592   if (GNUNET_YES != enabled)
593     return;
594   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
595   mrp->peer = peer;
596   find_content (mrp);
597   GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp);
598 }
599
600
601 /**
602  * A peer disconnected from us.  Stop pushing content
603  * to this peer.
604  *
605  * @param peer handle for the peer that disconnected
606  */
607 void
608 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
609 {
610   struct MigrationReadyPeer *pos;
611
612   pos = peer_head;
613   while (pos != NULL)
614   {
615     if (pos->peer == peer)
616     {
617       GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos);
618       if (NULL != pos->th)
619       {
620         GSF_peer_transmit_cancel_ (pos->th);
621         pos->th = NULL;
622       }
623       if (NULL != pos->msg)
624       {
625         GNUNET_free (pos->msg);
626         pos->msg = NULL;
627       }
628       GNUNET_free (pos);
629       return;
630     }
631     pos = pos->next;
632   }
633 }
634
635
636 /**
637  * Setup the module.
638  */
639 void
640 GSF_push_init_ ()
641 {
642   enabled =
643       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
644   if (GNUNET_YES != enabled)
645     return;
646
647   if (GNUNET_OK !=
648       GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
649                                            &min_migration_delay))
650   {
651     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
652                 _
653                 ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
654                 "MIN_MIGRATION_DELAY", "fs");
655     return;
656   }
657   consider_gathering ();
658 }
659
660
661 /**
662  * Shutdown the module.
663  */
664 void
665 GSF_push_done_ ()
666 {
667   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
668   {
669     GNUNET_SCHEDULER_cancel (mig_task);
670     mig_task = GNUNET_SCHEDULER_NO_TASK;
671   }
672   if (NULL != mig_qe)
673   {
674     GNUNET_DATASTORE_cancel (mig_qe);
675     mig_qe = NULL;
676   }
677   while (NULL != mig_head)
678     delete_migration_block (mig_head);
679   GNUNET_assert (0 == mig_size);
680 }
681
682 /* end of gnunet-service-fs_push.c */