trying to fix #3576
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for
106  * migratable data.
107  */
108 struct MigrationReadyPeer
109 {
110   /**
111    * This is a doubly-linked list.
112    */
113   struct MigrationReadyPeer *next;
114
115   /**
116    * This is a doubly-linked list.
117    */
118   struct MigrationReadyPeer *prev;
119
120   /**
121    * Handle to peer.
122    */
123   struct GSF_ConnectedPeer *peer;
124
125   /**
126    * Handle for current transmission request,
127    * or NULL for none.
128    */
129   struct GSF_PeerTransmitHandle *th;
130
131   /**
132    * Message we are trying to push right now (or NULL)
133    */
134   struct PutMessage *msg;
135 };
136
137
138 /**
139  * Head of linked list of blocks that can be migrated.
140  */
141 static struct MigrationReadyBlock *mig_head;
142
143 /**
144  * Tail of linked list of blocks that can be migrated.
145  */
146 static struct MigrationReadyBlock *mig_tail;
147
148 /**
149  * Head of linked list of peers.
150  */
151 static struct MigrationReadyPeer *peer_head;
152
153 /**
154  * Tail of linked list of peers.
155  */
156 static struct MigrationReadyPeer *peer_tail;
157
158 /**
159  * Request to datastore for migration (or NULL).
160  */
161 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
162
163 /**
164  * ID of task that collects blocks for migration.
165  */
166 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
167
168 /**
169  * What is the maximum frequency at which we are allowed to
170  * poll the datastore for migration content?
171  */
172 static struct GNUNET_TIME_Relative min_migration_delay;
173
174 /**
175  * Size of the doubly-linked list of migration blocks.
176  */
177 static unsigned int mig_size;
178
179 /**
180  * Is this module enabled?
181  */
182 static int enabled;
183
184
185 /**
186  * Delete the given migration block.
187  *
188  * @param mb block to delete
189  */
190 static void
191 delete_migration_block (struct MigrationReadyBlock *mb)
192 {
193   GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
194   GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
195   mig_size--;
196   GNUNET_free (mb);
197 }
198
199
200 /**
201  * Find content for migration to this peer.
202  */
203 static void
204 find_content (struct MigrationReadyPeer *mrp);
205
206
207 /**
208  * Transmit the message currently scheduled for
209  * transmission.
210  *
211  * @param cls the 'struct MigrationReadyPeer'
212  * @param buf_size number of bytes available in buf
213  * @param buf where to copy the message, NULL on error (peer disconnect)
214  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
215  */
216 static size_t
217 transmit_message (void *cls, size_t buf_size, void *buf)
218 {
219   struct MigrationReadyPeer *peer = cls;
220   struct PutMessage *msg;
221   uint16_t msize;
222
223   peer->th = NULL;
224   msg = peer->msg;
225   peer->msg = NULL;
226   if (buf == NULL)
227   {
228     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
229                 "Failed to migrate content to another peer (disconnect)\n");
230     GNUNET_free (msg);
231     return 0;
232   }
233   msize = ntohs (msg->header.size);
234   GNUNET_assert (msize <= buf_size);
235   memcpy (buf, msg, msize);
236   GNUNET_free (msg);
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n",
238               msize);
239   find_content (peer);
240   return msize;
241 }
242
243
244 /**
245  * Send the given block to the given peer.
246  *
247  * @param peer target peer
248  * @param block the block
249  * @return GNUNET_YES if the block was deleted (!)
250  */
251 static int
252 transmit_content (struct MigrationReadyPeer *peer,
253                   struct MigrationReadyBlock *block)
254 {
255   size_t msize;
256   struct PutMessage *msg;
257   unsigned int i;
258   struct GSF_PeerPerformanceData *ppd;
259   int ret;
260
261   ppd = GSF_get_peer_performance_data_ (peer->peer);
262   GNUNET_assert (NULL == peer->th);
263   msize = sizeof (struct PutMessage) + block->size;
264   msg = GNUNET_malloc (msize);
265   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
266   msg->header.size = htons (msize);
267   msg->type = htonl (block->type);
268   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
269   memcpy (&msg[1], &block[1], block->size);
270   peer->msg = msg;
271   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
272   {
273     if (block->target_list[i] == 0)
274     {
275       block->target_list[i] = ppd->pid;
276       GNUNET_PEER_change_rc (block->target_list[i], 1);
277       break;
278     }
279   }
280   if (MIGRATION_LIST_SIZE == i)
281   {
282     delete_migration_block (block);
283     ret = GNUNET_YES;
284   }
285   else
286   {
287     ret = GNUNET_NO;
288   }
289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290               "Asking for transmission of %u bytes for migration\n", msize);
291   peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ ,
292                                  GNUNET_TIME_UNIT_FOREVER_REL, msize,
293                                  &transmit_message, peer);
294   return ret;
295 }
296
297
298 /**
299  * Count the number of peers this block has
300  * already been forwarded to.
301  *
302  * @param block the block
303  * @return number of times block was forwarded
304  */
305 static unsigned int
306 count_targets (struct MigrationReadyBlock *block)
307 {
308   unsigned int i;
309
310   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
311     if (block->target_list[i] == 0)
312       return i;
313   return i;
314 }
315
316
317 /**
318  * Check if sending this block to this peer would
319  * be a good idea.
320  *
321  * @param peer target peer
322  * @param block the block
323  * @return score (>= 0: feasible, negative: infeasible)
324  */
325 static long
326 score_content (struct MigrationReadyPeer *peer,
327                struct MigrationReadyBlock *block)
328 {
329   unsigned int i;
330   struct GSF_PeerPerformanceData *ppd;
331   struct GNUNET_PeerIdentity id;
332   struct GNUNET_HashCode hc;
333   uint32_t dist;
334
335   ppd = GSF_get_peer_performance_data_ (peer->peer);
336   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
337     if (block->target_list[i] == ppd->pid)
338       return -1;
339   GNUNET_assert (0 != ppd->pid);
340   GNUNET_PEER_resolve (ppd->pid, &id);
341   GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc);
342   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc);
343   /* closer distance, higher score: */
344   return UINT32_MAX - dist;
345 }
346
347
348 /**
349  * If the migration task is not currently running, consider
350  * (re)scheduling it with the appropriate delay.
351  */
352 static void
353 consider_gathering (void);
354
355
356 /**
357  * Find content for migration to this peer.
358  *
359  * @param mrp peer to find content for
360  */
361 static void
362 find_content (struct MigrationReadyPeer *mrp)
363 {
364   struct MigrationReadyBlock *pos;
365   long score;
366   long best_score;
367   struct MigrationReadyBlock *best;
368
369   GNUNET_assert (NULL == mrp->th);
370   best = NULL;
371   best_score = -1;
372   pos = mig_head;
373   while (NULL != pos)
374   {
375     score = score_content (mrp, pos);
376     if (score > best_score)
377     {
378       best_score = score;
379       best = pos;
380     }
381     pos = pos->next;
382   }
383   if (NULL == best)
384   {
385     if (mig_size < MAX_MIGRATION_QUEUE)
386     {
387       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
388                   "No content found for pushing, waiting for queue to fill\n");
389       return;                   /* will fill up eventually... */
390     }
391     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                 "No suitable content found, purging content from full queue\n");
393     /* failed to find migration target AND
394      * queue is full, purge most-forwarded
395      * block from queue to make room for more */
396     pos = mig_head;
397     while (NULL != pos)
398     {
399       score = count_targets (pos);
400       if (score >= best_score)
401       {
402         best_score = score;
403         best = pos;
404       }
405       pos = pos->next;
406     }
407     GNUNET_assert (NULL != best);
408     delete_migration_block (best);
409     consider_gathering ();
410     return;
411   }
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Preparing to push best content to peer\n");
414   transmit_content (mrp, best);
415 }
416
417
418 /**
419  * Task that is run periodically to obtain blocks for content
420  * migration
421  *
422  * @param cls unused
423  * @param tc scheduler context (also unused)
424  */
425 static void
426 gather_migration_blocks (void *cls,
427                          const struct GNUNET_SCHEDULER_TaskContext *tc);
428
429
430 /**
431  * If the migration task is not currently running, consider
432  * (re)scheduling it with the appropriate delay.
433  */
434 static void
435 consider_gathering ()
436 {
437   struct GNUNET_TIME_Relative delay;
438
439   if (GSF_dsh == NULL)
440     return;
441   if (mig_qe != NULL)
442     return;
443   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
444     return;
445   if (mig_size >= MAX_MIGRATION_QUEUE)
446     return;
447   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
448   delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
449   delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
451               "Scheduling gathering task (queue size: %u)\n", mig_size);
452   mig_task =
453       GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
454 }
455
456
457 /**
458  * Process content offered for migration.
459  *
460  * @param cls closure
461  * @param key key for the content
462  * @param size number of bytes in data
463  * @param data content stored
464  * @param type type of the content
465  * @param priority priority of the content
466  * @param anonymity anonymity-level for the content
467  * @param expiration expiration time for the content
468  * @param uid unique identifier for the datum;
469  *        maybe 0 if no unique identifier is available
470  */
471 static void
472 process_migration_content (void *cls, const struct GNUNET_HashCode * key, size_t size,
473                            const void *data, enum GNUNET_BLOCK_Type type,
474                            uint32_t priority, uint32_t anonymity,
475                            struct GNUNET_TIME_Absolute expiration, uint64_t uid)
476 {
477   struct MigrationReadyBlock *mb;
478   struct MigrationReadyPeer *pos;
479
480   mig_qe = NULL;
481   if (key == NULL)
482   {
483     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n");
484     consider_gathering ();
485     return;
486   }
487   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
488       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
489   {
490     /* content will expire soon, don't bother */
491     consider_gathering ();
492     return;
493   }
494   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
495   {
496     if (GNUNET_OK !=
497         GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
498                                           anonymity, expiration, uid,
499                                           &process_migration_content, NULL))
500       consider_gathering ();
501     return;
502   }
503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
505               GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE);
506   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
507   mb->query = *key;
508   mb->expiration = expiration;
509   mb->size = size;
510   mb->type = type;
511   memcpy (&mb[1], data, size);
512   GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb);
513   mig_size++;
514   pos = peer_head;
515   while (pos != NULL)
516   {
517     if (NULL == pos->th)
518     {
519       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520                   "Preparing to push best content to peer\n");
521       if (GNUNET_YES == transmit_content (pos, mb))
522         break;                  /* 'mb' was freed! */
523     }
524     pos = pos->next;
525   }
526   consider_gathering ();
527 }
528
529
530 /**
531  * Task that is run periodically to obtain blocks for content
532  * migration
533  *
534  * @param cls unused
535  * @param tc scheduler context (also unused)
536  */
537 static void
538 gather_migration_blocks (void *cls,
539                          const struct GNUNET_SCHEDULER_TaskContext *tc)
540 {
541   mig_task = GNUNET_SCHEDULER_NO_TASK;
542   if (mig_size >= MAX_MIGRATION_QUEUE)
543     return;
544   if (GSF_dsh != NULL)
545   {
546     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547                 "Asking datastore for content for replication (queue size: %u)\n",
548                 mig_size);
549     mig_qe =
550         GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
551                                               GNUNET_TIME_UNIT_FOREVER_REL,
552                                               &process_migration_content, NULL);
553     if (NULL == mig_qe)
554       consider_gathering ();
555   }
556 }
557
558
559 /**
560  * A peer connected to us.  Start pushing content
561  * to this peer.
562  *
563  * @param peer handle for the peer that connected
564  */
565 void
566 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
567 {
568   struct MigrationReadyPeer *mrp;
569
570   if (GNUNET_YES != enabled)
571     return;
572   mrp = GNUNET_new (struct MigrationReadyPeer);
573   mrp->peer = peer;
574   find_content (mrp);
575   GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp);
576 }
577
578
579 /**
580  * A peer disconnected from us.  Stop pushing content
581  * to this peer.
582  *
583  * @param peer handle for the peer that disconnected
584  */
585 void
586 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
587 {
588   struct MigrationReadyPeer *pos;
589
590   pos = peer_head;
591   while (pos != NULL)
592   {
593     if (pos->peer == peer)
594     {
595       GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos);
596       if (NULL != pos->th)
597       {
598         GSF_peer_transmit_cancel_ (pos->th);
599         pos->th = NULL;
600       }
601       if (NULL != pos->msg)
602       {
603         GNUNET_free (pos->msg);
604         pos->msg = NULL;
605       }
606       GNUNET_free (pos);
607       return;
608     }
609     pos = pos->next;
610   }
611 }
612
613
614 /**
615  * Setup the module.
616  */
617 void
618 GSF_push_init_ ()
619 {
620   enabled =
621       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
622   if (GNUNET_YES != enabled)
623     return;
624
625   if (GNUNET_OK !=
626       GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
627                                            &min_migration_delay))
628   {
629     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
630                                "fs", "MIN_MIGRATION_DELAY",
631                                _("time required, content pushing disabled"));
632     return;
633   }
634   consider_gathering ();
635 }
636
637
638 /**
639  * Shutdown the module.
640  */
641 void
642 GSF_push_done_ ()
643 {
644   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
645   {
646     GNUNET_SCHEDULER_cancel (mig_task);
647     mig_task = GNUNET_SCHEDULER_NO_TASK;
648   }
649   if (NULL != mig_qe)
650   {
651     GNUNET_DATASTORE_cancel (mig_qe);
652     mig_qe = NULL;
653   }
654   while (NULL != mig_head)
655     delete_migration_block (mig_head);
656   GNUNET_assert (0 == mig_size);
657 }
658
659 /* end of gnunet-service-fs_push.c */