indentation
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 #define DEBUG_FS_MIGRATION GNUNET_NO
35
36 /**
37  * How long must content remain valid for us to consider it for migration?  
38  * If content will expire too soon, there is clearly no point in pushing
39  * it to other peers.  This value gives the threshold for migration.  Note
40  * that if this value is increased, the migration testcase may need to be
41  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
42  */
43 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
44
45
46 /**
47  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
48  */
49 struct MigrationReadyBlock
50 {
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct MigrationReadyBlock *next;
56
57   /**
58    * This is a doubly-linked list.
59    */
60   struct MigrationReadyBlock *prev;
61
62   /**
63    * Query for the block.
64    */
65   GNUNET_HashCode query;
66
67   /**
68    * When does this block expire? 
69    */
70   struct GNUNET_TIME_Absolute expiration;
71
72   /**
73    * Peers we already forwarded this
74    * block to.  Zero for empty entries.
75    */
76   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
77
78   /**
79    * Size of the block.
80    */
81   size_t size;
82
83   /**
84    *  Number of targets already used.
85    */
86   unsigned int used_targets;
87
88   /**
89    * Type of the block.
90    */
91   enum GNUNET_BLOCK_Type type;
92 };
93
94
95 /**
96  * Information about a peer waiting for
97  * migratable data.
98  */
99 struct MigrationReadyPeer
100 {
101   /**
102    * This is a doubly-linked list.
103    */
104   struct MigrationReadyPeer *next;
105
106   /**
107    * This is a doubly-linked list.
108    */
109   struct MigrationReadyPeer *prev;
110
111   /**
112    * Handle to peer.
113    */
114   struct GSF_ConnectedPeer *peer;
115
116   /**
117    * Handle for current transmission request,
118    * or NULL for none.
119    */
120   struct GSF_PeerTransmitHandle *th;
121
122   /**
123    * Message we are trying to push right now (or NULL)
124    */
125   struct PutMessage *msg;
126 };
127
128
129 /**
130  * Head of linked list of blocks that can be migrated.
131  */
132 static struct MigrationReadyBlock *mig_head;
133
134 /**
135  * Tail of linked list of blocks that can be migrated.
136  */
137 static struct MigrationReadyBlock *mig_tail;
138
139 /**
140  * Head of linked list of peers.
141  */
142 static struct MigrationReadyPeer *peer_head;
143
144 /**
145  * Tail of linked list of peers.
146  */
147 static struct MigrationReadyPeer *peer_tail;
148
149 /**
150  * Request to datastore for migration (or NULL).
151  */
152 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
153
154 /**
155  * ID of task that collects blocks for migration.
156  */
157 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
158
159 /**
160  * What is the maximum frequency at which we are allowed to
161  * poll the datastore for migration content?
162  */
163 static struct GNUNET_TIME_Relative min_migration_delay;
164
165 /**
166  * Size of the doubly-linked list of migration blocks.
167  */
168 static unsigned int mig_size;
169
170 /**
171  * Is this module enabled?
172  */
173 static int enabled;
174
175
176 /**
177  * Delete the given migration block.
178  *
179  * @param mb block to delete
180  */
181 static void
182 delete_migration_block (struct MigrationReadyBlock *mb)
183 {
184   GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
185   GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
186   mig_size--;
187   GNUNET_free (mb);
188 }
189
190
191 /**
192  * Find content for migration to this peer.
193  */
194 static void find_content (struct MigrationReadyPeer *mrp);
195
196
197 /**
198  * Transmit the message currently scheduled for
199  * transmission.
200  *
201  * @param cls the 'struct MigrationReadyPeer'
202  * @param buf_size number of bytes available in buf
203  * @param buf where to copy the message, NULL on error (peer disconnect)
204  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
205  */
206 static size_t
207 transmit_message (void *cls, size_t buf_size, void *buf)
208 {
209   struct MigrationReadyPeer *peer = cls;
210   struct PutMessage *msg;
211   uint16_t msize;
212
213   peer->th = NULL;
214   msg = peer->msg;
215   peer->msg = NULL;
216   if (buf == NULL)
217   {
218 #if DEBUG_FS_MIGRATION
219     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
220                 "Failed to migrate content to another peer (disconnect)\n");
221 #endif
222     GNUNET_free (msg);
223     return 0;
224   }
225   msize = ntohs (msg->header.size);
226   GNUNET_assert (msize <= buf_size);
227   memcpy (buf, msg, msize);
228   GNUNET_free (msg);
229 #if DEBUG_FS_MIGRATION
230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
231               "Pushing %u bytes to another peer\n", msize);
232 #endif
233   find_content (peer);
234   return msize;
235 }
236
237
238 /**
239  * Send the given block to the given peer.
240  *
241  * @param peer target peer
242  * @param block the block
243  * @return GNUNET_YES if the block was deleted (!)
244  */
245 static int
246 transmit_content (struct MigrationReadyPeer *peer,
247                   struct MigrationReadyBlock *block)
248 {
249   size_t msize;
250   struct PutMessage *msg;
251   unsigned int i;
252   struct GSF_PeerPerformanceData *ppd;
253   int ret;
254
255   ppd = GSF_get_peer_performance_data_ (peer->peer);
256   GNUNET_assert (NULL == peer->th);
257   msize = sizeof (struct PutMessage) + block->size;
258   msg = GNUNET_malloc (msize);
259   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
260   msg->header.size = htons (msize);
261   msg->type = htonl (block->type);
262   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
263   memcpy (&msg[1], &block[1], block->size);
264   peer->msg = msg;
265   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
266   {
267     if (block->target_list[i] == 0)
268     {
269       block->target_list[i] = ppd->pid;
270       GNUNET_PEER_change_rc (block->target_list[i], 1);
271       break;
272     }
273   }
274   if (MIGRATION_LIST_SIZE == i)
275   {
276     delete_migration_block (block);
277     ret = GNUNET_YES;
278   }
279   else
280   {
281     ret = GNUNET_NO;
282   }
283 #if DEBUG_FS_MIGRATION
284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285               "Asking for transmission of %u bytes for migration\n", msize);
286 #endif
287   peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ ,
288                                  GNUNET_TIME_UNIT_FOREVER_REL,
289                                  msize, &transmit_message, peer);
290   return ret;
291 }
292
293
294 /**
295  * Count the number of peers this block has
296  * already been forwarded to.
297  *
298  * @param block the block
299  * @return number of times block was forwarded
300  */
301 static unsigned int
302 count_targets (struct MigrationReadyBlock *block)
303 {
304   unsigned int i;
305
306   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
307     if (block->target_list[i] == 0)
308       return i;
309   return i;
310 }
311
312
313 /**
314  * Check if sending this block to this peer would
315  * be a good idea. 
316  *
317  * @param peer target peer
318  * @param block the block
319  * @return score (>= 0: feasible, negative: infeasible)
320  */
321 static long
322 score_content (struct MigrationReadyPeer *peer,
323                struct MigrationReadyBlock *block)
324 {
325   unsigned int i;
326   struct GSF_PeerPerformanceData *ppd;
327   struct GNUNET_PeerIdentity id;
328   uint32_t dist;
329
330   ppd = GSF_get_peer_performance_data_ (peer->peer);
331   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
332     if (block->target_list[i] == ppd->pid)
333       return -1;
334   GNUNET_assert (0 != ppd->pid);
335   GNUNET_PEER_resolve (ppd->pid, &id);
336   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey);
337   /* closer distance, higher score: */
338   return UINT32_MAX - dist;
339 }
340
341
342 /**
343  * If the migration task is not currently running, consider
344  * (re)scheduling it with the appropriate delay.
345  */
346 static void consider_gathering (void);
347
348
349 /**
350  * Find content for migration to this peer.
351  *
352  * @param mrp peer to find content for
353  */
354 static void
355 find_content (struct MigrationReadyPeer *mrp)
356 {
357   struct MigrationReadyBlock *pos;
358   long score;
359   long best_score;
360   struct MigrationReadyBlock *best;
361
362   GNUNET_assert (NULL == mrp->th);
363   best = NULL;
364   best_score = -1;
365   pos = mig_head;
366   while (NULL != pos)
367   {
368     score = score_content (mrp, pos);
369     if (score > best_score)
370     {
371       best_score = score;
372       best = pos;
373     }
374     pos = pos->next;
375   }
376   if (NULL == best)
377   {
378     if (mig_size < MAX_MIGRATION_QUEUE)
379     {
380 #if DEBUG_FS_MIGRATION
381       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382                   "No content found for pushing, waiting for queue to fill\n");
383 #endif
384       return;                   /* will fill up eventually... */
385     }
386 #if DEBUG_FS_MIGRATION
387     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
388                 "No suitable content found, purging content from full queue\n");
389 #endif
390     /* failed to find migration target AND
391      * queue is full, purge most-forwarded
392      * block from queue to make room for more */
393     pos = mig_head;
394     while (NULL != pos)
395     {
396       score = count_targets (pos);
397       if (score >= best_score)
398       {
399         best_score = score;
400         best = pos;
401       }
402       pos = pos->next;
403     }
404     GNUNET_assert (NULL != best);
405     delete_migration_block (best);
406     consider_gathering ();
407     return;
408   }
409 #if DEBUG_FS_MIGRATION
410   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
411               "Preparing to push best content to peer\n");
412 #endif
413   transmit_content (mrp, best);
414 }
415
416
417 /**
418  * Task that is run periodically to obtain blocks for content
419  * migration
420  * 
421  * @param cls unused
422  * @param tc scheduler context (also unused)
423  */
424 static void
425 gather_migration_blocks (void *cls,
426                          const struct GNUNET_SCHEDULER_TaskContext *tc);
427
428
429 /**
430  * If the migration task is not currently running, consider
431  * (re)scheduling it with the appropriate delay.
432  */
433 static void
434 consider_gathering ()
435 {
436   struct GNUNET_TIME_Relative delay;
437
438   if (GSF_dsh == NULL)
439     return;
440   if (mig_qe != NULL)
441     return;
442   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
443     return;
444   if (mig_size >= MAX_MIGRATION_QUEUE)
445     return;
446   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
447   delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
448   delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
449 #if DEBUG_FS_MIGRATION
450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451               "Scheduling gathering task (queue size: %u)\n", mig_size);
452 #endif
453   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
454                                            &gather_migration_blocks, NULL);
455 }
456
457
458 /**
459  * Process content offered for migration.
460  *
461  * @param cls closure
462  * @param key key for the content
463  * @param size number of bytes in data
464  * @param data content stored
465  * @param type type of the content
466  * @param priority priority of the content
467  * @param anonymity anonymity-level for the content
468  * @param expiration expiration time for the content
469  * @param uid unique identifier for the datum;
470  *        maybe 0 if no unique identifier is available
471  */
472 static void
473 process_migration_content (void *cls,
474                            const GNUNET_HashCode * key,
475                            size_t size,
476                            const void *data,
477                            enum GNUNET_BLOCK_Type type,
478                            uint32_t priority,
479                            uint32_t anonymity,
480                            struct GNUNET_TIME_Absolute expiration, uint64_t uid)
481 {
482   struct MigrationReadyBlock *mb;
483   struct MigrationReadyPeer *pos;
484
485   mig_qe = NULL;
486   if (key == NULL)
487   {
488 #if DEBUG_FS_MIGRATION
489     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n");
490 #endif
491     consider_gathering ();
492     return;
493   }
494   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
495       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
496   {
497     /* content will expire soon, don't bother */
498     consider_gathering ();
499     return;
500   }
501   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
502   {
503     if (GNUNET_OK !=
504         GNUNET_FS_handle_on_demand_block (key, size, data,
505                                           type, priority, anonymity,
506                                           expiration, uid,
507                                           &process_migration_content, NULL))
508       consider_gathering ();
509     return;
510   }
511 #if DEBUG_FS_MIGRATION
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
514               GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE);
515 #endif
516   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
517   mb->query = *key;
518   mb->expiration = expiration;
519   mb->size = size;
520   mb->type = type;
521   memcpy (&mb[1], data, size);
522   GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb);
523   mig_size++;
524   pos = peer_head;
525   while (pos != NULL)
526   {
527     if (NULL == pos->th)
528     {
529 #if DEBUG_FS_MIGRATION
530       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
531                   "Preparing to push best content to peer\n");
532 #endif
533       if (GNUNET_YES == transmit_content (pos, mb))
534         break;                  /* 'mb' was freed! */
535     }
536     pos = pos->next;
537   }
538   consider_gathering ();
539 }
540
541
542 /**
543  * Task that is run periodically to obtain blocks for content
544  * migration
545  * 
546  * @param cls unused
547  * @param tc scheduler context (also unused)
548  */
549 static void
550 gather_migration_blocks (void *cls,
551                          const struct GNUNET_SCHEDULER_TaskContext *tc)
552 {
553   mig_task = GNUNET_SCHEDULER_NO_TASK;
554   if (mig_size >= MAX_MIGRATION_QUEUE)
555     return;
556   if (GSF_dsh != NULL)
557   {
558 #if DEBUG_FS_MIGRATION
559     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
560                 "Asking datastore for content for replication (queue size: %u)\n",
561                 mig_size);
562 #endif
563     mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
564                                                    0, UINT_MAX,
565                                                    GNUNET_TIME_UNIT_FOREVER_REL,
566                                                    &process_migration_content,
567                                                    NULL);
568     if (NULL == mig_qe)
569       consider_gathering ();
570   }
571 }
572
573
574 /**
575  * A peer connected to us.  Start pushing content
576  * to this peer.
577  *
578  * @param peer handle for the peer that connected
579  */
580 void
581 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
582 {
583   struct MigrationReadyPeer *mrp;
584
585   if (GNUNET_YES != enabled)
586     return;
587   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
588   mrp->peer = peer;
589   find_content (mrp);
590   GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp);
591 }
592
593
594 /**
595  * A peer disconnected from us.  Stop pushing content
596  * to this peer.
597  *
598  * @param peer handle for the peer that disconnected
599  */
600 void
601 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
602 {
603   struct MigrationReadyPeer *pos;
604
605   pos = peer_head;
606   while (pos != NULL)
607   {
608     if (pos->peer == peer)
609     {
610       GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos);
611       if (NULL != pos->th)
612       {
613         GSF_peer_transmit_cancel_ (pos->th);
614         pos->th = NULL;
615       }
616       if (NULL != pos->msg)
617       {
618         GNUNET_free (pos->msg);
619         pos->msg = NULL;
620       }
621       GNUNET_free (pos);
622       return;
623     }
624     pos = pos->next;
625   }
626 }
627
628
629 /**
630  * Setup the module.
631  */
632 void
633 GSF_push_init_ ()
634 {
635   enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
636                                                   "FS", "CONTENT_PUSHING");
637   if (GNUNET_YES != enabled)
638     return;
639
640   if (GNUNET_OK !=
641       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
642                                            "fs",
643                                            "MIN_MIGRATION_DELAY",
644                                            &min_migration_delay))
645   {
646     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
647                 _
648                 ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
649                 "MIN_MIGRATION_DELAY", "fs");
650     return;
651   }
652   consider_gathering ();
653 }
654
655
656 /**
657  * Shutdown the module.
658  */
659 void
660 GSF_push_done_ ()
661 {
662   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
663   {
664     GNUNET_SCHEDULER_cancel (mig_task);
665     mig_task = GNUNET_SCHEDULER_NO_TASK;
666   }
667   if (NULL != mig_qe)
668   {
669     GNUNET_DATASTORE_cancel (mig_qe);
670     mig_qe = NULL;
671   }
672   while (NULL != mig_head)
673     delete_migration_block (mig_head);
674   GNUNET_assert (0 == mig_size);
675 }
676
677 /* end of gnunet-service-fs_push.c */