65b15f356d9e807fff5b08af65d8b5a4945ecc67
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * How long must content remain valid for us to consider it for migration?  
36  * If content will expire too soon, there is clearly no point in pushing
37  * it to other peers.  This value gives the threshold for migration.  Note
38  * that if this value is increased, the migration testcase may need to be
39  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
40  */
41 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
42
43
44 /**
45  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
46  */
47 struct MigrationReadyBlock
48 {
49
50   /**
51    * This is a doubly-linked list.
52    */
53   struct MigrationReadyBlock *next;
54
55   /**
56    * This is a doubly-linked list.
57    */
58   struct MigrationReadyBlock *prev;
59
60   /**
61    * Query for the block.
62    */
63   GNUNET_HashCode query;
64
65   /**
66    * When does this block expire? 
67    */
68   struct GNUNET_TIME_Absolute expiration;
69
70   /**
71    * Peers we already forwarded this
72    * block to.  Zero for empty entries.
73    */
74   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
75
76   /**
77    * Size of the block.
78    */
79   size_t size;
80
81   /**
82    *  Number of targets already used.
83    */
84   unsigned int used_targets;
85
86   /**
87    * Type of the block.
88    */
89   enum GNUNET_BLOCK_Type type;
90 };
91
92
93 /**
94  * Information about a peer waiting for
95  * migratable data.
96  */
97 struct MigrationReadyPeer
98 {
99   /**
100    * This is a doubly-linked list.
101    */
102   struct MigrationReadyPeer *next;
103
104   /**
105    * This is a doubly-linked list.
106    */
107   struct MigrationReadyPeer *prev;
108
109   /**
110    * Handle to peer.
111    */
112   struct GSF_ConnectedPeer *peer;
113   
114   /**
115    * Handle for current transmission request,
116    * or NULL for none.
117    */
118   struct GSF_PeerTransmitHandle *th;
119
120   /**
121    * Message we are trying to push right now (or NULL)
122    */
123   struct PutMessage *msg;
124 };
125
126
127 /**
128  * Head of linked list of blocks that can be migrated.
129  */
130 static struct MigrationReadyBlock *mig_head;
131
132 /**
133  * Tail of linked list of blocks that can be migrated.
134  */
135 static struct MigrationReadyBlock *mig_tail;
136
137 /**
138  * Head of linked list of peers.
139  */
140 static struct MigrationReadyPeer *peer_head;
141
142 /**
143  * Tail of linked list of peers.
144  */
145 static struct MigrationReadyPeer *peer_tail;
146
147 /**
148  * Request to datastore for migration (or NULL).
149  */
150 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
151
152 /**
153  * ID of task that collects blocks for migration.
154  */
155 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
156
157 /**
158  * What is the maximum frequency at which we are allowed to
159  * poll the datastore for migration content?
160  */
161 static struct GNUNET_TIME_Relative min_migration_delay;
162
163 /**
164  * Size of the doubly-linked list of migration blocks.
165  */
166 static unsigned int mig_size;
167
168 /**
169  * Is this module enabled?
170  */
171 static int enabled;
172
173
174 /**
175  * Delete the given migration block.
176  *
177  * @param mb block to delete
178  */
179 static void
180 delete_migration_block (struct MigrationReadyBlock *mb)
181 {
182   GNUNET_CONTAINER_DLL_remove (mig_head,
183                                mig_tail,
184                                mb);
185   GNUNET_PEER_decrement_rcs (mb->target_list,
186                              MIGRATION_LIST_SIZE);
187   mig_size--;
188   GNUNET_free (mb);
189 }
190
191
192 /**
193  * Find content for migration to this peer.
194  */ 
195 static void
196 find_content (struct MigrationReadyPeer *mrp);
197
198
199 /**
200  * Transmit the message currently scheduled for
201  * transmission.
202  *
203  * @param cls the 'struct MigrationReadyPeer'
204  * @param buf_size number of bytes available in buf
205  * @param buf where to copy the message, NULL on error (peer disconnect)
206  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
207  */
208 static size_t
209 transmit_message (void *cls,
210                   size_t buf_size,
211                   void *buf)
212 {
213   struct MigrationReadyPeer *peer = cls;
214   struct PutMessage *msg;
215   uint16_t msize;
216
217   peer->th = NULL;
218   msg = peer->msg;
219   peer->msg = NULL;
220   if (buf == NULL)
221     {
222 #if DEBUG_FS
223       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
224                   "Failed to migrate content to another peer (disconnect)\n");
225 #endif
226       GNUNET_free (msg);
227       return 0;
228     }
229   msize = ntohs (msg->header.size);
230   GNUNET_assert (msize <= buf_size);
231   memcpy (buf, msg, msize);
232   GNUNET_free (msg);
233 #if DEBUG_FS
234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235               "Pushing %u bytes to another peer\n",
236               msize);
237 #endif
238   find_content (peer);
239   return msize;
240 }
241
242
243 /**
244  * Send the given block to the given peer.
245  *
246  * @param peer target peer
247  * @param block the block
248  * @return GNUNET_YES if the block was deleted (!)
249  */
250 static int
251 transmit_content (struct MigrationReadyPeer *peer,
252                   struct MigrationReadyBlock *block)
253 {
254   size_t msize;
255   struct PutMessage *msg;
256   unsigned  int i;
257   struct GSF_PeerPerformanceData *ppd;
258   int ret;
259
260   ppd = GSF_get_peer_performance_data_ (peer->peer);
261   GNUNET_assert (NULL == peer->th);
262   msize = sizeof (struct PutMessage) + block->size;
263   msg = GNUNET_malloc (msize);
264   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
265   msg->header.size = htons (msize);
266   msg->type = htonl (block->type);
267   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
268   memcpy (&msg[1],
269           &block[1],
270           block->size);
271   peer->msg = msg;
272   for (i=0;i<MIGRATION_LIST_SIZE;i++)
273     {
274       if (block->target_list[i] == 0)
275         {
276           block->target_list[i] = ppd->pid;
277           GNUNET_PEER_change_rc (block->target_list[i], 1);
278           break;
279         }
280     }
281   if (MIGRATION_LIST_SIZE == i)
282     {
283       delete_migration_block (block);
284       ret = GNUNET_YES;
285     }
286   else
287     {
288       ret = GNUNET_NO;
289     }
290 #if DEBUG_FS
291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292               "Asking for transmission of %u bytes for migration\n",
293               msize);
294 #endif
295   peer->th = GSF_peer_transmit_ (peer->peer,
296                                  GNUNET_NO,
297                                  0 /* priority */,
298                                  GNUNET_TIME_UNIT_FOREVER_REL,
299                                  msize,
300                                  &transmit_message,
301                                  peer);
302   return ret;
303 }
304
305
306 /**
307  * Count the number of peers this block has
308  * already been forwarded to.
309  *
310  * @param block the block
311  * @return number of times block was forwarded
312  */
313 static unsigned int
314 count_targets (struct MigrationReadyBlock *block)
315 {
316   unsigned int i;
317
318   for (i=0;i<MIGRATION_LIST_SIZE;i++)
319     if (block->target_list[i] == 0)
320       return i;
321   return i;
322 }
323
324
325 /**
326  * Check if sending this block to this peer would
327  * be a good idea. 
328  *
329  * @param peer target peer
330  * @param block the block
331  * @return score (>= 0: feasible, negative: infeasible)
332  */
333 static long
334 score_content (struct MigrationReadyPeer *peer,
335                struct MigrationReadyBlock *block)
336 {
337   unsigned int i;
338   struct GSF_PeerPerformanceData *ppd;
339   struct GNUNET_PeerIdentity id;
340   uint32_t dist;
341
342   ppd = GSF_get_peer_performance_data_ (peer->peer);
343   for (i=0;i<MIGRATION_LIST_SIZE;i++)
344     if (block->target_list[i] == ppd->pid)
345       return -1;
346   GNUNET_PEER_resolve (ppd->pid,
347                        &id);
348   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
349                                           &id.hashPubKey);
350   /* closer distance, higher score: */
351   return UINT32_MAX - dist;
352 }
353
354
355 /**
356  * If the migration task is not currently running, consider
357  * (re)scheduling it with the appropriate delay.
358  */
359 static void
360 consider_gathering (void);
361
362
363 /**
364  * Find content for migration to this peer.
365  *
366  * @param mrp peer to find content for
367  */ 
368 static void
369 find_content (struct MigrationReadyPeer *mrp)
370 {
371   struct MigrationReadyBlock *pos;
372   long score;
373   long best_score;
374   struct MigrationReadyBlock *best;
375
376   GNUNET_assert (NULL == mrp->th);
377   best = NULL;
378   best_score = -1;
379   pos = mig_head;
380   while (NULL != pos)
381     {
382       score = score_content (mrp, pos);
383       if (score > best_score)
384         {
385           best_score = score;
386           best = pos;
387         }
388       pos = pos->next;
389     }
390   if (NULL == best) 
391     {
392       if (mig_size < MAX_MIGRATION_QUEUE)
393         {
394 #if DEBUG_FS
395           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396                       "No content found for pushing, waiting for queue to fill\n");
397 #endif
398           return; /* will fill up eventually... */
399         }
400 #if DEBUG_FS
401       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
402                   "No suitable content found, purging content from full queue\n");
403 #endif
404       /* failed to find migration target AND
405          queue is full, purge most-forwarded
406          block from queue to make room for more */
407       score = 0;
408       pos = mig_head;
409       while (NULL != pos)
410         {
411           score = count_targets (pos);
412           if (score >= best_score)
413             {
414               best_score = score;
415               best = pos;
416             }
417           pos = pos->next;
418         }
419       GNUNET_assert (NULL != best);
420       delete_migration_block (best);
421       consider_gathering ();
422       return;
423     }
424 #if DEBUG_FS
425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
426               "Preparing to push best content to peer\n");
427 #endif
428   transmit_content (mrp, best);
429 }
430
431
432 /**
433  * Task that is run periodically to obtain blocks for content
434  * migration
435  * 
436  * @param cls unused
437  * @param tc scheduler context (also unused)
438  */
439 static void
440 gather_migration_blocks (void *cls,
441                          const struct GNUNET_SCHEDULER_TaskContext *tc);
442
443
444 /**
445  * If the migration task is not currently running, consider
446  * (re)scheduling it with the appropriate delay.
447  */
448 static void
449 consider_gathering ()
450 {
451   struct GNUNET_TIME_Relative delay;
452
453   if (GSF_dsh == NULL)
454     return;
455   if (mig_qe != NULL)
456     return;
457   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
458     return;
459   if (mig_size >= MAX_MIGRATION_QUEUE)  
460     return;
461   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
462                                          mig_size);
463   delay = GNUNET_TIME_relative_divide (delay,
464                                        MAX_MIGRATION_QUEUE);
465   delay = GNUNET_TIME_relative_max (delay,
466                                     min_migration_delay);
467   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
468                                            &gather_migration_blocks,
469                                            NULL);
470 }
471
472
473 /**
474  * Process content offered for migration.
475  *
476  * @param cls closure
477  * @param key key for the content
478  * @param size number of bytes in data
479  * @param data content stored
480  * @param type type of the content
481  * @param priority priority of the content
482  * @param anonymity anonymity-level for the content
483  * @param expiration expiration time for the content
484  * @param uid unique identifier for the datum;
485  *        maybe 0 if no unique identifier is available
486  */
487 static void
488 process_migration_content (void *cls,
489                            const GNUNET_HashCode * key,
490                            size_t size,
491                            const void *data,
492                            enum GNUNET_BLOCK_Type type,
493                            uint32_t priority,
494                            uint32_t anonymity,
495                            struct GNUNET_TIME_Absolute
496                            expiration, uint64_t uid)
497 {
498   struct MigrationReadyBlock *mb;
499   struct MigrationReadyPeer *pos;
500   
501   if (key == NULL)
502     {
503       mig_qe = NULL;
504       consider_gathering ();
505       return;
506     }
507   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
508       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
509     {
510       /* content will expire soon, don't bother */
511       GNUNET_DATASTORE_get_next (GSF_dsh);
512       return;
513     }
514   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
515     {
516       if (GNUNET_OK !=
517           GNUNET_FS_handle_on_demand_block (key, size, data,
518                                             type, priority, anonymity,
519                                             expiration, uid, 
520                                             &process_migration_content,
521                                             NULL))
522         {
523           GNUNET_DATASTORE_get_next (GSF_dsh);
524         }
525       return;
526     }
527 #if DEBUG_FS
528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529               "Retrieved block `%s' of type %u for migration\n",
530               GNUNET_h2s (key),
531               type);
532 #endif
533   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
534   mb->query = *key;
535   mb->expiration = expiration;
536   mb->size = size;
537   mb->type = type;
538   memcpy (&mb[1], data, size);
539   GNUNET_CONTAINER_DLL_insert_after (mig_head,
540                                      mig_tail,
541                                      mig_tail,
542                                      mb);
543   mig_size++;
544   pos = peer_head;
545   while (pos != NULL)
546     {
547       if (NULL == pos->th)
548         {
549 #if DEBUG_FS
550           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551                       "Preparing to push best content to peer\n");
552 #endif
553           if (GNUNET_YES == transmit_content (pos, mb))
554             break; /* 'mb' was freed! */
555         }
556       pos = pos->next;
557     }
558   GNUNET_DATASTORE_get_next (GSF_dsh);
559 }
560
561
562 /**
563  * Task that is run periodically to obtain blocks for content
564  * migration
565  * 
566  * @param cls unused
567  * @param tc scheduler context (also unused)
568  */
569 static void
570 gather_migration_blocks (void *cls,
571                          const struct GNUNET_SCHEDULER_TaskContext *tc)
572 {
573   mig_task = GNUNET_SCHEDULER_NO_TASK;
574   if (GSF_dsh != NULL)
575     {
576       mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, 
577                                             0, UINT_MAX,
578                                             GNUNET_TIME_UNIT_FOREVER_REL,
579                                             &process_migration_content, NULL);
580       GNUNET_assert (mig_qe != NULL);
581     }
582 }
583
584
585 /**
586  * A peer connected to us.  Start pushing content
587  * to this peer.
588  *
589  * @param peer handle for the peer that connected
590  */
591 void
592 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
593 {
594   struct MigrationReadyPeer *mrp;
595
596   if (GNUNET_YES != enabled)
597     return;
598   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
599   mrp->peer = peer;
600   find_content (mrp);
601   GNUNET_CONTAINER_DLL_insert  (peer_head,
602                                 peer_tail,
603                                 mrp);
604 }
605
606
607 /**
608  * A peer disconnected from us.  Stop pushing content
609  * to this peer.
610  *
611  * @param peer handle for the peer that disconnected
612  */
613 void
614 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
615 {
616   struct MigrationReadyPeer *pos;
617
618   pos = peer_head;
619   while (pos != NULL)
620     {
621       if (pos->peer == peer)
622         {
623           GNUNET_CONTAINER_DLL_remove (peer_head,
624                                        peer_tail,
625                                        pos);
626           if (NULL != pos->th)
627             GSF_peer_transmit_cancel_ (pos->th);
628           GNUNET_free (pos);
629           return;
630         }
631       pos = pos->next;
632     }
633 }
634
635
636 /**
637  * Setup the module.
638  */
639 void
640 GSF_push_init_ ()
641 {
642   enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
643                                                   "FS",
644                                                   "CONTENT_PUSHING");
645   if (GNUNET_YES != enabled)
646     return;
647  
648   if (GNUNET_OK != 
649       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
650                                            "fs",
651                                            "MIN_MIGRATION_DELAY",
652                                            &min_migration_delay))
653     {
654       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
655                   _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
656                   "MIN_MIGRATION_DELAY",
657                   "fs");
658       return;
659     }
660   consider_gathering ();
661 }
662
663
664 /**
665  * Shutdown the module.
666  */
667 void
668 GSF_push_done_ ()
669 {
670   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
671     {
672       GNUNET_SCHEDULER_cancel (mig_task);
673       mig_task = GNUNET_SCHEDULER_NO_TASK;
674     }
675   if (NULL != mig_qe)
676     {
677       GNUNET_DATASTORE_cancel (mig_qe);
678       mig_qe = NULL;
679     }
680   while (NULL != mig_head)
681     delete_migration_block (mig_head);
682   GNUNET_assert (0 == mig_size);
683 }
684
685 /* end of gnunet-service-fs_push.c */