stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_pr.h"
28
29
30 /**
31  * An active request.
32  */
33 struct GSF_PendingRequest
34 {
35   /**
36    * Public data for the request.
37    */ 
38   struct GSF_PendingRequestData public_data;
39
40   /**
41    * Function to call if we encounter a reply.
42    */
43   GSF_PendingRequestReplyHandler rh;
44
45   /**
46    * Closure for 'rh'
47    */
48   void *rh_cls;
49
50   /**
51    * Array of hash codes of replies we've already seen.
52    */
53   GNUNET_HashCode *replies_seen;
54
55   /**
56    * Bloomfilter masking replies we've already seen.
57    */
58   struct GNUNET_CONTAINER_BloomFilter *bf;
59
60   /**
61    * Number of valid entries in the 'replies_seen' array.
62    */
63   unsigned int replies_seen_count;
64
65   /**
66    * Length of the 'replies_seen' array.
67    */
68   unsigned int replies_seen_size;
69
70   /**
71    * Mingle value we currently use for the bf.
72    */
73   int32_t mingle;
74                             
75 };
76
77
78 /**
79  * All pending requests, ordered by the query.  Entries
80  * are of type 'struct GSF_PendingRequest*'.
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
83
84
85 /**
86  * Datastore 'PUT' load tracking.
87  */
88 static struct GNUNET_LOAD_Value *datastore_put_load;
89
90
91 /**
92  * Are we allowed to migrate content to this peer.
93  */
94 static int active_to_migration;
95
96
97 /**
98  * Heap with the request that will expire next at the top.  Contains
99  * pointers of type "struct PendingRequest*"; these will *also* be
100  * aliased from the "requests_by_peer" data structures and the
101  * "requests_by_query" table.  Note that requests from our clients
102  * don't expire and are thus NOT in the "requests_by_expiration"
103  * (or the "requests_by_peer" tables).
104  */
105 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
106
107
108 /**
109  * How many bytes should a bloomfilter be if we have already seen
110  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
111  * of bits set per entry.  Furthermore, we should not re-size the
112  * filter too often (to keep it cheap).
113  *
114  * Since other peers will also add entries but not resize the filter,
115  * we should generally pick a slightly larger size than what the
116  * strict math would suggest.
117  *
118  * @return must be a power of two and smaller or equal to 2^15.
119  */
120 static size_t
121 compute_bloomfilter_size (unsigned int entry_count)
122 {
123   size_t size;
124   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
125   uint16_t max = 1 << 15;
126
127   if (entry_count > max)
128     return max;
129   size = 8;
130   while ((size < max) && (size < ideal))
131     size *= 2;
132   if (size > max)
133     return max;
134   return size;
135 }
136
137
138 /**
139  * Recalculate our bloom filter for filtering replies.  This function
140  * will create a new bloom filter from scratch, so it should only be
141  * called if we have no bloomfilter at all (and hence can create a
142  * fresh one of minimal size without problems) OR if our peer is the
143  * initiator (in which case we may resize to larger than mimimum size).
144  *
145  * @param pr request for which the BF is to be recomputed
146  * @return GNUNET_YES if a refresh actually happened
147  */
148 static int
149 refresh_bloomfilter (struct GSF_PendingRequest *pr)
150 {
151   unsigned int i;
152   size_t nsize;
153   GNUNET_HashCode mhash;
154
155   nsize = compute_bloomfilter_size (pr->replies_seen_off);
156   if ( (bf != NULL) &&
157        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
158     return GNUNET_NO; /* size not changed */
159   if (pr->bf != NULL)
160     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161   pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
162                                                    UINT32_MAX);
163   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
164                                               nsize,
165                                               BLOOMFILTER_K);
166   for (i=0;i<pr->replies_seen_count;i++)
167     {
168       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
169                                 pr->mingle,
170                                 &mhash);
171       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
172     }
173   return GNUNET_YES;
174 }
175
176
177 /**
178  * Create a new pending request.  
179  *
180  * @param options request options
181  * @param type type of the block that is being requested
182  * @param query key for the lookup
183  * @param namespace namespace to lookup, NULL for no namespace
184  * @param target preferred target for the request, NULL for none
185  * @param bf bloom filter for known replies, can be NULL
186  * @param mingle mingle value for bf
187  * @param anonymity_level desired anonymity level
188  * @param priority maximum outgoing cummulative request priority to use
189  * @param replies_seen hash codes of known local replies
190  * @param replies_seen_count size of the 'replies_seen' array
191  * @param rh handle to call when we get a reply
192  * @param rh_cls closure for rh
193  * @return handle for the new pending request
194  */
195 struct GSF_PendingRequest *
196 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
197                              enum GNUNET_BLOCK_Type type,
198                              const GNUNET_HashCode *query,
199                              const GNUNET_HashCode *namespace,
200                              const struct GNUNET_PeerIdentity *target,
201                              const struct GNUNET_CONTAINER_BloomFilter *bf,
202                              int32_t mingle,
203                              uint32_t anonymity_level,
204                              uint32_t priority,
205                              const GNUNET_HashCode *replies_seen,
206                              unsigned int replies_seen_count,
207                              GSF_PendingRequestReplyHandler rh,
208                              void *rh_cls)
209 {
210   struct GSF_PendingRequest *pr;
211
212   
213   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
214   pr->public_data.query = *query;
215   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
216     {
217       GNUNET_assert (NULL != namespace);
218       pr->public_data.namespace = *namespace;
219     }
220   if (NULL != target)
221     {
222       pr->public_data.target = *target;
223       pr->has_target = GNUNET_YES;
224     }
225   pr->public_data.anonymity_level = anonymity_data;
226   pr->public_data.priority = priority;
227   pr->public_data.options = options;
228   pr->public_data.type = type;  
229   pr->rh = rh;
230   pr->rh_cls = rh_cls;
231   if (replies_seen_count > 0)
232     {
233       pr->replies_seen_size = replies_seen_count;
234       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
235       memcpy (pr->replies_seen,
236               replies_seen,
237               replies_seen_count * sizeof (struct GNUNET_HashCode));
238       pr->replies_seen_count = replies_seen_count;
239     }
240   if (NULL != bf)    
241     {
242       pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
243       pr->mingle = mingle;
244     }
245   else if ( (replies_seen_count > 0) &&
246             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
247     {
248       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
249     }
250   GNUNET_CONTAINER_multihashmap_put (pr_map,
251                                      query,
252                                      pr,
253                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
254   // FIXME: if not a local query, we also need to track the
255   // total number of external queries we currently have and
256   // bound it => need an additional heap!
257   return pr;
258 }
259
260
261 /**
262  * Update a given pending request with additional replies
263  * that have been seen.
264  *
265  * @param pr request to update
266  * @param replies_seen hash codes of replies that we've seen
267  * @param replies_seen_count size of the replies_seen array
268  */
269 void
270 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
271                              const GNUNET_HashCode *replies_seen,
272                              unsigned int replies_seen_count)
273 {
274   unsigned int i;
275   GNUNET_HashCode mhash;
276
277   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
278     return; /* integer overflow */
279   if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
280     {
281       /* we're responsible for the BF, full refresh */
282       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
283         GNUNET_array_grow (pr->replies_seen,
284                            pr->replies_seen_size,
285                            replies_seen_count + pr->replies_seen_count);
286       memcpy (&pr->replies_seen[pr->replies_seen_count],
287               replies_seen,
288               sizeof (GNUNET_HashCode) * replies_seen_count);
289       pr->replies_seen_count += replies_seen;
290       if (GNUNET_NO == refresh_bloomfilter (pr))
291         {
292           /* bf not recalculated, simply extend it with new bits */
293           for (i=0;i<pr->replies_seen_count;i++)
294             {
295               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
296                                         pr->mingle,
297                                         &mhash);
298               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
299             }
300         }
301     }
302   else
303     {
304       if (NULL == pr->bf)
305         {
306           /* we're not the initiator, but the initiator did not give us
307              any bloom-filter, so we need to create one on-the-fly */
308           pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
309                                                            UINT32_MAX);
310           pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
311                                                       pr->mingle,
312                                                       BLOOMFILTER_K);
313         }
314       for (i=0;i<pr->replies_seen_count;i++)
315         {
316           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
317                                     pr->mingle,
318                                     &mhash);
319           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
320         }
321     }
322 }
323
324
325 /**
326  * Generate the message corresponding to the given pending request for
327  * transmission to other peers (or at least determine its size).
328  *
329  * @param pr request to generate the message for
330  * @param do_route are we routing the reply
331  * @param buf_size number of bytes available in buf
332  * @param buf where to copy the message (can be NULL)
333  * @return number of bytes needed (if > buf_size) or used
334  */
335 size_t
336 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
337                                   int do_route,
338                                   size_t buf_size,
339                                   void *buf)
340 {
341   struct PendingMessage *pm;
342   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
343   struct GetMessage *gm;
344   GNUNET_HashCode *ext;
345   size_t msize;
346   unsigned int k;
347   int no_route;
348   uint32_t bm;
349   uint32_t prio;
350   size_t bf_size;
351
352   k = 0;
353   bm = 0;
354   if (GNUNET_YES != do_route)
355     {
356       bm |= GET_MESSAGE_BIT_RETURN_TO;
357       k++;      
358     }
359   if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
360     {
361       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
362       k++;
363     }
364   if (GNUNET_YES == pr->has_target)
365     {
366       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
367       k++;
368     }
369   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
370   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
371   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
372   if (buf_size < msize)
373     return msize;  
374   gm = (struct GetMessage*) lbuf;
375   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
376   gm->header.size = htons (msize);
377   gm->type = htonl (pr->type);
378   if (GNUNET_YES == do_route)
379     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
380                                      pr->public_data.priority + 1);
381   else
382     prio = 0;
383   pr->public_data.priority -= prio;
384   gm->priority = htonl (prio);
385   gm->ttl = htonl (pr->ttl);
386   gm->filter_mutator = htonl(pr->mingle); 
387   gm->hash_bitmap = htonl (bm);
388   gm->query = pr->query;
389   ext = (GNUNET_HashCode*) &gm[1];
390   k = 0;
391   if (GNUNET_YES != do_route)
392     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
393   if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
394     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
395   if (GNUNET_YES == pr->has_target)
396     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
397   if (pr->bf != NULL)
398     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
399                                                (char*) &ext[k],
400                                                bf_size);
401   memcpy (buf, gm, msize);
402   return msize;
403 }
404
405
406 /**
407  * Iterator to free pending requests.
408  *
409  * @param cls closure, unused
410  * @param key current key code
411  * @param value value in the hash map (pending request)
412  * @return GNUNET_YES (we should continue to iterate)
413  */
414 static int 
415 clean_request (void *cls,
416                const GNUNET_HashCode * key,
417                void *value)
418 {
419   struct GSF_PendingRequest *pr = value;
420   
421   GNUNET_free_non_null (pr->replies_seen);
422   if (NULL != pr->bf)
423     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
424   GNUNET_free (pr);
425   return GNUNET_YES;
426 }
427
428
429 /**
430  * Explicitly cancel a pending request.
431  *
432  * @param pr request to cancel
433  */
434 void
435 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
436 {
437   GNUNET_assert (GNUNET_OK ==
438                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
439                                                        &pr->public_data.query,
440                                                        pr));
441   GNUNET_assert (GNUNET_YES ==
442                  clean_request (NULL, &pr->public_data.query, pr));  
443 }
444
445
446 /**
447  * Iterate over all pending requests.
448  *
449  * @param it function to call for each request
450  * @param cls closure for it
451  */
452 void
453 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
454                              void *cls)
455 {
456   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
457                                          (GNUNET_CONTAINER_HashMapIterator) it,
458                                          cls);
459 }
460
461
462
463
464 /**
465  * Closure for "process_reply" function.
466  */
467 struct ProcessReplyClosure
468 {
469   /**
470    * The data for the reply.
471    */
472   const void *data;
473
474   /**
475    * Who gave us this reply? NULL for local host (or DHT)
476    */
477   struct ConnectedPeer *sender;
478
479   /**
480    * When the reply expires.
481    */
482   struct GNUNET_TIME_Absolute expiration;
483
484   /**
485    * Size of data.
486    */
487   size_t size;
488
489   /**
490    * Type of the block.
491    */
492   enum GNUNET_BLOCK_Type type;
493
494   /**
495    * How much was this reply worth to us?
496    */
497   uint32_t priority;
498
499   /**
500    * Anonymity requirements for this reply.
501    */
502   uint32_t anonymity_level;
503
504   /**
505    * Evaluation result (returned).
506    */
507   enum GNUNET_BLOCK_EvaluationResult eval;
508
509   /**
510    * Did we finish processing the associated request?
511    */ 
512   int finished;
513
514   /**
515    * Did we find a matching request?
516    */
517   int request_found;
518 };
519
520
521 /**
522  * Update the performance data for the sender (if any) since
523  * the sender successfully answered one of our queries.
524  *
525  * @param prq information about the sender
526  * @param pr request that was satisfied
527  */
528 static void
529 update_request_performance_data (struct ProcessReplyClosure *prq,
530                                  struct GSF_PendingRequest *pr)
531 {
532   unsigned int i;
533   struct GNUNET_TIME_Relative cur_delay;
534
535   if (prq->sender == NULL)
536     return;      
537   /* FIXME: adapt code to new API... */
538   for (i=0;i<pr->used_targets_off;i++)
539     if (pr->used_targets[i].pid == prq->sender->pid)
540       break;
541   if (i < pr->used_targets_off)
542     {
543       cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
544       prq->sender->avg_delay.rel_value
545         = (prq->sender->avg_delay.rel_value * 
546            (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
547       prq->sender->avg_priority
548         = (prq->sender->avg_priority * 
549            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
550     }
551   if (pr->cp != NULL)
552     {
553       GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
554                              [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
555                              -1);
556       GNUNET_PEER_change_rc (pr->cp->pid, 1);
557       prq->sender->last_p2p_replies
558         [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
559         = pr->cp->pid;
560     }
561   else
562     {
563       if (NULL != prq->sender->last_client_replies
564           [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
565         GNUNET_SERVER_client_drop (prq->sender->last_client_replies
566                                    [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
567       prq->sender->last_client_replies
568         [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
569         = pr->client_request_list->client_list->client;
570       GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
571     }
572 }
573                                 
574
575
576 /**
577  * We have received a reply; handle it!
578  *
579  * @param cls response (struct ProcessReplyClosure)
580  * @param key our query
581  * @param value value in the hash map (info about the query)
582  * @return GNUNET_YES (we should continue to iterate)
583  */
584 static int
585 process_reply (void *cls,
586                const GNUNET_HashCode * key,
587                void *value)
588 {
589   struct ProcessReplyClosure *prq = cls;
590   struct GSF_PendingRequest *pr = value;
591   struct PendingMessage *reply;
592   struct ClientResponseMessage *creply;
593   struct ClientList *cl;
594   struct PutMessage *pm;
595   struct ConnectedPeer *cp;
596   size_t msize;
597
598 #if DEBUG_FS
599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600               "Matched result (type %u) for query `%s' with pending request\n",
601               (unsigned int) prq->type,
602               GNUNET_h2s (key));
603 #endif  
604   GNUNET_STATISTICS_update (stats,
605                             gettext_noop ("# replies received and matched"),
606                             1,
607                             GNUNET_NO);
608   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
609                                      prq->type,
610                                      key,
611                                      &pr->bf,
612                                      pr->mingle,
613                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
614                                      prq->data,
615                                      prq->size);
616   switch (prq->eval)
617     {
618     case GNUNET_BLOCK_EVALUATION_OK_MORE:
619       update_request_performance_data (prq, pr);
620       break;
621     case GNUNET_BLOCK_EVALUATION_OK_LAST:
622       update_request_performance_data (prq, pr);
623       /* FIXME: adapt code to new API! */
624       while (NULL != pr->pending_head)
625         destroy_pending_message_list_entry (pr->pending_head);
626       if (pr->qe != NULL)
627         {
628           if (pr->client_request_list != NULL)
629             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
630                                         GNUNET_YES);
631           GNUNET_DATASTORE_cancel (pr->qe);
632           pr->qe = NULL;
633         }
634       pr->do_remove = GNUNET_YES;
635       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
636         {
637           GNUNET_SCHEDULER_cancel (pr->task);
638           pr->task = GNUNET_SCHEDULER_NO_TASK;
639         }
640       GNUNET_break (GNUNET_YES ==
641                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
642                                                           key,
643                                                           pr));
644       GNUNET_LOAD_update (rt_entry_lifetime,
645                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
646       break;
647     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
648       GNUNET_STATISTICS_update (stats,
649                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
650                                 1,
651                                 GNUNET_NO);
652 #if DEBUG_FS && 0
653       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654                   "Duplicate response `%s', discarding.\n",
655                   GNUNET_h2s (&mhash));
656 #endif
657       return GNUNET_YES; /* duplicate */
658     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
659       return GNUNET_YES; /* wrong namespace */  
660     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
661       GNUNET_break (0);
662       return GNUNET_YES;
663     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
664       GNUNET_break (0);
665       return GNUNET_YES;
666     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
667       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
668                   _("Unsupported block type %u\n"),
669                   prq->type);
670       return GNUNET_NO;
671     }
672   /* FIXME: adapt code to new API! */
673   if (pr->client_request_list != NULL)
674     {
675       if (pr->replies_seen_size == pr->replies_seen_off)
676         GNUNET_array_grow (pr->replies_seen,
677                            pr->replies_seen_size,
678                            pr->replies_seen_size * 2 + 4);      
679       GNUNET_CRYPTO_hash (prq->data,
680                           prq->size,
681                           &pr->replies_seen[pr->replies_seen_off++]);         
682       refresh_bloomfilter (pr);
683     }
684   if (NULL == prq->sender)
685     {
686 #if DEBUG_FS
687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688                   "Found result for query `%s' in local datastore\n",
689                   GNUNET_h2s (key));
690 #endif
691       GNUNET_STATISTICS_update (stats,
692                                 gettext_noop ("# results found locally"),
693                                 1,
694                                 GNUNET_NO);      
695     }
696   prq->priority += pr->remaining_priority;
697   pr->remaining_priority = 0;
698   pr->results_found++;
699   prq->request_found = GNUNET_YES;
700   /* finally, pass on to other peers / local clients */
701   pr->rh (pr->rh_cls, pr, prq->data, prq->size);
702   return GNUNET_YES;
703 }
704
705
706 /**
707  * Continuation called to notify client about result of the
708  * operation.
709  *
710  * @param cls closure
711  * @param success GNUNET_SYSERR on failure
712  * @param msg NULL on success, otherwise an error message
713  */
714 static void 
715 put_migration_continuation (void *cls,
716                             int success,
717                             const char *msg)
718 {
719   struct GNUNET_TIME_Absolute *start = cls;
720   struct GNUNET_TIME_Relative delay;
721   
722   delay = GNUNET_TIME_absolute_get_duration (*start);
723   GNUNET_free (start);
724   /* FIXME: should we really update the load value on failure? */
725   GNUNET_LOAD_update (datastore_put_load,
726                       delay.rel_value);
727   if (GNUNET_OK == success)
728     return;
729   GNUNET_STATISTICS_update (stats,
730                             gettext_noop ("# datastore 'put' failures"),
731                             1,
732                             GNUNET_NO);
733 }
734
735
736 /**
737  * Test if the DATABASE (PUT) load on this peer is too high
738  * to even consider processing the query at
739  * all.  
740  * 
741  * @return GNUNET_YES if the load is too high to do anything (load high)
742  *         GNUNET_NO to process normally (load normal or low)
743  */
744 static int
745 test_put_load_too_high (uint32_t priority)
746 {
747   double ld;
748
749   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
750     return GNUNET_NO; /* very fast */
751   ld = GNUNET_LOAD_get_load (datastore_put_load);
752   if (ld < 2.0 * (1 + priority))
753     return GNUNET_NO;
754   GNUNET_STATISTICS_update (stats,
755                             gettext_noop ("# storage requests dropped due to high load"),
756                             1,
757                             GNUNET_NO);
758   return GNUNET_YES;
759 }
760
761
762 /**
763  * Iterator called on each result obtained for a DHT
764  * operation that expects a reply
765  *
766  * @param cls closure
767  * @param exp when will this value expire
768  * @param key key of the result
769  * @param get_path NULL-terminated array of pointers
770  *                 to the peers on reverse GET path (or NULL if not recorded)
771  * @param put_path NULL-terminated array of pointers
772  *                 to the peers on the PUT path (or NULL if not recorded)
773  * @param type type of the result
774  * @param size number of bytes in data
775  * @param data pointer to the result data
776  */
777 void
778 GSF_handle_dht_reply_ (void *cls,
779                        struct GNUNET_TIME_Absolute exp,
780                        const GNUNET_HashCode * key,
781                        const struct GNUNET_PeerIdentity * const *get_path,
782                        const struct GNUNET_PeerIdentity * const *put_path,
783                        enum GNUNET_BLOCK_Type type,
784                        size_t size,
785                        const void *data)
786 {
787   struct GSF_PendingRequest *pr = cls;
788   struct ProcessReplyClosure prq;
789
790   memset (&prq, 0, sizeof (prq));
791   prq.data = data;
792   prq.expiration = exp;
793   prq.size = size;  
794   prq.type = type;
795   process_reply (&prq, key, pr);
796   if ( (GNUNET_YES == active_to_migration) &&
797        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
798     {      
799 #if DEBUG_FS
800       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801                   "Replicating result for query `%s' with priority %u\n",
802                   GNUNET_h2s (&query),
803                   prq.priority);
804 #endif
805       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
806       *start = GNUNET_TIME_absolute_get ();
807       GNUNET_DATASTORE_put (dsh,
808                             0, &query, dsize, &put[1],
809                             type, prq.priority, 1 /* anonymity */, 
810                             expiration, 
811                             1 + prq.priority, MAX_DATASTORE_QUEUE,
812                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
813                             &put_migration_continuation, 
814                             start);
815     }
816 }
817
818
819 /**
820  * Handle P2P "CONTENT" message.  Checks that the message is
821  * well-formed and then checks if there are any pending requests for
822  * this content and possibly passes it on (to local clients or other
823  * peers).  Does NOT perform migration (content caching at this peer).
824  *
825  * @param cp the other peer involved (sender or receiver, NULL
826  *        for loopback messages where we are both sender and receiver)
827  * @param message the actual message
828  * @return GNUNET_OK if the message was well-formed,
829  *         GNUNET_SYSERR if the message was malformed (close connection,
830  *         do not cache under any circumstances)
831  */
832 int
833 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
834                          const struct GNUNET_MessageHeader *message)
835 {
836   const struct PutMessage *put;
837   uint16_t msize;
838   size_t dsize;
839   enum GNUNET_BLOCK_Type type;
840   struct GNUNET_TIME_Absolute expiration;
841   GNUNET_HashCode query;
842   struct ProcessReplyClosure prq;
843   struct GNUNET_TIME_Relative block_time;  
844   double putl;
845   struct GNUNET_TIME_Absolute *start;
846
847   msize = ntohs (message->size);
848   if (msize < sizeof (struct PutMessage))
849     {
850       GNUNET_break_op(0);
851       return GNUNET_SYSERR;
852     }
853   put = (const struct PutMessage*) message;
854   dsize = msize - sizeof (struct PutMessage);
855   type = ntohl (put->type);
856   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
857   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
858     return GNUNET_SYSERR;
859   if (GNUNET_OK !=
860       GNUNET_BLOCK_get_key (block_ctx,
861                             type,
862                             &put[1],
863                             dsize,
864                             &query))
865     {
866       GNUNET_break_op (0);
867       return GNUNET_SYSERR;
868     }
869   /* now, lookup 'query' */
870   prq.data = (const void*) &put[1];
871   if (NULL != cp)
872     prq.sender = cp;
873   else
874     prq.sender = NULL;
875   prq.size = dsize;
876   prq.type = type;
877   prq.expiration = expiration;
878   prq.priority = 0;
879   prq.anonymity_level = 1;
880   prq.finished = GNUNET_NO;
881   prq.request_found = GNUNET_NO;
882   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
883                                               &query,
884                                               &process_reply,
885                                               &prq);
886   if (NULL != cp)
887     {
888       GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
889       GSF_get_peer_performance_data (cp)->trust += prq.priority;
890     }
891   if ( (GNUNET_YES == active_to_migration) &&
892        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
893     {      
894 #if DEBUG_FS
895       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896                   "Replicating result for query `%s' with priority %u\n",
897                   GNUNET_h2s (&query),
898                   prq.priority);
899 #endif
900       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
901       *start = GNUNET_TIME_absolute_get ();
902       GNUNET_DATASTORE_put (dsh,
903                             0, &query, dsize, &put[1],
904                             type, prq.priority, 1 /* anonymity */, 
905                             expiration, 
906                             1 + prq.priority, MAX_DATASTORE_QUEUE,
907                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
908                             &put_migration_continuation, 
909                             start);
910     }
911   putl = GNUNET_LOAD_get_load (datastore_put_load);
912   if ( (NULL != (cp = prq.sender)) &&
913        (GNUNET_NO == prq.request_found) &&
914        ( (GNUNET_YES != active_to_migration) ||
915          (putl > 2.5 * (1 + prq.priority)) ) ) 
916     {
917       if (GNUNET_YES != active_to_migration) 
918         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
919       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
920                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
921                                                                                    (unsigned int) (60000 * putl * putl)));
922       GSF_block_peer_migration (cp, block_time);
923     }
924   return GNUNET_OK;
925 }
926
927
928 /**
929  * Setup the subsystem.
930  */
931 void
932 GSF_pending_request_init_ ()
933 {
934   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
935 }
936
937
938 /**
939  * Shutdown the subsystem.
940  */
941 void
942 GSF_pending_request_done_ ()
943 {
944   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
945                                          &clean_request,
946                                          NULL);
947   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
948   pr_map = NULL;
949 }
950
951
952 /* end of gnunet-service-fs_pr.c */