fix migration support
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * How long must content remain valid for us to consider it for migration?  
36  * If content will expire too soon, there is clearly no point in pushing
37  * it to other peers.  This value gives the threshold for migration.  Note
38  * that if this value is increased, the migration testcase may need to be
39  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
40  */
41 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
42
43
44 /**
45  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
46  */
47 struct MigrationReadyBlock
48 {
49
50   /**
51    * This is a doubly-linked list.
52    */
53   struct MigrationReadyBlock *next;
54
55   /**
56    * This is a doubly-linked list.
57    */
58   struct MigrationReadyBlock *prev;
59
60   /**
61    * Query for the block.
62    */
63   GNUNET_HashCode query;
64
65   /**
66    * When does this block expire? 
67    */
68   struct GNUNET_TIME_Absolute expiration;
69
70   /**
71    * Peers we already forwarded this
72    * block to.  Zero for empty entries.
73    */
74   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
75
76   /**
77    * Size of the block.
78    */
79   size_t size;
80
81   /**
82    *  Number of targets already used.
83    */
84   unsigned int used_targets;
85
86   /**
87    * Type of the block.
88    */
89   enum GNUNET_BLOCK_Type type;
90 };
91
92
93 /**
94  * Information about a peer waiting for
95  * migratable data.
96  */
97 struct MigrationReadyPeer
98 {
99   /**
100    * This is a doubly-linked list.
101    */
102   struct MigrationReadyPeer *next;
103
104   /**
105    * This is a doubly-linked list.
106    */
107   struct MigrationReadyPeer *prev;
108
109   /**
110    * Handle to peer.
111    */
112   struct GSF_ConnectedPeer *peer;
113   
114   /**
115    * Handle for current transmission request,
116    * or NULL for none.
117    */
118   struct GSF_PeerTransmitHandle *th;
119
120   /**
121    * Message we are trying to push right now (or NULL)
122    */
123   struct PutMessage *msg;
124 };
125
126
127 /**
128  * Head of linked list of blocks that can be migrated.
129  */
130 static struct MigrationReadyBlock *mig_head;
131
132 /**
133  * Tail of linked list of blocks that can be migrated.
134  */
135 static struct MigrationReadyBlock *mig_tail;
136
137 /**
138  * Head of linked list of peers.
139  */
140 static struct MigrationReadyPeer *peer_head;
141
142 /**
143  * Tail of linked list of peers.
144  */
145 static struct MigrationReadyPeer *peer_tail;
146
147 /**
148  * Request to datastore for migration (or NULL).
149  */
150 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
151
152 /**
153  * ID of task that collects blocks for migration.
154  */
155 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
156
157 /**
158  * What is the maximum frequency at which we are allowed to
159  * poll the datastore for migration content?
160  */
161 static struct GNUNET_TIME_Relative min_migration_delay;
162
163 /**
164  * Size of the doubly-linked list of migration blocks.
165  */
166 static unsigned int mig_size;
167
168 /**
169  * Is this module enabled?
170  */
171 static int enabled;
172
173
174 /**
175  * Delete the given migration block.
176  *
177  * @param mb block to delete
178  */
179 static void
180 delete_migration_block (struct MigrationReadyBlock *mb)
181 {
182   GNUNET_CONTAINER_DLL_remove (mig_head,
183                                mig_tail,
184                                mb);
185   GNUNET_PEER_decrement_rcs (mb->target_list,
186                              MIGRATION_LIST_SIZE);
187   mig_size--;
188   GNUNET_free (mb);
189 }
190
191
192 /**
193  * Find content for migration to this peer.
194  */ 
195 static void
196 find_content (struct MigrationReadyPeer *mrp);
197
198
199 /**
200  * Transmit the message currently scheduled for
201  * transmission.
202  *
203  * @param cls the 'struct MigrationReadyPeer'
204  * @param buf_size number of bytes available in buf
205  * @param buf where to copy the message, NULL on error (peer disconnect)
206  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
207  */
208 static size_t
209 transmit_message (void *cls,
210                   size_t buf_size,
211                   void *buf)
212 {
213   struct MigrationReadyPeer *peer = cls;
214   struct PutMessage *msg;
215   uint16_t msize;
216
217   peer->th = NULL;
218   msg = peer->msg;
219   peer->msg = NULL;
220   if (buf == NULL)
221     {
222 #if DEBUG_FS
223       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
224                   "Failed to migrate content to another peer (disconnect)\n");
225 #endif
226       GNUNET_free (msg);
227       return 0;
228     }
229   msize = ntohs (msg->header.size);
230   GNUNET_assert (msize <= buf_size);
231   memcpy (buf, msg, msize);
232   GNUNET_free (msg);
233 #if DEBUG_FS
234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235               "Pushing %u bytes to another peer\n",
236               msize);
237 #endif
238   find_content (peer);
239   return msize;
240 }
241
242
243 /**
244  * Send the given block to the given peer.
245  *
246  * @param peer target peer
247  * @param block the block
248  * @return GNUNET_YES if the block was deleted (!)
249  */
250 static int
251 transmit_content (struct MigrationReadyPeer *peer,
252                   struct MigrationReadyBlock *block)
253 {
254   size_t msize;
255   struct PutMessage *msg;
256   unsigned  int i;
257   struct GSF_PeerPerformanceData *ppd;
258   int ret;
259
260   ppd = GSF_get_peer_performance_data_ (peer->peer);
261   GNUNET_assert (NULL == peer->th);
262   msize = sizeof (struct PutMessage) + block->size;
263   msg = GNUNET_malloc (msize);
264   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
265   msg->header.size = htons (msize);
266   msg->type = htonl (block->type);
267   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
268   memcpy (&msg[1],
269           &block[1],
270           block->size);
271   peer->msg = msg;
272   for (i=0;i<MIGRATION_LIST_SIZE;i++)
273     {
274       if (block->target_list[i] == 0)
275         {
276           block->target_list[i] = ppd->pid;
277           GNUNET_PEER_change_rc (block->target_list[i], 1);
278           break;
279         }
280     }
281   if (MIGRATION_LIST_SIZE == i)
282     {
283       delete_migration_block (block);
284       ret = GNUNET_YES;
285     }
286   else
287     {
288       ret = GNUNET_NO;
289     }
290 #if DEBUG_FS
291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292               "Asking for transmission of %u bytes for migration\n",
293               msize);
294 #endif
295   peer->th = GSF_peer_transmit_ (peer->peer,
296                                  GNUNET_NO,
297                                  0 /* priority */,
298                                  GNUNET_TIME_UNIT_FOREVER_REL,
299                                  msize,
300                                  &transmit_message,
301                                  peer);
302   return ret;
303 }
304
305
306 /**
307  * Count the number of peers this block has
308  * already been forwarded to.
309  *
310  * @param block the block
311  * @return number of times block was forwarded
312  */
313 static unsigned int
314 count_targets (struct MigrationReadyBlock *block)
315 {
316   unsigned int i;
317
318   for (i=0;i<MIGRATION_LIST_SIZE;i++)
319     if (block->target_list[i] == 0)
320       return i;
321   return i;
322 }
323
324
325 /**
326  * Check if sending this block to this peer would
327  * be a good idea. 
328  *
329  * @param peer target peer
330  * @param block the block
331  * @return score (>= 0: feasible, negative: infeasible)
332  */
333 static long
334 score_content (struct MigrationReadyPeer *peer,
335                struct MigrationReadyBlock *block)
336 {
337   unsigned int i;
338   struct GSF_PeerPerformanceData *ppd;
339   struct GNUNET_PeerIdentity id;
340   uint32_t dist;
341
342   ppd = GSF_get_peer_performance_data_ (peer->peer);
343   for (i=0;i<MIGRATION_LIST_SIZE;i++)
344     if (block->target_list[i] == ppd->pid)
345       return -1;
346   GNUNET_PEER_resolve (ppd->pid,
347                        &id);
348   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
349                                           &id.hashPubKey);
350   /* closer distance, higher score: */
351   return UINT32_MAX - dist;
352 }
353
354
355 /**
356  * If the migration task is not currently running, consider
357  * (re)scheduling it with the appropriate delay.
358  */
359 static void
360 consider_gathering (void);
361
362
363 /**
364  * Find content for migration to this peer.
365  *
366  * @param mrp peer to find content for
367  */ 
368 static void
369 find_content (struct MigrationReadyPeer *mrp)
370 {
371   struct MigrationReadyBlock *pos;
372   long score;
373   long best_score;
374   struct MigrationReadyBlock *best;
375
376   GNUNET_assert (NULL == mrp->th);
377   best = NULL;
378   best_score = -1;
379   pos = mig_head;
380   while (NULL != pos)
381     {
382       score = score_content (mrp, pos);
383       if (score > best_score)
384         {
385           best_score = score;
386           best = pos;
387         }
388       pos = pos->next;
389     }
390   if (NULL == best) 
391     {
392       if (mig_size < MAX_MIGRATION_QUEUE)
393         {
394 #if DEBUG_FS
395           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396                       "No content found for pushing, waiting for queue to fill\n");
397 #endif
398           return; /* will fill up eventually... */
399         }
400 #if DEBUG_FS
401       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
402                   "No suitable content found, purging content from full queue\n");
403 #endif
404       /* failed to find migration target AND
405          queue is full, purge most-forwarded
406          block from queue to make room for more */
407       pos = mig_head;
408       while (NULL != pos)
409         {
410           score = count_targets (pos);
411           if (score >= best_score)
412             {
413               best_score = score;
414               best = pos;
415             }
416           pos = pos->next;
417         }
418       GNUNET_assert (NULL != best);
419       delete_migration_block (best);
420       consider_gathering ();
421       return;
422     }
423 #if DEBUG_FS
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Preparing to push best content to peer\n");
426 #endif
427   transmit_content (mrp, best);
428 }
429
430
431 /**
432  * Task that is run periodically to obtain blocks for content
433  * migration
434  * 
435  * @param cls unused
436  * @param tc scheduler context (also unused)
437  */
438 static void
439 gather_migration_blocks (void *cls,
440                          const struct GNUNET_SCHEDULER_TaskContext *tc);
441
442
443 /**
444  * If the migration task is not currently running, consider
445  * (re)scheduling it with the appropriate delay.
446  */
447 static void
448 consider_gathering ()
449 {
450   struct GNUNET_TIME_Relative delay;
451
452   if (GSF_dsh == NULL)
453     return;
454   if (mig_qe != NULL)
455     return;
456   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
457     return;
458   if (mig_size >= MAX_MIGRATION_QUEUE)  
459     return;
460   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
461                                          mig_size);
462   delay = GNUNET_TIME_relative_divide (delay,
463                                        MAX_MIGRATION_QUEUE);
464   delay = GNUNET_TIME_relative_max (delay,
465                                     min_migration_delay);
466   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
467                                            &gather_migration_blocks,
468                                            NULL);
469 }
470
471
472 /**
473  * Process content offered for migration.
474  *
475  * @param cls closure
476  * @param key key for the content
477  * @param size number of bytes in data
478  * @param data content stored
479  * @param type type of the content
480  * @param priority priority of the content
481  * @param anonymity anonymity-level for the content
482  * @param expiration expiration time for the content
483  * @param uid unique identifier for the datum;
484  *        maybe 0 if no unique identifier is available
485  */
486 static void
487 process_migration_content (void *cls,
488                            const GNUNET_HashCode * key,
489                            size_t size,
490                            const void *data,
491                            enum GNUNET_BLOCK_Type type,
492                            uint32_t priority,
493                            uint32_t anonymity,
494                            struct GNUNET_TIME_Absolute
495                            expiration, uint64_t uid)
496 {
497   struct MigrationReadyBlock *mb;
498   struct MigrationReadyPeer *pos;
499   
500   if (key == NULL)
501     {
502       mig_qe = NULL;
503       consider_gathering ();
504       return;
505     }
506   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
507       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
508     {
509       /* content will expire soon, don't bother */
510       GNUNET_DATASTORE_get_next (GSF_dsh);
511       return;
512     }
513   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
514     {
515       if (GNUNET_OK !=
516           GNUNET_FS_handle_on_demand_block (key, size, data,
517                                             type, priority, anonymity,
518                                             expiration, uid, 
519                                             &process_migration_content,
520                                             NULL))
521         {
522           GNUNET_DATASTORE_get_next (GSF_dsh);
523         }
524       return;
525     }
526 #if DEBUG_FS
527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
529               GNUNET_h2s (key),
530               type,
531               mig_size + 1,
532               MIGRATION_LIST_SIZE);
533 #endif
534   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
535   mb->query = *key;
536   mb->expiration = expiration;
537   mb->size = size;
538   mb->type = type;
539   memcpy (&mb[1], data, size);
540   GNUNET_CONTAINER_DLL_insert_after (mig_head,
541                                      mig_tail,
542                                      mig_tail,
543                                      mb);
544   mig_size++;
545   pos = peer_head;
546   while (pos != NULL)
547     {
548       if (NULL == pos->th)
549         {
550 #if DEBUG_FS
551           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
552                       "Preparing to push best content to peer\n");
553 #endif
554           if (GNUNET_YES == transmit_content (pos, mb))
555             break; /* 'mb' was freed! */
556         }
557       pos = pos->next;
558     }
559   GNUNET_DATASTORE_get_next (GSF_dsh);
560 }
561
562
563 /**
564  * Task that is run periodically to obtain blocks for content
565  * migration
566  * 
567  * @param cls unused
568  * @param tc scheduler context (also unused)
569  */
570 static void
571 gather_migration_blocks (void *cls,
572                          const struct GNUNET_SCHEDULER_TaskContext *tc)
573 {
574   mig_task = GNUNET_SCHEDULER_NO_TASK;
575   if (mig_size >= MAX_MIGRATION_QUEUE)  
576     return;
577   if (GSF_dsh != NULL)
578     {
579       mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, 
580                                             0, UINT_MAX,
581                                             GNUNET_TIME_UNIT_FOREVER_REL,
582                                             &process_migration_content, NULL);
583       GNUNET_assert (mig_qe != NULL);
584     }
585 }
586
587
588 /**
589  * A peer connected to us.  Start pushing content
590  * to this peer.
591  *
592  * @param peer handle for the peer that connected
593  */
594 void
595 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
596 {
597   struct MigrationReadyPeer *mrp;
598
599   if (GNUNET_YES != enabled)
600     return;
601   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
602   mrp->peer = peer;
603   find_content (mrp);
604   GNUNET_CONTAINER_DLL_insert  (peer_head,
605                                 peer_tail,
606                                 mrp);
607 }
608
609
610 /**
611  * A peer disconnected from us.  Stop pushing content
612  * to this peer.
613  *
614  * @param peer handle for the peer that disconnected
615  */
616 void
617 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
618 {
619   struct MigrationReadyPeer *pos;
620
621   pos = peer_head;
622   while (pos != NULL)
623     {
624       if (pos->peer == peer)
625         {
626           GNUNET_CONTAINER_DLL_remove (peer_head,
627                                        peer_tail,
628                                        pos);
629           if (NULL != pos->th)
630             GSF_peer_transmit_cancel_ (pos->th);
631           GNUNET_free (pos);
632           return;
633         }
634       pos = pos->next;
635     }
636 }
637
638
639 /**
640  * Setup the module.
641  */
642 void
643 GSF_push_init_ ()
644 {
645   enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
646                                                   "FS",
647                                                   "CONTENT_PUSHING");
648   if (GNUNET_YES != enabled)
649     return;
650  
651   if (GNUNET_OK != 
652       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
653                                            "fs",
654                                            "MIN_MIGRATION_DELAY",
655                                            &min_migration_delay))
656     {
657       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
658                   _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
659                   "MIN_MIGRATION_DELAY",
660                   "fs");
661       return;
662     }
663   consider_gathering ();
664 }
665
666
667 /**
668  * Shutdown the module.
669  */
670 void
671 GSF_push_done_ ()
672 {
673   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
674     {
675       GNUNET_SCHEDULER_cancel (mig_task);
676       mig_task = GNUNET_SCHEDULER_NO_TASK;
677     }
678   if (NULL != mig_qe)
679     {
680       GNUNET_DATASTORE_cancel (mig_qe);
681       mig_qe = NULL;
682     }
683   while (NULL != mig_head)
684     delete_migration_block (mig_head);
685   GNUNET_assert (0 == mig_size);
686 }
687
688 /* end of gnunet-service-fs_push.c */