-eliminating #if DEBUG checks
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for
106  * migratable data.
107  */
108 struct MigrationReadyPeer
109 {
110   /**
111    * This is a doubly-linked list.
112    */
113   struct MigrationReadyPeer *next;
114
115   /**
116    * This is a doubly-linked list.
117    */
118   struct MigrationReadyPeer *prev;
119
120   /**
121    * Handle to peer.
122    */
123   struct GSF_ConnectedPeer *peer;
124
125   /**
126    * Handle for current transmission request,
127    * or NULL for none.
128    */
129   struct GSF_PeerTransmitHandle *th;
130
131   /**
132    * Message we are trying to push right now (or NULL)
133    */
134   struct PutMessage *msg;
135 };
136
137
138 /**
139  * Head of linked list of blocks that can be migrated.
140  */
141 static struct MigrationReadyBlock *mig_head;
142
143 /**
144  * Tail of linked list of blocks that can be migrated.
145  */
146 static struct MigrationReadyBlock *mig_tail;
147
148 /**
149  * Head of linked list of peers.
150  */
151 static struct MigrationReadyPeer *peer_head;
152
153 /**
154  * Tail of linked list of peers.
155  */
156 static struct MigrationReadyPeer *peer_tail;
157
158 /**
159  * Request to datastore for migration (or NULL).
160  */
161 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
162
163 /**
164  * ID of task that collects blocks for migration.
165  */
166 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
167
168 /**
169  * What is the maximum frequency at which we are allowed to
170  * poll the datastore for migration content?
171  */
172 static struct GNUNET_TIME_Relative min_migration_delay;
173
174 /**
175  * Size of the doubly-linked list of migration blocks.
176  */
177 static unsigned int mig_size;
178
179 /**
180  * Is this module enabled?
181  */
182 static int enabled;
183
184
185 /**
186  * Delete the given migration block.
187  *
188  * @param mb block to delete
189  */
190 static void
191 delete_migration_block (struct MigrationReadyBlock *mb)
192 {
193   GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
194   GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
195   mig_size--;
196   GNUNET_free (mb);
197 }
198
199
200 /**
201  * Find content for migration to this peer.
202  */
203 static void
204 find_content (struct MigrationReadyPeer *mrp);
205
206
207 /**
208  * Transmit the message currently scheduled for
209  * transmission.
210  *
211  * @param cls the 'struct MigrationReadyPeer'
212  * @param buf_size number of bytes available in buf
213  * @param buf where to copy the message, NULL on error (peer disconnect)
214  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
215  */
216 static size_t
217 transmit_message (void *cls, size_t buf_size, void *buf)
218 {
219   struct MigrationReadyPeer *peer = cls;
220   struct PutMessage *msg;
221   uint16_t msize;
222
223   peer->th = NULL;
224   msg = peer->msg;
225   peer->msg = NULL;
226   if (buf == NULL)
227   {
228     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
229                 "Failed to migrate content to another peer (disconnect)\n");
230     GNUNET_free (msg);
231     return 0;
232   }
233   msize = ntohs (msg->header.size);
234   GNUNET_assert (msize <= buf_size);
235   memcpy (buf, msg, msize);
236   GNUNET_free (msg);
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n",
238               msize);
239   find_content (peer);
240   return msize;
241 }
242
243
244 /**
245  * Send the given block to the given peer.
246  *
247  * @param peer target peer
248  * @param block the block
249  * @return GNUNET_YES if the block was deleted (!)
250  */
251 static int
252 transmit_content (struct MigrationReadyPeer *peer,
253                   struct MigrationReadyBlock *block)
254 {
255   size_t msize;
256   struct PutMessage *msg;
257   unsigned int i;
258   struct GSF_PeerPerformanceData *ppd;
259   int ret;
260
261   ppd = GSF_get_peer_performance_data_ (peer->peer);
262   GNUNET_assert (NULL == peer->th);
263   msize = sizeof (struct PutMessage) + block->size;
264   msg = GNUNET_malloc (msize);
265   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
266   msg->header.size = htons (msize);
267   msg->type = htonl (block->type);
268   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
269   memcpy (&msg[1], &block[1], block->size);
270   peer->msg = msg;
271   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
272   {
273     if (block->target_list[i] == 0)
274     {
275       block->target_list[i] = ppd->pid;
276       GNUNET_PEER_change_rc (block->target_list[i], 1);
277       break;
278     }
279   }
280   if (MIGRATION_LIST_SIZE == i)
281   {
282     delete_migration_block (block);
283     ret = GNUNET_YES;
284   }
285   else
286   {
287     ret = GNUNET_NO;
288   }
289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290               "Asking for transmission of %u bytes for migration\n", msize);
291   peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ ,
292                                  GNUNET_TIME_UNIT_FOREVER_REL, msize,
293                                  &transmit_message, peer);
294   return ret;
295 }
296
297
298 /**
299  * Count the number of peers this block has
300  * already been forwarded to.
301  *
302  * @param block the block
303  * @return number of times block was forwarded
304  */
305 static unsigned int
306 count_targets (struct MigrationReadyBlock *block)
307 {
308   unsigned int i;
309
310   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
311     if (block->target_list[i] == 0)
312       return i;
313   return i;
314 }
315
316
317 /**
318  * Check if sending this block to this peer would
319  * be a good idea.
320  *
321  * @param peer target peer
322  * @param block the block
323  * @return score (>= 0: feasible, negative: infeasible)
324  */
325 static long
326 score_content (struct MigrationReadyPeer *peer,
327                struct MigrationReadyBlock *block)
328 {
329   unsigned int i;
330   struct GSF_PeerPerformanceData *ppd;
331   struct GNUNET_PeerIdentity id;
332   uint32_t dist;
333
334   ppd = GSF_get_peer_performance_data_ (peer->peer);
335   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
336     if (block->target_list[i] == ppd->pid)
337       return -1;
338   GNUNET_assert (0 != ppd->pid);
339   GNUNET_PEER_resolve (ppd->pid, &id);
340   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey);
341   /* closer distance, higher score: */
342   return UINT32_MAX - dist;
343 }
344
345
346 /**
347  * If the migration task is not currently running, consider
348  * (re)scheduling it with the appropriate delay.
349  */
350 static void
351 consider_gathering (void);
352
353
354 /**
355  * Find content for migration to this peer.
356  *
357  * @param mrp peer to find content for
358  */
359 static void
360 find_content (struct MigrationReadyPeer *mrp)
361 {
362   struct MigrationReadyBlock *pos;
363   long score;
364   long best_score;
365   struct MigrationReadyBlock *best;
366
367   GNUNET_assert (NULL == mrp->th);
368   best = NULL;
369   best_score = -1;
370   pos = mig_head;
371   while (NULL != pos)
372   {
373     score = score_content (mrp, pos);
374     if (score > best_score)
375     {
376       best_score = score;
377       best = pos;
378     }
379     pos = pos->next;
380   }
381   if (NULL == best)
382   {
383     if (mig_size < MAX_MIGRATION_QUEUE)
384     {
385       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386                   "No content found for pushing, waiting for queue to fill\n");
387       return;                   /* will fill up eventually... */
388     }
389     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390                 "No suitable content found, purging content from full queue\n");
391     /* failed to find migration target AND
392      * queue is full, purge most-forwarded
393      * block from queue to make room for more */
394     pos = mig_head;
395     while (NULL != pos)
396     {
397       score = count_targets (pos);
398       if (score >= best_score)
399       {
400         best_score = score;
401         best = pos;
402       }
403       pos = pos->next;
404     }
405     GNUNET_assert (NULL != best);
406     delete_migration_block (best);
407     consider_gathering ();
408     return;
409   }
410   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
411               "Preparing to push best content to peer\n");
412   transmit_content (mrp, best);
413 }
414
415
416 /**
417  * Task that is run periodically to obtain blocks for content
418  * migration
419  *
420  * @param cls unused
421  * @param tc scheduler context (also unused)
422  */
423 static void
424 gather_migration_blocks (void *cls,
425                          const struct GNUNET_SCHEDULER_TaskContext *tc);
426
427
428 /**
429  * If the migration task is not currently running, consider
430  * (re)scheduling it with the appropriate delay.
431  */
432 static void
433 consider_gathering ()
434 {
435   struct GNUNET_TIME_Relative delay;
436
437   if (GSF_dsh == NULL)
438     return;
439   if (mig_qe != NULL)
440     return;
441   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
442     return;
443   if (mig_size >= MAX_MIGRATION_QUEUE)
444     return;
445   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
446   delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
447   delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449               "Scheduling gathering task (queue size: %u)\n", mig_size);
450   mig_task =
451       GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
452 }
453
454
455 /**
456  * Process content offered for migration.
457  *
458  * @param cls closure
459  * @param key key for the content
460  * @param size number of bytes in data
461  * @param data content stored
462  * @param type type of the content
463  * @param priority priority of the content
464  * @param anonymity anonymity-level for the content
465  * @param expiration expiration time for the content
466  * @param uid unique identifier for the datum;
467  *        maybe 0 if no unique identifier is available
468  */
469 static void
470 process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size,
471                            const void *data, enum GNUNET_BLOCK_Type type,
472                            uint32_t priority, uint32_t anonymity,
473                            struct GNUNET_TIME_Absolute expiration, uint64_t uid)
474 {
475   struct MigrationReadyBlock *mb;
476   struct MigrationReadyPeer *pos;
477
478   mig_qe = NULL;
479   if (key == NULL)
480   {
481     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n");
482     consider_gathering ();
483     return;
484   }
485   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
486       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
487   {
488     /* content will expire soon, don't bother */
489     consider_gathering ();
490     return;
491   }
492   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
493   {
494     if (GNUNET_OK !=
495         GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
496                                           anonymity, expiration, uid,
497                                           &process_migration_content, NULL))
498       consider_gathering ();
499     return;
500   }
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
503               GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE);
504   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
505   mb->query = *key;
506   mb->expiration = expiration;
507   mb->size = size;
508   mb->type = type;
509   memcpy (&mb[1], data, size);
510   GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb);
511   mig_size++;
512   pos = peer_head;
513   while (pos != NULL)
514   {
515     if (NULL == pos->th)
516     {
517       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518                   "Preparing to push best content to peer\n");
519       if (GNUNET_YES == transmit_content (pos, mb))
520         break;                  /* 'mb' was freed! */
521     }
522     pos = pos->next;
523   }
524   consider_gathering ();
525 }
526
527
528 /**
529  * Task that is run periodically to obtain blocks for content
530  * migration
531  *
532  * @param cls unused
533  * @param tc scheduler context (also unused)
534  */
535 static void
536 gather_migration_blocks (void *cls,
537                          const struct GNUNET_SCHEDULER_TaskContext *tc)
538 {
539   mig_task = GNUNET_SCHEDULER_NO_TASK;
540   if (mig_size >= MAX_MIGRATION_QUEUE)
541     return;
542   if (GSF_dsh != NULL)
543   {
544     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545                 "Asking datastore for content for replication (queue size: %u)\n",
546                 mig_size);
547     mig_qe =
548         GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
549                                               GNUNET_TIME_UNIT_FOREVER_REL,
550                                               &process_migration_content, NULL);
551     if (NULL == mig_qe)
552       consider_gathering ();
553   }
554 }
555
556
557 /**
558  * A peer connected to us.  Start pushing content
559  * to this peer.
560  *
561  * @param peer handle for the peer that connected
562  */
563 void
564 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
565 {
566   struct MigrationReadyPeer *mrp;
567
568   if (GNUNET_YES != enabled)
569     return;
570   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
571   mrp->peer = peer;
572   find_content (mrp);
573   GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp);
574 }
575
576
577 /**
578  * A peer disconnected from us.  Stop pushing content
579  * to this peer.
580  *
581  * @param peer handle for the peer that disconnected
582  */
583 void
584 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
585 {
586   struct MigrationReadyPeer *pos;
587
588   pos = peer_head;
589   while (pos != NULL)
590   {
591     if (pos->peer == peer)
592     {
593       GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos);
594       if (NULL != pos->th)
595       {
596         GSF_peer_transmit_cancel_ (pos->th);
597         pos->th = NULL;
598       }
599       if (NULL != pos->msg)
600       {
601         GNUNET_free (pos->msg);
602         pos->msg = NULL;
603       }
604       GNUNET_free (pos);
605       return;
606     }
607     pos = pos->next;
608   }
609 }
610
611
612 /**
613  * Setup the module.
614  */
615 void
616 GSF_push_init_ ()
617 {
618   enabled =
619       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
620   if (GNUNET_YES != enabled)
621     return;
622
623   if (GNUNET_OK !=
624       GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
625                                            &min_migration_delay))
626   {
627     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
628                 _
629                 ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
630                 "MIN_MIGRATION_DELAY", "fs");
631     return;
632   }
633   consider_gathering ();
634 }
635
636
637 /**
638  * Shutdown the module.
639  */
640 void
641 GSF_push_done_ ()
642 {
643   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
644   {
645     GNUNET_SCHEDULER_cancel (mig_task);
646     mig_task = GNUNET_SCHEDULER_NO_TASK;
647   }
648   if (NULL != mig_qe)
649   {
650     GNUNET_DATASTORE_cancel (mig_qe);
651     mig_qe = NULL;
652   }
653   while (NULL != mig_head)
654     delete_migration_block (mig_head);
655   GNUNET_assert (0 == mig_size);
656 }
657
658 /* end of gnunet-service-fs_push.c */