c1f5205108bf1dc81b6bcef6a71b4641da67fbe5
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * How long must content remain valid for us to consider it for migration?  
36  * If content will expire too soon, there is clearly no point in pushing
37  * it to other peers.  This value gives the threshold for migration.  Note
38  * that if this value is increased, the migration testcase may need to be
39  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
40  */
41 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
42
43
44 /**
45  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
46  */
47 struct MigrationReadyBlock
48 {
49
50   /**
51    * This is a doubly-linked list.
52    */
53   struct MigrationReadyBlock *next;
54
55   /**
56    * This is a doubly-linked list.
57    */
58   struct MigrationReadyBlock *prev;
59
60   /**
61    * Query for the block.
62    */
63   GNUNET_HashCode query;
64
65   /**
66    * When does this block expire? 
67    */
68   struct GNUNET_TIME_Absolute expiration;
69
70   /**
71    * Peers we already forwarded this
72    * block to.  Zero for empty entries.
73    */
74   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
75
76   /**
77    * Size of the block.
78    */
79   size_t size;
80
81   /**
82    *  Number of targets already used.
83    */
84   unsigned int used_targets;
85
86   /**
87    * Type of the block.
88    */
89   enum GNUNET_BLOCK_Type type;
90 };
91
92
93 /**
94  * Information about a peer waiting for
95  * migratable data.
96  */
97 struct MigrationReadyPeer
98 {
99   /**
100    * This is a doubly-linked list.
101    */
102   struct MigrationReadyPeer *next;
103
104   /**
105    * This is a doubly-linked list.
106    */
107   struct MigrationReadyPeer *prev;
108
109   /**
110    * Handle to peer.
111    */
112   struct GSF_ConnectedPeer *peer;
113   
114   /**
115    * Handle for current transmission request,
116    * or NULL for none.
117    */
118   struct GSF_PeerTransmitHandle *th;
119
120   /**
121    * Message we are trying to push right now (or NULL)
122    */
123   struct PutMessage *msg;
124 };
125
126
127 /**
128  * Head of linked list of blocks that can be migrated.
129  */
130 static struct MigrationReadyBlock *mig_head;
131
132 /**
133  * Tail of linked list of blocks that can be migrated.
134  */
135 static struct MigrationReadyBlock *mig_tail;
136
137 /**
138  * Head of linked list of peers.
139  */
140 static struct MigrationReadyPeer *peer_head;
141
142 /**
143  * Tail of linked list of peers.
144  */
145 static struct MigrationReadyPeer *peer_tail;
146
147 /**
148  * Request to datastore for migration (or NULL).
149  */
150 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
151
152 /**
153  * ID of task that collects blocks for migration.
154  */
155 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
156
157 /**
158  * What is the maximum frequency at which we are allowed to
159  * poll the datastore for migration content?
160  */
161 static struct GNUNET_TIME_Relative min_migration_delay;
162
163 /**
164  * Size of the doubly-linked list of migration blocks.
165  */
166 static unsigned int mig_size;
167
168 /**
169  * Is this module enabled?
170  */
171 static int enabled;
172
173
174 /**
175  * Delete the given migration block.
176  *
177  * @param mb block to delete
178  */
179 static void
180 delete_migration_block (struct MigrationReadyBlock *mb)
181 {
182   GNUNET_CONTAINER_DLL_remove (mig_head,
183                                mig_tail,
184                                mb);
185   GNUNET_PEER_decrement_rcs (mb->target_list,
186                              MIGRATION_LIST_SIZE);
187   mig_size--;
188   GNUNET_free (mb);
189 }
190
191
192 /**
193  * Find content for migration to this peer.
194  */ 
195 static void
196 find_content (struct MigrationReadyPeer *mrp);
197
198
199 /**
200  * Transmit the message currently scheduled for
201  * transmission.
202  *
203  * @param cls the 'struct MigrationReadyPeer'
204  * @param buf_size number of bytes available in buf
205  * @param buf where to copy the message, NULL on error (peer disconnect)
206  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
207  */
208 static size_t
209 transmit_message (void *cls,
210                   size_t buf_size,
211                   void *buf)
212 {
213   struct MigrationReadyPeer *peer = cls;
214   struct PutMessage *msg;
215   uint16_t msize;
216
217   peer->th = NULL;
218   msg = peer->msg;
219   peer->msg = NULL;
220   if (buf == NULL)
221     {
222       GNUNET_free (msg);
223       return 0;
224     }
225   msize = ntohs (msg->header.size);
226   GNUNET_assert (msize <= buf_size);
227   memcpy (buf, msg, msize);
228   GNUNET_free (msg);
229   find_content (peer);
230   return msize;
231 }
232
233
234 /**
235  * Send the given block to the given peer.
236  *
237  * @param peer target peer
238  * @param block the block
239  * @return GNUNET_YES if the block was deleted (!)
240  */
241 static int
242 transmit_content (struct MigrationReadyPeer *peer,
243                   struct MigrationReadyBlock *block)
244 {
245   size_t msize;
246   struct PutMessage *msg;
247   unsigned  int i;
248   struct GSF_PeerPerformanceData *ppd;
249   int ret;
250
251   ppd = GSF_get_peer_performance_data_ (peer->peer);
252   GNUNET_assert (NULL == peer->th);
253   msize = sizeof (struct PutMessage) + block->size;
254   msg = GNUNET_malloc (msize);
255   msg->header.type = htons (42);
256   msg->header.size = htons (msize);
257   
258   memcpy (&msg[1],
259           &block[1],
260           block->size);
261   peer->msg = msg;
262   for (i=0;i<MIGRATION_LIST_SIZE;i++)
263     {
264       if (block->target_list[i] == 0)
265         {
266           block->target_list[i] = ppd->pid;
267           GNUNET_PEER_change_rc (block->target_list[i], 1);
268           break;
269         }
270     }
271   if (MIGRATION_LIST_SIZE == i)
272     {
273       delete_migration_block (block);
274       ret = GNUNET_YES;
275     }
276   else
277     {
278       ret = GNUNET_NO;
279     }
280   peer->th = GSF_peer_transmit_ (peer->peer,
281                                  GNUNET_NO,
282                                  0 /* priority */,
283                                  GNUNET_TIME_UNIT_FOREVER_REL,
284                                  msize,
285                                  &transmit_message,
286                                  peer);
287   return ret;
288 }
289
290
291 /**
292  * Count the number of peers this block has
293  * already been forwarded to.
294  *
295  * @param block the block
296  * @return number of times block was forwarded
297  */
298 static unsigned int
299 count_targets (struct MigrationReadyBlock *block)
300 {
301   unsigned int i;
302
303   for (i=0;i<MIGRATION_LIST_SIZE;i++)
304     if (block->target_list[i] == 0)
305       return i;
306   return i;
307 }
308
309
310 /**
311  * Check if sending this block to this peer would
312  * be a good idea. 
313  *
314  * @param peer target peer
315  * @param block the block
316  * @return score (>= 0: feasible, negative: infeasible)
317  */
318 static long
319 score_content (struct MigrationReadyPeer *peer,
320                struct MigrationReadyBlock *block)
321 {
322   unsigned int i;
323   struct GSF_PeerPerformanceData *ppd;
324   struct GNUNET_PeerIdentity id;
325   uint32_t dist;
326
327   ppd = GSF_get_peer_performance_data_ (peer->peer);
328   for (i=0;i<MIGRATION_LIST_SIZE;i++)
329     if (block->target_list[i] == ppd->pid)
330       return -1;
331   GNUNET_PEER_resolve (ppd->pid,
332                        &id);
333   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
334                                           &id.hashPubKey);
335   /* closer distance, higher score: */
336   return UINT32_MAX - dist;
337 }
338
339
340 /**
341  * If the migration task is not currently running, consider
342  * (re)scheduling it with the appropriate delay.
343  */
344 static void
345 consider_gathering (void);
346
347
348 /**
349  * Find content for migration to this peer.
350  *
351  * @param mrp peer to find content for
352  */ 
353 static void
354 find_content (struct MigrationReadyPeer *mrp)
355 {
356   struct MigrationReadyBlock *pos;
357   long score;
358   long best_score;
359   struct MigrationReadyBlock *best;
360
361   GNUNET_assert (NULL == mrp->th);
362   best = NULL;
363   best_score = -1;
364   pos = mig_head;
365   while (NULL != pos)
366     {
367       score = score_content (mrp, pos);
368       if (score > best_score)
369         {
370           best_score = score;
371           best = pos;
372         }
373       pos = pos->next;
374     }
375   if ( (NULL == best) &&
376        (mig_size >= MAX_MIGRATION_QUEUE) )
377     {
378       /* failed to find migration target AND
379          queue is full, purge most-forwarded
380          block from queue to make room for more */
381       score = 0;
382       pos = mig_head;
383       while (NULL != pos)
384         {
385           score = count_targets (pos);
386           if (score >= best_score)
387             {
388               best_score = score;
389               best = pos;
390             }
391           pos = pos->next;
392         }
393       GNUNET_assert (NULL != best);
394       delete_migration_block (best);      
395       consider_gathering ();
396       return;
397     }
398   transmit_content (mrp, best);
399 }
400
401
402 /**
403  * Task that is run periodically to obtain blocks for content
404  * migration
405  * 
406  * @param cls unused
407  * @param tc scheduler context (also unused)
408  */
409 static void
410 gather_migration_blocks (void *cls,
411                          const struct GNUNET_SCHEDULER_TaskContext *tc);
412
413
414 /**
415  * If the migration task is not currently running, consider
416  * (re)scheduling it with the appropriate delay.
417  */
418 static void
419 consider_gathering ()
420 {
421   struct GNUNET_TIME_Relative delay;
422
423   if (GSF_dsh == NULL)
424     return;
425   if (mig_qe != NULL)
426     return;
427   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
428     return;
429   if (mig_size >= MAX_MIGRATION_QUEUE)  
430     return;
431   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
432                                          mig_size);
433   delay = GNUNET_TIME_relative_divide (delay,
434                                        MAX_MIGRATION_QUEUE);
435   delay = GNUNET_TIME_relative_max (delay,
436                                     min_migration_delay);
437   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
438                                            &gather_migration_blocks,
439                                            NULL);
440 }
441
442
443 /**
444  * Process content offered for migration.
445  *
446  * @param cls closure
447  * @param key key for the content
448  * @param size number of bytes in data
449  * @param data content stored
450  * @param type type of the content
451  * @param priority priority of the content
452  * @param anonymity anonymity-level for the content
453  * @param expiration expiration time for the content
454  * @param uid unique identifier for the datum;
455  *        maybe 0 if no unique identifier is available
456  */
457 static void
458 process_migration_content (void *cls,
459                            const GNUNET_HashCode * key,
460                            size_t size,
461                            const void *data,
462                            enum GNUNET_BLOCK_Type type,
463                            uint32_t priority,
464                            uint32_t anonymity,
465                            struct GNUNET_TIME_Absolute
466                            expiration, uint64_t uid)
467 {
468   struct MigrationReadyBlock *mb;
469   struct MigrationReadyPeer *pos;
470   
471   if (key == NULL)
472     {
473       mig_qe = NULL;
474       consider_gathering ();
475       return;
476     }
477   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
478       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
479     {
480       /* content will expire soon, don't bother */
481       GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
482       return;
483     }
484   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
485     {
486       if (GNUNET_OK !=
487           GNUNET_FS_handle_on_demand_block (key, size, data,
488                                             type, priority, anonymity,
489                                             expiration, uid, 
490                                             &process_migration_content,
491                                             NULL))
492         {
493           GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
494         }
495       return;
496     }
497 #if DEBUG_FS
498   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499               "Retrieved block `%s' of type %u for migration\n",
500               GNUNET_h2s (key),
501               type);
502 #endif
503   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
504   mb->query = *key;
505   mb->expiration = expiration;
506   mb->size = size;
507   mb->type = type;
508   memcpy (&mb[1], data, size);
509   GNUNET_CONTAINER_DLL_insert_after (mig_head,
510                                      mig_tail,
511                                      mig_tail,
512                                      mb);
513   mig_size++;
514   pos = peer_head;
515   while (pos != NULL)
516     {
517       if (NULL == pos->th)
518         {
519           if (GNUNET_YES == transmit_content (pos, mb))
520             break; /* 'mb' was freed! */
521         }
522       pos = pos->next;
523     }
524   GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
525 }
526
527
528 /**
529  * Task that is run periodically to obtain blocks for content
530  * migration
531  * 
532  * @param cls unused
533  * @param tc scheduler context (also unused)
534  */
535 static void
536 gather_migration_blocks (void *cls,
537                          const struct GNUNET_SCHEDULER_TaskContext *tc)
538 {
539   mig_task = GNUNET_SCHEDULER_NO_TASK;
540   if (GSF_dsh != NULL)
541     {
542       mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, 
543                                             0, UINT_MAX,
544                                             GNUNET_TIME_UNIT_FOREVER_REL,
545                                             &process_migration_content, NULL);
546       GNUNET_assert (mig_qe != NULL);
547     }
548 }
549
550
551 /**
552  * A peer connected to us.  Start pushing content
553  * to this peer.
554  *
555  * @param peer handle for the peer that connected
556  */
557 void
558 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
559 {
560   struct MigrationReadyPeer *mrp;
561
562   if (GNUNET_YES != enabled)
563     return;
564   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
565   mrp->peer = peer;
566   find_content (mrp);
567   GNUNET_CONTAINER_DLL_insert  (peer_head,
568                                 peer_tail,
569                                 mrp);
570 }
571
572
573 /**
574  * A peer disconnected from us.  Stop pushing content
575  * to this peer.
576  *
577  * @param peer handle for the peer that disconnected
578  */
579 void
580 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
581 {
582   struct MigrationReadyPeer *pos;
583
584   pos = peer_head;
585   while (pos != NULL)
586     {
587       if (pos->peer == peer)
588         {
589           GNUNET_CONTAINER_DLL_remove (peer_head,
590                                        peer_tail,
591                                        pos);
592           if (NULL != pos->th)
593             GSF_peer_transmit_cancel_ (pos->th);
594           GNUNET_free (pos);
595           return;
596         }
597       pos = pos->next;
598     }
599 }
600
601
602 /**
603  * Setup the module.
604  * 
605  * @param cfg configuration to use
606  */
607 void
608 GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
609 {
610   int enabled;
611
612   enabled = GNUNET_CONFIGURATION_get_value_yesno (cfg,
613                                                   "FS",
614                                                   "CONTENT_PUSHING");
615   if (GNUNET_YES != enabled)
616     return;
617  
618   if (GNUNET_OK != 
619       GNUNET_CONFIGURATION_get_value_time (cfg,
620                                            "fs",
621                                            "MIN_MIGRATION_DELAY",
622                                            &min_migration_delay))
623     {
624       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
625                   _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
626                   "MIN_MIGRATION_DELAY",
627                   "fs");
628       return;
629     }
630   consider_gathering ();
631 }
632
633
634 /**
635  * Shutdown the module.
636  */
637 void
638 GSF_push_done_ ()
639 {
640   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
641     {
642       GNUNET_SCHEDULER_cancel (mig_task);
643       mig_task = GNUNET_SCHEDULER_NO_TASK;
644     }
645   if (NULL != mig_qe)
646     {
647       GNUNET_DATASTORE_cancel (mig_qe);
648       mig_qe = NULL;
649     }
650   while (NULL != mig_head)
651     delete_migration_block (mig_head);
652   GNUNET_assert (0 == mig_size);
653 }
654
655 /* end of gnunet-service-fs_push.c */