58588ebdb7298be6fe17bdb12bf8c7eb516e6211
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * How long must content remain valid for us to consider it for migration?  
36  * If content will expire too soon, there is clearly no point in pushing
37  * it to other peers.  This value gives the threshold for migration.  Note
38  * that if this value is increased, the migration testcase may need to be
39  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
40  */
41 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
42
43
44 /**
45  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
46  */
47 struct MigrationReadyBlock
48 {
49
50   /**
51    * This is a doubly-linked list.
52    */
53   struct MigrationReadyBlock *next;
54
55   /**
56    * This is a doubly-linked list.
57    */
58   struct MigrationReadyBlock *prev;
59
60   /**
61    * Query for the block.
62    */
63   GNUNET_HashCode query;
64
65   /**
66    * When does this block expire? 
67    */
68   struct GNUNET_TIME_Absolute expiration;
69
70   /**
71    * Peers we already forwarded this
72    * block to.  Zero for empty entries.
73    */
74   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
75
76   /**
77    * Size of the block.
78    */
79   size_t size;
80
81   /**
82    *  Number of targets already used.
83    */
84   unsigned int used_targets;
85
86   /**
87    * Type of the block.
88    */
89   enum GNUNET_BLOCK_Type type;
90 };
91
92
93 /**
94  * Information about a peer waiting for
95  * migratable data.
96  */
97 struct MigrationReadyPeer
98 {
99   /**
100    * This is a doubly-linked list.
101    */
102   struct MigrationReadyPeer *next;
103
104   /**
105    * This is a doubly-linked list.
106    */
107   struct MigrationReadyPeer *prev;
108
109   /**
110    * Handle to peer.
111    */
112   struct GSF_ConnectedPeer *peer;
113   
114   /**
115    * Handle for current transmission request,
116    * or NULL for none.
117    */
118   struct GSF_PeerTransmitHandle *th;
119
120   /**
121    * Message we are trying to push right now (or NULL)
122    */
123   struct PutMessage *msg;
124 };
125
126
127 /**
128  * Head of linked list of blocks that can be migrated.
129  */
130 static struct MigrationReadyBlock *mig_head;
131
132 /**
133  * Tail of linked list of blocks that can be migrated.
134  */
135 static struct MigrationReadyBlock *mig_tail;
136
137 /**
138  * Head of linked list of peers.
139  */
140 static struct MigrationReadyPeer *peer_head;
141
142 /**
143  * Tail of linked list of peers.
144  */
145 static struct MigrationReadyPeer *peer_tail;
146
147 /**
148  * Request to datastore for migration (or NULL).
149  */
150 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
151
152 /**
153  * ID of task that collects blocks for migration.
154  */
155 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
156
157 /**
158  * What is the maximum frequency at which we are allowed to
159  * poll the datastore for migration content?
160  */
161 static struct GNUNET_TIME_Relative min_migration_delay;
162
163 /**
164  * Size of the doubly-linked list of migration blocks.
165  */
166 static unsigned int mig_size;
167
168 /**
169  * Is this module enabled?
170  */
171 static int enabled;
172
173
174 /**
175  * Delete the given migration block.
176  *
177  * @param mb block to delete
178  */
179 static void
180 delete_migration_block (struct MigrationReadyBlock *mb)
181 {
182   GNUNET_CONTAINER_DLL_remove (mig_head,
183                                mig_tail,
184                                mb);
185   GNUNET_PEER_decrement_rcs (mb->target_list,
186                              MIGRATION_LIST_SIZE);
187   mig_size--;
188   GNUNET_free (mb);
189 }
190
191
192 /**
193  * Find content for migration to this peer.
194  */ 
195 static void
196 find_content (struct MigrationReadyPeer *mrp);
197
198
199 /**
200  * Transmit the message currently scheduled for
201  * transmission.
202  *
203  * @param cls the 'struct MigrationReadyPeer'
204  * @param buf_size number of bytes available in buf
205  * @param buf where to copy the message, NULL on error (peer disconnect)
206  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
207  */
208 static size_t
209 transmit_message (void *cls,
210                   size_t buf_size,
211                   void *buf)
212 {
213   struct MigrationReadyPeer *peer = cls;
214   struct PutMessage *msg;
215   uint16_t msize;
216
217   peer->th = NULL;
218   msg = peer->msg;
219   peer->msg = NULL;
220   if (buf == NULL)
221     {
222       GNUNET_free (msg);
223       return 0;
224     }
225   msize = ntohs (msg->header.size);
226   GNUNET_assert (msize <= buf_size);
227   memcpy (buf, msg, msize);
228   GNUNET_free (msg);
229 #if DEBUG_FS
230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
231               "Pushing %u bytes to another peer\n",
232               msize);
233 #endif
234   find_content (peer);
235   return msize;
236 }
237
238
239 /**
240  * Send the given block to the given peer.
241  *
242  * @param peer target peer
243  * @param block the block
244  * @return GNUNET_YES if the block was deleted (!)
245  */
246 static int
247 transmit_content (struct MigrationReadyPeer *peer,
248                   struct MigrationReadyBlock *block)
249 {
250   size_t msize;
251   struct PutMessage *msg;
252   unsigned  int i;
253   struct GSF_PeerPerformanceData *ppd;
254   int ret;
255
256   ppd = GSF_get_peer_performance_data_ (peer->peer);
257   GNUNET_assert (NULL == peer->th);
258   msize = sizeof (struct PutMessage) + block->size;
259   msg = GNUNET_malloc (msize);
260   msg->header.type = htons (42);
261   msg->header.size = htons (msize);
262   GNUNET_break (0);
263   memcpy (&msg[1],
264           &block[1],
265           block->size);
266   peer->msg = msg;
267   for (i=0;i<MIGRATION_LIST_SIZE;i++)
268     {
269       if (block->target_list[i] == 0)
270         {
271           block->target_list[i] = ppd->pid;
272           GNUNET_PEER_change_rc (block->target_list[i], 1);
273           break;
274         }
275     }
276   if (MIGRATION_LIST_SIZE == i)
277     {
278       delete_migration_block (block);
279       ret = GNUNET_YES;
280     }
281   else
282     {
283       ret = GNUNET_NO;
284     }
285   peer->th = GSF_peer_transmit_ (peer->peer,
286                                  GNUNET_NO,
287                                  0 /* priority */,
288                                  GNUNET_TIME_UNIT_FOREVER_REL,
289                                  msize,
290                                  &transmit_message,
291                                  peer);
292   return ret;
293 }
294
295
296 /**
297  * Count the number of peers this block has
298  * already been forwarded to.
299  *
300  * @param block the block
301  * @return number of times block was forwarded
302  */
303 static unsigned int
304 count_targets (struct MigrationReadyBlock *block)
305 {
306   unsigned int i;
307
308   for (i=0;i<MIGRATION_LIST_SIZE;i++)
309     if (block->target_list[i] == 0)
310       return i;
311   return i;
312 }
313
314
315 /**
316  * Check if sending this block to this peer would
317  * be a good idea. 
318  *
319  * @param peer target peer
320  * @param block the block
321  * @return score (>= 0: feasible, negative: infeasible)
322  */
323 static long
324 score_content (struct MigrationReadyPeer *peer,
325                struct MigrationReadyBlock *block)
326 {
327   unsigned int i;
328   struct GSF_PeerPerformanceData *ppd;
329   struct GNUNET_PeerIdentity id;
330   uint32_t dist;
331
332   ppd = GSF_get_peer_performance_data_ (peer->peer);
333   for (i=0;i<MIGRATION_LIST_SIZE;i++)
334     if (block->target_list[i] == ppd->pid)
335       return -1;
336   GNUNET_PEER_resolve (ppd->pid,
337                        &id);
338   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
339                                           &id.hashPubKey);
340   /* closer distance, higher score: */
341   return UINT32_MAX - dist;
342 }
343
344
345 /**
346  * If the migration task is not currently running, consider
347  * (re)scheduling it with the appropriate delay.
348  */
349 static void
350 consider_gathering (void);
351
352
353 /**
354  * Find content for migration to this peer.
355  *
356  * @param mrp peer to find content for
357  */ 
358 static void
359 find_content (struct MigrationReadyPeer *mrp)
360 {
361   struct MigrationReadyBlock *pos;
362   long score;
363   long best_score;
364   struct MigrationReadyBlock *best;
365
366   GNUNET_assert (NULL == mrp->th);
367   best = NULL;
368   best_score = -1;
369   pos = mig_head;
370   while (NULL != pos)
371     {
372       score = score_content (mrp, pos);
373       if (score > best_score)
374         {
375           best_score = score;
376           best = pos;
377         }
378       pos = pos->next;
379     }
380   if (NULL == best) 
381     {
382       if (mig_size < MAX_MIGRATION_QUEUE)
383         {
384 #if DEBUG_FS
385           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386                       "No content found for pushing, waiting for queue to fill\n");
387 #endif
388           return; /* will fill up eventually... */
389         }
390 #if DEBUG_FS
391       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                   "No suitable content found, purging content from full queue\n");
393 #endif
394       /* failed to find migration target AND
395          queue is full, purge most-forwarded
396          block from queue to make room for more */
397       score = 0;
398       pos = mig_head;
399       while (NULL != pos)
400         {
401           score = count_targets (pos);
402           if (score >= best_score)
403             {
404               best_score = score;
405               best = pos;
406             }
407           pos = pos->next;
408         }
409       GNUNET_assert (NULL != best);
410       delete_migration_block (best);
411       consider_gathering ();
412       return;
413     }
414 #if DEBUG_FS
415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
416               "Preparing to push best content to peer\n");
417 #endif
418   transmit_content (mrp, best);
419 }
420
421
422 /**
423  * Task that is run periodically to obtain blocks for content
424  * migration
425  * 
426  * @param cls unused
427  * @param tc scheduler context (also unused)
428  */
429 static void
430 gather_migration_blocks (void *cls,
431                          const struct GNUNET_SCHEDULER_TaskContext *tc);
432
433
434 /**
435  * If the migration task is not currently running, consider
436  * (re)scheduling it with the appropriate delay.
437  */
438 static void
439 consider_gathering ()
440 {
441   struct GNUNET_TIME_Relative delay;
442
443   if (GSF_dsh == NULL)
444     return;
445   if (mig_qe != NULL)
446     return;
447   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
448     return;
449   if (mig_size >= MAX_MIGRATION_QUEUE)  
450     return;
451   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
452                                          mig_size);
453   delay = GNUNET_TIME_relative_divide (delay,
454                                        MAX_MIGRATION_QUEUE);
455   delay = GNUNET_TIME_relative_max (delay,
456                                     min_migration_delay);
457   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
458                                            &gather_migration_blocks,
459                                            NULL);
460 }
461
462
463 /**
464  * Process content offered for migration.
465  *
466  * @param cls closure
467  * @param key key for the content
468  * @param size number of bytes in data
469  * @param data content stored
470  * @param type type of the content
471  * @param priority priority of the content
472  * @param anonymity anonymity-level for the content
473  * @param expiration expiration time for the content
474  * @param uid unique identifier for the datum;
475  *        maybe 0 if no unique identifier is available
476  */
477 static void
478 process_migration_content (void *cls,
479                            const GNUNET_HashCode * key,
480                            size_t size,
481                            const void *data,
482                            enum GNUNET_BLOCK_Type type,
483                            uint32_t priority,
484                            uint32_t anonymity,
485                            struct GNUNET_TIME_Absolute
486                            expiration, uint64_t uid)
487 {
488   struct MigrationReadyBlock *mb;
489   struct MigrationReadyPeer *pos;
490   
491   if (key == NULL)
492     {
493       mig_qe = NULL;
494       consider_gathering ();
495       return;
496     }
497   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
498       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
499     {
500       /* content will expire soon, don't bother */
501       GNUNET_DATASTORE_get_next (GSF_dsh);
502       return;
503     }
504   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
505     {
506       if (GNUNET_OK !=
507           GNUNET_FS_handle_on_demand_block (key, size, data,
508                                             type, priority, anonymity,
509                                             expiration, uid, 
510                                             &process_migration_content,
511                                             NULL))
512         {
513           GNUNET_DATASTORE_get_next (GSF_dsh);
514         }
515       return;
516     }
517 #if DEBUG_FS
518   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519               "Retrieved block `%s' of type %u for migration\n",
520               GNUNET_h2s (key),
521               type);
522 #endif
523   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
524   mb->query = *key;
525   mb->expiration = expiration;
526   mb->size = size;
527   mb->type = type;
528   memcpy (&mb[1], data, size);
529   GNUNET_CONTAINER_DLL_insert_after (mig_head,
530                                      mig_tail,
531                                      mig_tail,
532                                      mb);
533   mig_size++;
534   pos = peer_head;
535   while (pos != NULL)
536     {
537       if (NULL == pos->th)
538         {
539 #if DEBUG_FS
540           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541                       "Preparing to push best content to peer\n");
542 #endif
543           if (GNUNET_YES == transmit_content (pos, mb))
544             break; /* 'mb' was freed! */
545         }
546       pos = pos->next;
547     }
548   GNUNET_DATASTORE_get_next (GSF_dsh);
549 }
550
551
552 /**
553  * Task that is run periodically to obtain blocks for content
554  * migration
555  * 
556  * @param cls unused
557  * @param tc scheduler context (also unused)
558  */
559 static void
560 gather_migration_blocks (void *cls,
561                          const struct GNUNET_SCHEDULER_TaskContext *tc)
562 {
563   mig_task = GNUNET_SCHEDULER_NO_TASK;
564   if (GSF_dsh != NULL)
565     {
566       mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, 
567                                             0, UINT_MAX,
568                                             GNUNET_TIME_UNIT_FOREVER_REL,
569                                             &process_migration_content, NULL);
570       GNUNET_assert (mig_qe != NULL);
571     }
572 }
573
574
575 /**
576  * A peer connected to us.  Start pushing content
577  * to this peer.
578  *
579  * @param peer handle for the peer that connected
580  */
581 void
582 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
583 {
584   struct MigrationReadyPeer *mrp;
585
586   if (GNUNET_YES != enabled)
587     return;
588   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
589   mrp->peer = peer;
590   find_content (mrp);
591   GNUNET_CONTAINER_DLL_insert  (peer_head,
592                                 peer_tail,
593                                 mrp);
594 }
595
596
597 /**
598  * A peer disconnected from us.  Stop pushing content
599  * to this peer.
600  *
601  * @param peer handle for the peer that disconnected
602  */
603 void
604 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
605 {
606   struct MigrationReadyPeer *pos;
607
608   pos = peer_head;
609   while (pos != NULL)
610     {
611       if (pos->peer == peer)
612         {
613           GNUNET_CONTAINER_DLL_remove (peer_head,
614                                        peer_tail,
615                                        pos);
616           if (NULL != pos->th)
617             GSF_peer_transmit_cancel_ (pos->th);
618           GNUNET_free (pos);
619           return;
620         }
621       pos = pos->next;
622     }
623 }
624
625
626 /**
627  * Setup the module.
628  */
629 void
630 GSF_push_init_ ()
631 {
632   enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
633                                                   "FS",
634                                                   "CONTENT_PUSHING");
635   if (GNUNET_YES != enabled)
636     return;
637  
638   if (GNUNET_OK != 
639       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
640                                            "fs",
641                                            "MIN_MIGRATION_DELAY",
642                                            &min_migration_delay))
643     {
644       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
645                   _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
646                   "MIN_MIGRATION_DELAY",
647                   "fs");
648       return;
649     }
650   consider_gathering ();
651 }
652
653
654 /**
655  * Shutdown the module.
656  */
657 void
658 GSF_push_done_ ()
659 {
660   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
661     {
662       GNUNET_SCHEDULER_cancel (mig_task);
663       mig_task = GNUNET_SCHEDULER_NO_TASK;
664     }
665   if (NULL != mig_qe)
666     {
667       GNUNET_DATASTORE_cancel (mig_qe);
668       mig_qe = NULL;
669     }
670   while (NULL != mig_head)
671     delete_migration_block (mig_head);
672   GNUNET_assert (0 == mig_size);
673 }
674
675 /* end of gnunet-service-fs_push.c */