fixing typo, possibly over-eager disconnect
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_push.c
23  * @brief API to push content from our datastore to other peers
24  *            ('anonymous'-content P2P migration)
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_push.h"
32
33
34 #define DEBUG_FS_MIGRATION GNUNET_NO
35
36 /**
37  * How long must content remain valid for us to consider it for migration?  
38  * If content will expire too soon, there is clearly no point in pushing
39  * it to other peers.  This value gives the threshold for migration.  Note
40  * that if this value is increased, the migration testcase may need to be
41  * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
42  */
43 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
44
45
46 /**
47  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
48  */
49 struct MigrationReadyBlock
50 {
51
52   /**
53    * This is a doubly-linked list.
54    */
55   struct MigrationReadyBlock *next;
56
57   /**
58    * This is a doubly-linked list.
59    */
60   struct MigrationReadyBlock *prev;
61
62   /**
63    * Query for the block.
64    */
65   GNUNET_HashCode query;
66
67   /**
68    * When does this block expire? 
69    */
70   struct GNUNET_TIME_Absolute expiration;
71
72   /**
73    * Peers we already forwarded this
74    * block to.  Zero for empty entries.
75    */
76   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
77
78   /**
79    * Size of the block.
80    */
81   size_t size;
82
83   /**
84    *  Number of targets already used.
85    */
86   unsigned int used_targets;
87
88   /**
89    * Type of the block.
90    */
91   enum GNUNET_BLOCK_Type type;
92 };
93
94
95 /**
96  * Information about a peer waiting for
97  * migratable data.
98  */
99 struct MigrationReadyPeer
100 {
101   /**
102    * This is a doubly-linked list.
103    */
104   struct MigrationReadyPeer *next;
105
106   /**
107    * This is a doubly-linked list.
108    */
109   struct MigrationReadyPeer *prev;
110
111   /**
112    * Handle to peer.
113    */
114   struct GSF_ConnectedPeer *peer;
115   
116   /**
117    * Handle for current transmission request,
118    * or NULL for none.
119    */
120   struct GSF_PeerTransmitHandle *th;
121
122   /**
123    * Message we are trying to push right now (or NULL)
124    */
125   struct PutMessage *msg;
126 };
127
128
129 /**
130  * Head of linked list of blocks that can be migrated.
131  */
132 static struct MigrationReadyBlock *mig_head;
133
134 /**
135  * Tail of linked list of blocks that can be migrated.
136  */
137 static struct MigrationReadyBlock *mig_tail;
138
139 /**
140  * Head of linked list of peers.
141  */
142 static struct MigrationReadyPeer *peer_head;
143
144 /**
145  * Tail of linked list of peers.
146  */
147 static struct MigrationReadyPeer *peer_tail;
148
149 /**
150  * Request to datastore for migration (or NULL).
151  */
152 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
153
154 /**
155  * ID of task that collects blocks for migration.
156  */
157 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
158
159 /**
160  * What is the maximum frequency at which we are allowed to
161  * poll the datastore for migration content?
162  */
163 static struct GNUNET_TIME_Relative min_migration_delay;
164
165 /**
166  * Size of the doubly-linked list of migration blocks.
167  */
168 static unsigned int mig_size;
169
170 /**
171  * Is this module enabled?
172  */
173 static int enabled;
174
175
176 /**
177  * Delete the given migration block.
178  *
179  * @param mb block to delete
180  */
181 static void
182 delete_migration_block (struct MigrationReadyBlock *mb)
183 {
184   GNUNET_CONTAINER_DLL_remove (mig_head,
185                                mig_tail,
186                                mb);
187   GNUNET_PEER_decrement_rcs (mb->target_list,
188                              MIGRATION_LIST_SIZE);
189   mig_size--;
190   GNUNET_free (mb);
191 }
192
193
194 /**
195  * Find content for migration to this peer.
196  */ 
197 static void
198 find_content (struct MigrationReadyPeer *mrp);
199
200
201 /**
202  * Transmit the message currently scheduled for
203  * transmission.
204  *
205  * @param cls the 'struct MigrationReadyPeer'
206  * @param buf_size number of bytes available in buf
207  * @param buf where to copy the message, NULL on error (peer disconnect)
208  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
209  */
210 static size_t
211 transmit_message (void *cls,
212                   size_t buf_size,
213                   void *buf)
214 {
215   struct MigrationReadyPeer *peer = cls;
216   struct PutMessage *msg;
217   uint16_t msize;
218
219   peer->th = NULL;
220   msg = peer->msg;
221   peer->msg = NULL;
222   if (buf == NULL)
223     {
224 #if DEBUG_FS_MIGRATION
225       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
226                   "Failed to migrate content to another peer (disconnect)\n");
227 #endif
228       GNUNET_free (msg);
229       return 0;
230     }
231   msize = ntohs (msg->header.size);
232   GNUNET_assert (msize <= buf_size);
233   memcpy (buf, msg, msize);
234   GNUNET_free (msg);
235 #if DEBUG_FS_MIGRATION
236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
237               "Pushing %u bytes to another peer\n",
238               msize);
239 #endif
240   find_content (peer);
241   return msize;
242 }
243
244
245 /**
246  * Send the given block to the given peer.
247  *
248  * @param peer target peer
249  * @param block the block
250  * @return GNUNET_YES if the block was deleted (!)
251  */
252 static int
253 transmit_content (struct MigrationReadyPeer *peer,
254                   struct MigrationReadyBlock *block)
255 {
256   size_t msize;
257   struct PutMessage *msg;
258   unsigned  int i;
259   struct GSF_PeerPerformanceData *ppd;
260   int ret;
261
262   ppd = GSF_get_peer_performance_data_ (peer->peer);
263   GNUNET_assert (NULL == peer->th);
264   msize = sizeof (struct PutMessage) + block->size;
265   msg = GNUNET_malloc (msize);
266   msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
267   msg->header.size = htons (msize);
268   msg->type = htonl (block->type);
269   msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
270   memcpy (&msg[1],
271           &block[1],
272           block->size);
273   peer->msg = msg;
274   for (i=0;i<MIGRATION_LIST_SIZE;i++)
275     {
276       if (block->target_list[i] == 0)
277         {
278           block->target_list[i] = ppd->pid;
279           GNUNET_PEER_change_rc (block->target_list[i], 1);
280           break;
281         }
282     }
283   if (MIGRATION_LIST_SIZE == i)
284     {
285       delete_migration_block (block);
286       ret = GNUNET_YES;
287     }
288   else
289     {
290       ret = GNUNET_NO;
291     }
292 #if DEBUG_FS_MIGRATION
293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
294               "Asking for transmission of %u bytes for migration\n",
295               msize);
296 #endif
297   peer->th = GSF_peer_transmit_ (peer->peer,
298                                  GNUNET_NO,
299                                  0 /* priority */,
300                                  GNUNET_TIME_UNIT_FOREVER_REL,
301                                  msize,
302                                  &transmit_message,
303                                  peer);
304   return ret;
305 }
306
307
308 /**
309  * Count the number of peers this block has
310  * already been forwarded to.
311  *
312  * @param block the block
313  * @return number of times block was forwarded
314  */
315 static unsigned int
316 count_targets (struct MigrationReadyBlock *block)
317 {
318   unsigned int i;
319
320   for (i=0;i<MIGRATION_LIST_SIZE;i++)
321     if (block->target_list[i] == 0)
322       return i;
323   return i;
324 }
325
326
327 /**
328  * Check if sending this block to this peer would
329  * be a good idea. 
330  *
331  * @param peer target peer
332  * @param block the block
333  * @return score (>= 0: feasible, negative: infeasible)
334  */
335 static long
336 score_content (struct MigrationReadyPeer *peer,
337                struct MigrationReadyBlock *block)
338 {
339   unsigned int i;
340   struct GSF_PeerPerformanceData *ppd;
341   struct GNUNET_PeerIdentity id;
342   uint32_t dist;
343
344   ppd = GSF_get_peer_performance_data_ (peer->peer);
345   for (i=0;i<MIGRATION_LIST_SIZE;i++)
346     if (block->target_list[i] == ppd->pid)
347       return -1;
348   GNUNET_assert (0 != ppd->pid);
349   GNUNET_PEER_resolve (ppd->pid,
350                        &id);
351   dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
352                                           &id.hashPubKey);
353   /* closer distance, higher score: */
354   return UINT32_MAX - dist;
355 }
356
357
358 /**
359  * If the migration task is not currently running, consider
360  * (re)scheduling it with the appropriate delay.
361  */
362 static void
363 consider_gathering (void);
364
365
366 /**
367  * Find content for migration to this peer.
368  *
369  * @param mrp peer to find content for
370  */ 
371 static void
372 find_content (struct MigrationReadyPeer *mrp)
373 {
374   struct MigrationReadyBlock *pos;
375   long score;
376   long best_score;
377   struct MigrationReadyBlock *best;
378
379   GNUNET_assert (NULL == mrp->th);
380   best = NULL;
381   best_score = -1;
382   pos = mig_head;
383   while (NULL != pos)
384     {
385       score = score_content (mrp, pos);
386       if (score > best_score)
387         {
388           best_score = score;
389           best = pos;
390         }
391       pos = pos->next;
392     }
393   if (NULL == best) 
394     {
395       if (mig_size < MAX_MIGRATION_QUEUE)
396         {
397 #if DEBUG_FS_MIGRATION
398           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399                       "No content found for pushing, waiting for queue to fill\n");
400 #endif
401           return; /* will fill up eventually... */
402         }
403 #if DEBUG_FS_MIGRATION
404       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
405                   "No suitable content found, purging content from full queue\n");
406 #endif
407       /* failed to find migration target AND
408          queue is full, purge most-forwarded
409          block from queue to make room for more */
410       pos = mig_head;
411       while (NULL != pos)
412         {
413           score = count_targets (pos);
414           if (score >= best_score)
415             {
416               best_score = score;
417               best = pos;
418             }
419           pos = pos->next;
420         }
421       GNUNET_assert (NULL != best);
422       delete_migration_block (best);
423       consider_gathering ();
424       return;
425     }
426 #if DEBUG_FS_MIGRATION
427   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
428               "Preparing to push best content to peer\n");
429 #endif
430   transmit_content (mrp, best);
431 }
432
433
434 /**
435  * Task that is run periodically to obtain blocks for content
436  * migration
437  * 
438  * @param cls unused
439  * @param tc scheduler context (also unused)
440  */
441 static void
442 gather_migration_blocks (void *cls,
443                          const struct GNUNET_SCHEDULER_TaskContext *tc);
444
445
446 /**
447  * If the migration task is not currently running, consider
448  * (re)scheduling it with the appropriate delay.
449  */
450 static void
451 consider_gathering ()
452 {
453   struct GNUNET_TIME_Relative delay;
454
455   if (GSF_dsh == NULL)
456     return;
457   if (mig_qe != NULL)
458     return;
459   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
460     return;
461   if (mig_size >= MAX_MIGRATION_QUEUE)  
462     return;
463   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
464                                          mig_size);
465   delay = GNUNET_TIME_relative_divide (delay,
466                                        MAX_MIGRATION_QUEUE);
467   delay = GNUNET_TIME_relative_max (delay,
468                                     min_migration_delay);
469 #if DEBUG_FS_MIGRATION
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "Scheduling gathering task (queue size: %u)\n",
472               mig_size);
473 #endif
474   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
475                                            &gather_migration_blocks,
476                                            NULL);
477 }
478
479
480 /**
481  * Process content offered for migration.
482  *
483  * @param cls closure
484  * @param key key for the content
485  * @param size number of bytes in data
486  * @param data content stored
487  * @param type type of the content
488  * @param priority priority of the content
489  * @param anonymity anonymity-level for the content
490  * @param expiration expiration time for the content
491  * @param uid unique identifier for the datum;
492  *        maybe 0 if no unique identifier is available
493  */
494 static void
495 process_migration_content (void *cls,
496                            const GNUNET_HashCode *key,
497                            size_t size,
498                            const void *data,
499                            enum GNUNET_BLOCK_Type type,
500                            uint32_t priority,
501                            uint32_t anonymity,
502                            struct GNUNET_TIME_Absolute
503                            expiration, uint64_t uid)
504 {
505   struct MigrationReadyBlock *mb;
506   struct MigrationReadyPeer *pos;
507   
508   mig_qe = NULL;
509   if (key == NULL)
510     {
511 #if DEBUG_FS_MIGRATION
512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513                   "No content found for migration...\n");
514 #endif
515       consider_gathering ();
516       return;
517     }
518   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
519       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
520     {
521       /* content will expire soon, don't bother */      
522       consider_gathering ();
523       return;
524     }
525   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
526     {
527       if (GNUNET_OK !=
528           GNUNET_FS_handle_on_demand_block (key, size, data,
529                                             type, priority, anonymity,
530                                             expiration, uid, 
531                                             &process_migration_content,
532                                             NULL))      
533         consider_gathering ();  
534       return;
535     }
536 #if DEBUG_FS_MIGRATION
537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538               "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
539               GNUNET_h2s (key),
540               type,
541               mig_size + 1,
542               MAX_MIGRATION_QUEUE);
543 #endif
544   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
545   mb->query = *key;
546   mb->expiration = expiration;
547   mb->size = size;
548   mb->type = type;
549   memcpy (&mb[1], data, size);
550   GNUNET_CONTAINER_DLL_insert_after (mig_head,
551                                      mig_tail,
552                                      mig_tail,
553                                      mb);
554   mig_size++;
555   pos = peer_head;
556   while (pos != NULL)
557     {
558       if (NULL == pos->th)
559         {
560 #if DEBUG_FS_MIGRATION
561           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562                       "Preparing to push best content to peer\n");
563 #endif
564           if (GNUNET_YES == transmit_content (pos, mb))
565             break; /* 'mb' was freed! */
566         }
567       pos = pos->next;
568     }
569   consider_gathering ();
570 }
571
572
573 /**
574  * Task that is run periodically to obtain blocks for content
575  * migration
576  * 
577  * @param cls unused
578  * @param tc scheduler context (also unused)
579  */
580 static void
581 gather_migration_blocks (void *cls,
582                          const struct GNUNET_SCHEDULER_TaskContext *tc)
583 {
584   mig_task = GNUNET_SCHEDULER_NO_TASK;
585   if (mig_size >= MAX_MIGRATION_QUEUE)  
586     return;
587   if (GSF_dsh != NULL)
588     {
589 #if DEBUG_FS_MIGRATION
590       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591                   "Asking datastore for content for replication (queue size: %u)\n",
592                   mig_size);
593 #endif
594       mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, 
595                                                      0, UINT_MAX,
596                                                      GNUNET_TIME_UNIT_FOREVER_REL,
597                                                      &process_migration_content, NULL);
598       if (NULL == mig_qe)
599         consider_gathering ();
600     }
601 }
602
603
604 /**
605  * A peer connected to us.  Start pushing content
606  * to this peer.
607  *
608  * @param peer handle for the peer that connected
609  */
610 void
611 GSF_push_start_ (struct GSF_ConnectedPeer *peer)
612 {
613   struct MigrationReadyPeer *mrp;
614
615   if (GNUNET_YES != enabled)
616     return;
617   mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
618   mrp->peer = peer;
619   find_content (mrp);
620   GNUNET_CONTAINER_DLL_insert  (peer_head,
621                                 peer_tail,
622                                 mrp);
623 }
624
625
626 /**
627  * A peer disconnected from us.  Stop pushing content
628  * to this peer.
629  *
630  * @param peer handle for the peer that disconnected
631  */
632 void
633 GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
634 {
635   struct MigrationReadyPeer *pos;
636
637   pos = peer_head;
638   while (pos != NULL)
639     {
640       if (pos->peer == peer)
641         {
642           GNUNET_CONTAINER_DLL_remove (peer_head,
643                                        peer_tail,
644                                        pos);
645           if (NULL != pos->th)
646             {
647               GSF_peer_transmit_cancel_ (pos->th);
648               pos->th = NULL;
649             }
650           if (NULL != pos->msg)
651             {
652               GNUNET_free (pos->msg);
653               pos->msg = NULL;
654             }
655           GNUNET_free (pos);
656           return;
657         }
658       pos = pos->next;
659     }
660 }
661
662
663 /**
664  * Setup the module.
665  */
666 void
667 GSF_push_init_ ()
668 {
669   enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
670                                                   "FS",
671                                                   "CONTENT_PUSHING");
672   if (GNUNET_YES != enabled)
673     return;
674  
675   if (GNUNET_OK != 
676       GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
677                                            "fs",
678                                            "MIN_MIGRATION_DELAY",
679                                            &min_migration_delay))
680     {
681       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
682                   _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
683                   "MIN_MIGRATION_DELAY",
684                   "fs");
685       return;
686     }
687   consider_gathering ();
688 }
689
690
691 /**
692  * Shutdown the module.
693  */
694 void
695 GSF_push_done_ ()
696 {
697   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
698     {
699       GNUNET_SCHEDULER_cancel (mig_task);
700       mig_task = GNUNET_SCHEDULER_NO_TASK;
701     }
702   if (NULL != mig_qe)
703     {
704       GNUNET_DATASTORE_cancel (mig_qe);
705       mig_qe = NULL;
706     }
707   while (NULL != mig_head)
708     delete_migration_block (mig_head);
709   GNUNET_assert (0 == mig_size);
710 }
711
712 /* end of gnunet-service-fs_push.c */