flush peer respect value on disconnect
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 /**
35  * Maximum number of blocks we keep in memory for migration.
36  */
37 #define MAX_MIGRATION_QUEUE 8
38
39 /**
40  * Blocks are at most migrated to this number of peers
41  * plus one, each time they are fetched from the database.
42  */
43 #define MIGRATION_LIST_SIZE 2
44
45 /**
46  * How long must content remain valid for us to consider it for migration?
47  * If content will expire too soon, there is clearly no point in pushing
48  * it to other peers.  This value gives the threshold for migration.  Note
49  * that if this value is increased, the migration testcase may need to be
50  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51  */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
53
54
55 /**
56  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
57  */
58 struct MigrationReadyBlock
59 {
60
61   /**
62    * This is a doubly-linked list.
63    */
64   struct MigrationReadyBlock *next;
65
66   /**
67    * This is a doubly-linked list.
68    */
69   struct MigrationReadyBlock *prev;
70
71   /**
72    * Query for the block.
73    */
74   struct GNUNET_HashCode query;
75
76   /**
77    * When does this block expire?
78    */
79   struct GNUNET_TIME_Absolute expiration;
80
81   /**
82    * Peers we already forwarded this
83    * block to.  Zero for empty entries.
84    */
85   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87   /**
88    * Size of the block.
89    */
90   size_t size;
91
92   /**
93    *  Number of targets already used.
94    */
95   unsigned int used_targets;
96
97   /**
98    * Type of the block.
99    */
100   enum GNUNET_BLOCK_Type type;
101 };
102
103
104 /**
105  * Information about a peer waiting for
106  * migratable data.
107  */
108 struct MigrationReadyPeer
109 {
110   /**
111    * This is a doubly-linked list.
112    */
113   struct MigrationReadyPeer *next;
114
115   /**
116    * This is a doubly-linked list.
117    */
118   struct MigrationReadyPeer *prev;
119
120   /**
121    * Handle to peer.
122    */
123   struct GSF_ConnectedPeer *peer;
124
125   /**
126    * Handle for current transmission request,
127    * or NULL for none.
128    */
129   struct GSF_PeerTransmitHandle *th;
130
131   /**
132    * Message we are trying to push right now (or NULL)
133    */
134   struct PutMessage *msg;
135 };
136
137
138 /**
139  * Head of linked list of blocks that can be migrated.
140  */
141 static struct MigrationReadyBlock *mig_head;
142
143 /**
144  * Tail of linked list of blocks that can be migrated.
145  */
146 static struct MigrationReadyBlock *mig_tail;
147
148 /**
149  * Head of linked list of peers.
150  */
151 static struct MigrationReadyPeer *peer_head;
152
153 /**
154  * Tail of linked list of peers.
155  */
156 static struct MigrationReadyPeer *peer_tail;
157
158 /**
159  * Request to datastore for migration (or NULL).
160  */
161 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
162
163 /**
164  * ID of task that collects blocks for migration.
165  */
166 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
167
168 /**
169  * What is the maximum frequency at which we are allowed to
170  * poll the datastore for migration content?
171  */
172 static struct GNUNET_TIME_Relative min_migration_delay;
173
174 /**
175  * Size of the doubly-linked list of migration blocks.
176  */
177 static unsigned int mig_size;
178
179 /**
180  * Is this module enabled?
181  */
182 static int enabled;
183
184 /**
185  * Did we find anything in the datastore?
186  */
187 static int value_found;
188
189
190 /**
191  * Delete the given migration block.
192  *
193  * @param mb block to delete
194  */
195 static void
196 delete_migration_block (struct MigrationReadyBlock *mb)
197 {
198   GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
199   GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
200   mig_size--;
201   GNUNET_free (mb);
202 }
203
204
205 /**
206  * Find content for migration to this peer.
207  */
208 static void
209 find_content (struct MigrationReadyPeer *mrp);
210
211
212 /**
213  * Transmit the message currently scheduled for transmission.
214  *
215  * @param cls the `struct MigrationReadyPeer`
216  * @param buf_size number of bytes available in @a buf
217  * @param buf where to copy the message, NULL on error (peer disconnect)
218  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
219  */
220 static size_t
221 transmit_message (void *cls,
222                   size_t buf_size,
223                   void *buf)
224 {
225   struct MigrationReadyPeer *peer = cls;
226   struct PutMessage *msg;
227   uint16_t msize;
228
229   peer->th = NULL;
230   msg = peer->msg;
231   peer->msg = NULL;
232   if (NULL == buf)
233   {
234     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235                 "Failed to migrate content to another peer (disconnect)\n");
236     GNUNET_free (msg);
237     return 0;
238   }
239   msize = ntohs (msg->header.size);
240   GNUNET_assert (msize <= buf_size);
241   memcpy (buf, msg, msize);
242   GNUNET_free (msg);
243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244               "Pushing %u bytes to %s\n",
245               msize,
246               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
247   find_content (peer);
248   return msize;
249 }
250
251
252 /**
253  * Send the given block to the given peer.
254  *
255  * @param peer target peer
256  * @param block the block
257  * @return #GNUNET_YES if the block was deleted (!)
258  */
259 static int
260 transmit_content (struct MigrationReadyPeer *peer,
261                   struct MigrationReadyBlock *block)
262 {
263   size_t msize;
264   struct PutMessage *msg;
265   unsigned int i;
266   struct GSF_PeerPerformanceData *ppd;
267   int ret;
268
269   ppd = GSF_get_peer_performance_data_ (peer->peer);
270   GNUNET_assert (NULL == peer->th);
271   msize = sizeof (struct PutMessage) + block->size;
272   msg = GNUNET_malloc (msize);
273   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
274   msg->header.size = htons (msize);
275   msg->type = htonl (block->type);
276   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
277   memcpy (&msg[1], &block[1], block->size);
278   peer->msg = msg;
279   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
280   {
281     if (block->target_list[i] == 0)
282     {
283       block->target_list[i] = ppd->pid;
284       GNUNET_PEER_change_rc (block->target_list[i], 1);
285       break;
286     }
287   }
288   if (MIGRATION_LIST_SIZE == i)
289   {
290     delete_migration_block (block);
291     ret = GNUNET_YES;
292   }
293   else
294   {
295     ret = GNUNET_NO;
296   }
297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298               "Asking for transmission of %u bytes to %s for migration\n",
299               msize,
300               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
301   peer->th = GSF_peer_transmit_ (peer->peer,
302                                  GNUNET_NO, 0 /* priority */ ,
303                                  GNUNET_TIME_UNIT_FOREVER_REL,
304                                  msize,
305                                  &transmit_message, peer);
306   return ret;
307 }
308
309
310 /**
311  * Count the number of peers this block has
312  * already been forwarded to.
313  *
314  * @param block the block
315  * @return number of times block was forwarded
316  */
317 static unsigned int
318 count_targets (struct MigrationReadyBlock *block)
319 {
320   unsigned int i;
321
322   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
323     if (block->target_list[i] == 0)
324       return i;
325   return i;
326 }
327
328
329 /**
330  * Check if sending this block to this peer would
331  * be a good idea.
332  *
333  * @param peer target peer
334  * @param block the block
335  * @return score (>= 0: feasible, negative: infeasible)
336  */
337 static long
338 score_content (struct MigrationReadyPeer *peer,
339                struct MigrationReadyBlock *block)
340 {
341   unsigned int i;
342   struct GSF_PeerPerformanceData *ppd;
343   struct GNUNET_PeerIdentity id;
344   struct GNUNET_HashCode hc;
345   uint32_t dist;
346
347   ppd = GSF_get_peer_performance_data_ (peer->peer);
348   for (i = 0; i < MIGRATION_LIST_SIZE; i++)
349     if (block->target_list[i] == ppd->pid)
350       return -1;
351   GNUNET_assert (0 != ppd->pid);
352   GNUNET_PEER_resolve (ppd->pid, &id);
353   GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc);
354   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc);
355   /* closer distance, higher score: */
356   return UINT32_MAX - dist;
357 }
358
359
360 /**
361  * If the migration task is not currently running, consider
362  * (re)scheduling it with the appropriate delay.
363  */
364 static void
365 consider_gathering (void);
366
367
368 /**
369  * Find content for migration to this peer.
370  *
371  * @param mrp peer to find content for
372  */
373 static void
374 find_content (struct MigrationReadyPeer *mrp)
375 {
376   struct MigrationReadyBlock *pos;
377   long score;
378   long best_score;
379   struct MigrationReadyBlock *best;
380
381   GNUNET_assert (NULL == mrp->th);
382   best = NULL;
383   best_score = -1;
384   pos = mig_head;
385   while (NULL != pos)
386   {
387     score = score_content (mrp, pos);
388     if (score > best_score)
389     {
390       best_score = score;
391       best = pos;
392     }
393     pos = pos->next;
394   }
395   if (NULL == best)
396   {
397     if (mig_size < MAX_MIGRATION_QUEUE)
398     {
399       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400                   "No content found for pushing, waiting for queue to fill\n");
401       return;                   /* will fill up eventually... */
402     }
403     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404                 "No suitable content found, purging content from full queue\n");
405     /* failed to find migration target AND
406      * queue is full, purge most-forwarded
407      * block from queue to make room for more */
408     pos = mig_head;
409     while (NULL != pos)
410     {
411       score = count_targets (pos);
412       if (score >= best_score)
413       {
414         best_score = score;
415         best = pos;
416       }
417       pos = pos->next;
418     }
419     GNUNET_assert (NULL != best);
420     delete_migration_block (best);
421     consider_gathering ();
422     return;
423   }
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Preparing to push best content to peer\n");
426   transmit_content (mrp, best);
427 }
428
429
430 /**
431  * Task that is run periodically to obtain blocks for content
432  * migration
433  *
434  * @param cls unused
435  * @param tc scheduler context (also unused)
436  */
437 static void
438 gather_migration_blocks (void *cls,
439                          const struct GNUNET_SCHEDULER_TaskContext *tc);
440
441
442 /**
443  * If the migration task is not currently running, consider
444  * (re)scheduling it with the appropriate delay.
445  */
446 static void
447 consider_gathering ()
448 {
449   struct GNUNET_TIME_Relative delay;
450
451   if (NULL == GSF_dsh)
452     return;
453   if (NULL != mig_qe)
454     return;
455   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
456     return;
457   if (mig_size >= MAX_MIGRATION_QUEUE)
458     return;
459   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
460   delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
461   delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
462   if (GNUNET_NO == value_found)
463   {
464     /* wait at least 5s if the datastore is empty */
465     delay = GNUNET_TIME_relative_max (delay,
466                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
467                                                                      5));
468   }
469   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470               "Scheduling gathering task (queue size: %u)\n",
471               mig_size);
472   mig_task =
473       GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
474 }
475
476
477 /**
478  * Process content offered for migration.
479  *
480  * @param cls closure
481  * @param key key for the content
482  * @param size number of bytes in data
483  * @param data content stored
484  * @param type type of the content
485  * @param priority priority of the content
486  * @param anonymity anonymity-level for the content
487  * @param expiration expiration time for the content
488  * @param uid unique identifier for the datum;
489  *        maybe 0 if no unique identifier is available
490  */
491 static void
492 process_migration_content (void *cls,
493                            const struct GNUNET_HashCode *key,
494                            size_t size,
495                            const void *data,
496                            enum GNUNET_BLOCK_Type type,
497                            uint32_t priority,
498                            uint32_t anonymity,
499                            struct GNUNET_TIME_Absolute expiration,
500                            uint64_t uid)
501 {
502   struct MigrationReadyBlock *mb;
503   struct MigrationReadyPeer *pos;
504
505   mig_qe = NULL;
506   if (NULL == key)
507   {
508     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509                 "No content found for migration...\n");
510     consider_gathering ();
511     return;
512   }
513   value_found = GNUNET_YES;
514   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
515       MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
516   {
517     /* content will expire soon, don't bother */
518     consider_gathering ();
519     return;
520   }
521   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
522   {
523     if (GNUNET_OK !=
524         GNUNET_FS_handle_on_demand_block (key,
525                                           size,
526                                           data,
527                                           type,
528                                           priority,
529                                           anonymity,
530                                           expiration,
531                                           uid,
532                                           &process_migration_content, NULL))
533       consider_gathering ();
534     return;
535   }
536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
538               GNUNET_h2s (key),
539               type, mig_size + 1,
540               MAX_MIGRATION_QUEUE);
541   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
542   mb->query = *key;
543   mb->expiration = expiration;
544   mb->size = size;
545   mb->type = type;
546   memcpy (&mb[1], data, size);
547   GNUNET_CONTAINER_DLL_insert_after (mig_head,
548                                      mig_tail,
549                                      mig_tail,
550                                      mb);
551   mig_size++;
552   for (pos = peer_head; NULL != pos; pos = pos->next)
553   {
554     if (NULL == pos->th)
555     {
556       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557                   "Preparing to push best content to peer %s\n",
558                   GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
559       if (GNUNET_YES == transmit_content (pos, mb))
560         break;                  /* 'mb' was freed! */
561     }
562   }
563   consider_gathering ();
564 }
565
566
567 /**
568  * Task that is run periodically to obtain blocks for content
569  * migration
570  *
571  * @param cls unused
572  * @param tc scheduler context (also unused)
573  */
574 static void
575 gather_migration_blocks (void *cls,
576                          const struct GNUNET_SCHEDULER_TaskContext *tc)
577 {
578   mig_task = GNUNET_SCHEDULER_NO_TASK;
579   if (mig_size >= MAX_MIGRATION_QUEUE)
580     return;
581   if (NULL == GSF_dsh)
582     return;
583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584               "Asking datastore for content for replication (queue size: %u)\n",
585               mig_size);
586   value_found = GNUNET_NO;
587   mig_qe =
588     GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
589                                           GNUNET_TIME_UNIT_FOREVER_REL,
590                                           &process_migration_content, NULL);
591   if (NULL == mig_qe)
592     consider_gathering ();
593 }
594
595
596 /**
597  * A peer connected to us.  Start pushing content
598  * to this peer.
599  *
600  * @param peer handle for the peer that connected
601  */
602 void
603 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
604 {
605   struct MigrationReadyPeer *mrp;
606
607   if (GNUNET_YES != enabled)
608     return;
609   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610               "Adding peer %s to list for pushing\n",
611               GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
612
613   mrp = GNUNET_new (struct MigrationReadyPeer);
614   mrp->peer = peer;
615   find_content (mrp);
616   GNUNET_CONTAINER_DLL_insert (peer_head,
617                                peer_tail,
618                                mrp);
619 }
620
621
622 /**
623  * A peer disconnected from us.  Stop pushing content
624  * to this peer.
625  *
626  * @param peer handle for the peer that disconnected
627  */
628 void
629 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
630 {
631   struct MigrationReadyPeer *pos;
632
633   for (pos = peer_head; NULL != pos; pos = pos->next)
634     if (pos->peer == peer)
635       break;
636   if (NULL == pos)
637     return;
638   GNUNET_CONTAINER_DLL_remove (peer_head,
639                                peer_tail,
640                                pos);
641   if (NULL != pos->th)
642   {
643     GSF_peer_transmit_cancel_ (pos->th);
644     pos->th = NULL;
645   }
646   if (NULL != pos->msg)
647   {
648     GNUNET_free (pos->msg);
649     pos->msg = NULL;
650   }
651   GNUNET_free (pos);
652 }
653
654
655 /**
656  * Setup the module.
657  */
658 void
659 GSF_push_init_ ()
660 {
661   enabled =
662       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
663   if (GNUNET_YES != enabled)
664     return;
665
666   if (GNUNET_OK !=
667       GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
668                                            &min_migration_delay))
669   {
670     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
671                                "fs", "MIN_MIGRATION_DELAY",
672                                _("time required, content pushing disabled"));
673     return;
674   }
675   consider_gathering ();
676 }
677
678
679 /**
680  * Shutdown the module.
681  */
682 void
683 GSF_push_done_ ()
684 {
685   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
686   {
687     GNUNET_SCHEDULER_cancel (mig_task);
688     mig_task = GNUNET_SCHEDULER_NO_TASK;
689   }
690   if (NULL != mig_qe)
691   {
692     GNUNET_DATASTORE_cancel (mig_qe);
693     mig_qe = NULL;
694   }
695   while (NULL != mig_head)
696     delete_migration_block (mig_head);
697   GNUNET_assert (0 == mig_size);
698 }
699
700 /* end of gnunet-service-fs_push.c */