oops
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_pr.h"
28
29
30 /**
31  * An active request.
32  */
33 struct GSF_PendingRequest
34 {
35   /**
36    * Public data for the request.
37    */ 
38   struct GSF_PendingRequestData public_data;
39
40   /**
41    * Function to call if we encounter a reply.
42    */
43   GSF_PendingRequestReplyHandler rh;
44
45   /**
46    * Closure for 'rh'
47    */
48   void *rh_cls;
49
50   /**
51    * Array of hash codes of replies we've already seen.
52    */
53   GNUNET_HashCode *replies_seen;
54
55   /**
56    * Bloomfilter masking replies we've already seen.
57    */
58   struct GNUNET_CONTAINER_BloomFilter *bf;
59
60   /**
61    * Entry for this pending request in the expiration heap, or NULL.
62    */
63   struct GNUNET_CONTAINER_HeapNode *hnode;
64
65   /**
66    * Number of valid entries in the 'replies_seen' array.
67    */
68   unsigned int replies_seen_count;
69
70   /**
71    * Length of the 'replies_seen' array.
72    */
73   unsigned int replies_seen_size;
74
75   /**
76    * Mingle value we currently use for the bf.
77    */
78   uint32_t mingle;
79                             
80 };
81
82
83 /**
84  * All pending requests, ordered by the query.  Entries
85  * are of type 'struct GSF_PendingRequest*'.
86  */
87 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
88
89
90 /**
91  * Datastore 'PUT' load tracking.
92  */
93 static struct GNUNET_LOAD_Value *datastore_put_load;
94
95
96 /**
97  * Are we allowed to migrate content to this peer.
98  */
99 static int active_to_migration;
100
101
102 /**
103  * Heap with the request that will expire next at the top.  Contains
104  * pointers of type "struct PendingRequest*"; these will *also* be
105  * aliased from the "requests_by_peer" data structures and the
106  * "requests_by_query" table.  Note that requests from our clients
107  * don't expire and are thus NOT in the "requests_by_expiration"
108  * (or the "requests_by_peer" tables).
109  */
110 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
111
112
113 /**
114  * How many bytes should a bloomfilter be if we have already seen
115  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
116  * of bits set per entry.  Furthermore, we should not re-size the
117  * filter too often (to keep it cheap).
118  *
119  * Since other peers will also add entries but not resize the filter,
120  * we should generally pick a slightly larger size than what the
121  * strict math would suggest.
122  *
123  * @return must be a power of two and smaller or equal to 2^15.
124  */
125 static size_t
126 compute_bloomfilter_size (unsigned int entry_count)
127 {
128   size_t size;
129   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
130   uint16_t max = 1 << 15;
131
132   if (entry_count > max)
133     return max;
134   size = 8;
135   while ((size < max) && (size < ideal))
136     size *= 2;
137   if (size > max)
138     return max;
139   return size;
140 }
141
142
143 /**
144  * Recalculate our bloom filter for filtering replies.  This function
145  * will create a new bloom filter from scratch, so it should only be
146  * called if we have no bloomfilter at all (and hence can create a
147  * fresh one of minimal size without problems) OR if our peer is the
148  * initiator (in which case we may resize to larger than mimimum size).
149  *
150  * @param pr request for which the BF is to be recomputed
151  * @return GNUNET_YES if a refresh actually happened
152  */
153 static int
154 refresh_bloomfilter (struct GSF_PendingRequest *pr)
155 {
156   unsigned int i;
157   size_t nsize;
158   GNUNET_HashCode mhash;
159
160   nsize = compute_bloomfilter_size (pr->replies_seen_off);
161   if ( (bf != NULL) &&
162        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
163     return GNUNET_NO; /* size not changed */
164   if (pr->bf != NULL)
165     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
166   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
167                                          UINT32_MAX);
168   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
169                                               nsize,
170                                               BLOOMFILTER_K);
171   for (i=0;i<pr->replies_seen_count;i++)
172     {
173       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
174                                 pr->mingle,
175                                 &mhash);
176       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
177     }
178   return GNUNET_YES;
179 }
180
181
182 /**
183  * Create a new pending request.  
184  *
185  * @param options request options
186  * @param type type of the block that is being requested
187  * @param query key for the lookup
188  * @param namespace namespace to lookup, NULL for no namespace
189  * @param target preferred target for the request, NULL for none
190  * @param bf_data raw data for bloom filter for known replies, can be NULL
191  * @param bf_size number of bytes in bf_data
192  * @param mingle mingle value for bf
193  * @param anonymity_level desired anonymity level
194  * @param priority maximum outgoing cummulative request priority to use
195  * @param ttl current time-to-live for the request
196  * @param replies_seen hash codes of known local replies
197  * @param replies_seen_count size of the 'replies_seen' array
198  * @param rh handle to call when we get a reply
199  * @param rh_cls closure for rh
200  * @return handle for the new pending request
201  */
202 struct GSF_PendingRequest *
203 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
204                              enum GNUNET_BLOCK_Type type,
205                              const GNUNET_HashCode *query,
206                              const GNUNET_HashCode *namespace,
207                              const struct GNUNET_PeerIdentity *target,
208                              const char *bf_data,
209                              size_t bf_size,
210                              uint32_t mingle,
211                              uint32_t anonymity_level,
212                              uint32_t priority,
213                              int32_t ttl,
214                              const GNUNET_HashCode *replies_seen,
215                              unsigned int replies_seen_count,
216                              GSF_PendingRequestReplyHandler rh,
217                              void *rh_cls)
218 {
219   struct GSF_PendingRequest *pr;
220   struct GSF_PendingRequest *dpr;
221   
222   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
223   pr->public_data.query = *query;
224   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
225     {
226       GNUNET_assert (NULL != namespace);
227       pr->public_data.namespace = *namespace;
228     }
229   if (NULL != target)
230     {
231       pr->public_data.target = *target;
232       pr->has_target = GNUNET_YES;
233     }
234   pr->public_data.anonymity_level = anonymity_data;
235   pr->public_data.priority = priority;
236   pr->public_data.original_priority = priority;
237   pr->public_data.options = options;
238   pr->public_data.type = type;  
239   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
240   pr->rh = rh;
241   pr->rh_cls = rh_cls;
242   if (ttl >= 0)
243     pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
244                                                                                (uint32_t) ttl));
245   else
246     pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
247                                              GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
248                                                                             (uint32_t) (- ttl)));
249   if (replies_seen_count > 0)
250     {
251       pr->replies_seen_size = replies_seen_count;
252       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
253       memcpy (pr->replies_seen,
254               replies_seen,
255               replies_seen_count * sizeof (struct GNUNET_HashCode));
256       pr->replies_seen_count = replies_seen_count;
257     }
258   if (NULL != bf_data)    
259     {
260       pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
261                                                   bf_size,
262                                                   BLOOMFILTER_K);
263       pr->mingle = mingle;
264     }
265   else if ( (replies_seen_count > 0) &&
266             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
267     {
268       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
269     }
270   GNUNET_CONTAINER_multihashmap_put (pr_map,
271                                      query,
272                                      pr,
273                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
274   if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
275     {
276       pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
277                                                 pr,
278                                                 pr->ttl.abs_value);
279       /* make sure we don't track too many requests */
280       while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
281         {
282           dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
283           GNUNET_assert (dpr != NULL);
284           if (pr == dpr)
285             break; /* let the request live briefly... */
286           dpr->rh (dpr->rh_cls,
287                    dpr,
288                    GNUNET_TIME_UNIT_FOREVER_ABS,
289                    NULL, 0,
290                    GNUNET_SYSERR);
291           GSF_pending_request_cancel_ (dpr);
292         }
293     }
294   return pr;
295 }
296
297
298 /**
299  * Obtain the public data associated with a pending request
300  * 
301  * @param pr pending request
302  * @return associated public data
303  */
304 struct GSF_PendingRequestData *
305 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
306 {
307   return &pr->public_data;
308 }
309
310
311 /**
312  * Update a given pending request with additional replies
313  * that have been seen.
314  *
315  * @param pr request to update
316  * @param replies_seen hash codes of replies that we've seen
317  * @param replies_seen_count size of the replies_seen array
318  */
319 void
320 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
321                              const GNUNET_HashCode *replies_seen,
322                              unsigned int replies_seen_count)
323 {
324   unsigned int i;
325   GNUNET_HashCode mhash;
326
327   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
328     return; /* integer overflow */
329   if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
330     {
331       /* we're responsible for the BF, full refresh */
332       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
333         GNUNET_array_grow (pr->replies_seen,
334                            pr->replies_seen_size,
335                            replies_seen_count + pr->replies_seen_count);
336       memcpy (&pr->replies_seen[pr->replies_seen_count],
337               replies_seen,
338               sizeof (GNUNET_HashCode) * replies_seen_count);
339       pr->replies_seen_count += replies_seen;
340       if (GNUNET_NO == refresh_bloomfilter (pr))
341         {
342           /* bf not recalculated, simply extend it with new bits */
343           for (i=0;i<pr->replies_seen_count;i++)
344             {
345               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
346                                         pr->mingle,
347                                         &mhash);
348               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
349             }
350         }
351     }
352   else
353     {
354       if (NULL == pr->bf)
355         {
356           /* we're not the initiator, but the initiator did not give us
357              any bloom-filter, so we need to create one on-the-fly */
358           pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
359                                                  UINT32_MAX);
360           pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
361                                                       pr->mingle,
362                                                       BLOOMFILTER_K);
363         }
364       for (i=0;i<pr->replies_seen_count;i++)
365         {
366           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
367                                     pr->mingle,
368                                     &mhash);
369           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
370         }
371     }
372 }
373
374
375 /**
376  * Generate the message corresponding to the given pending request for
377  * transmission to other peers (or at least determine its size).
378  *
379  * @param pr request to generate the message for
380  * @param do_route are we routing the reply
381  * @param buf_size number of bytes available in buf
382  * @param buf where to copy the message (can be NULL)
383  * @return number of bytes needed (if > buf_size) or used
384  */
385 size_t
386 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
387                                   int do_route,
388                                   size_t buf_size,
389                                   void *buf)
390 {
391   struct PendingMessage *pm;
392   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
393   struct GetMessage *gm;
394   GNUNET_HashCode *ext;
395   size_t msize;
396   unsigned int k;
397   int no_route;
398   uint32_t bm;
399   uint32_t prio;
400   size_t bf_size;
401
402   k = 0;
403   bm = 0;
404   if (GNUNET_YES != do_route)
405     {
406       bm |= GET_MESSAGE_BIT_RETURN_TO;
407       k++;      
408     }
409   if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
410     {
411       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
412       k++;
413     }
414   if (GNUNET_YES == pr->has_target)
415     {
416       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
417       k++;
418     }
419   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
420   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
421   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
422   if (buf_size < msize)
423     return msize;  
424   gm = (struct GetMessage*) lbuf;
425   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
426   gm->header.size = htons (msize);
427   gm->type = htonl (pr->type);
428   if (GNUNET_YES == do_route)
429     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
430                                      pr->public_data.priority + 1);
431   else
432     prio = 0;
433   pr->public_data.priority -= prio;
434   gm->priority = htonl (prio);
435   gm->ttl = htonl (pr->ttl);
436   gm->filter_mutator = htonl(pr->mingle); 
437   gm->hash_bitmap = htonl (bm);
438   gm->query = pr->query;
439   ext = (GNUNET_HashCode*) &gm[1];
440   k = 0;
441   if (GNUNET_YES != do_route)
442     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
443   if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
444     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
445   if (GNUNET_YES == pr->has_target)
446     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
447   if (pr->bf != NULL)
448     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
449                                                (char*) &ext[k],
450                                                bf_size);
451   memcpy (buf, gm, msize);
452   return msize;
453 }
454
455
456 /**
457  * Iterator to free pending requests.
458  *
459  * @param cls closure, unused
460  * @param key current key code
461  * @param value value in the hash map (pending request)
462  * @return GNUNET_YES (we should continue to iterate)
463  */
464 static int 
465 clean_request (void *cls,
466                const GNUNET_HashCode * key,
467                void *value)
468 {
469   struct GSF_PendingRequest *pr = value;
470   
471   GNUNET_free_non_null (pr->replies_seen);
472   if (NULL != pr->bf)
473     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
474   if (NULL != pr->hnode)
475     GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
476                                        pr->hnode);
477   GNUNET_free (pr);
478   return GNUNET_YES;
479 }
480
481
482 /**
483  * Explicitly cancel a pending request.
484  *
485  * @param pr request to cancel
486  */
487 void
488 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
489 {
490   GNUNET_assert (GNUNET_OK ==
491                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
492                                                        &pr->public_data.query,
493                                                        pr));
494   GNUNET_assert (GNUNET_YES ==
495                  clean_request (NULL, &pr->public_data.query, pr));  
496 }
497
498
499 /**
500  * Iterate over all pending requests.
501  *
502  * @param it function to call for each request
503  * @param cls closure for it
504  */
505 void
506 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
507                              void *cls)
508 {
509   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
510                                          (GNUNET_CONTAINER_HashMapIterator) it,
511                                          cls);
512 }
513
514
515
516
517 /**
518  * Closure for "process_reply" function.
519  */
520 struct ProcessReplyClosure
521 {
522   /**
523    * The data for the reply.
524    */
525   const void *data;
526
527   /**
528    * Who gave us this reply? NULL for local host (or DHT)
529    */
530   struct GSF_ConnectedPeer *sender;
531
532   /**
533    * When the reply expires.
534    */
535   struct GNUNET_TIME_Absolute expiration;
536
537   /**
538    * Size of data.
539    */
540   size_t size;
541
542   /**
543    * Type of the block.
544    */
545   enum GNUNET_BLOCK_Type type;
546
547   /**
548    * How much was this reply worth to us?
549    */
550   uint32_t priority;
551
552   /**
553    * Anonymity requirements for this reply.
554    */
555   uint32_t anonymity_level;
556
557   /**
558    * Evaluation result (returned).
559    */
560   enum GNUNET_BLOCK_EvaluationResult eval;
561
562   /**
563    * Did we finish processing the associated request?
564    */ 
565   int finished;
566
567   /**
568    * Did we find a matching request?
569    */
570   int request_found;
571 };
572
573
574 /**
575  * Update the performance data for the sender (if any) since
576  * the sender successfully answered one of our queries.
577  *
578  * @param prq information about the sender
579  * @param pr request that was satisfied
580  */
581 static void
582 update_request_performance_data (struct ProcessReplyClosure *prq,
583                                  struct GSF_PendingRequest *pr)
584 {
585   unsigned int i;
586   struct GNUNET_TIME_Relative cur_delay;
587
588   if (prq->sender == NULL)
589     return;      
590   GSF_peer_update_performance_ (prq->sender,
591                                 pr->start_time,
592                                 prq->priority);
593 }
594                                 
595
596 /**
597  * We have received a reply; handle it!
598  *
599  * @param cls response (struct ProcessReplyClosure)
600  * @param key our query
601  * @param value value in the hash map (info about the query)
602  * @return GNUNET_YES (we should continue to iterate)
603  */
604 static int
605 process_reply (void *cls,
606                const GNUNET_HashCode * key,
607                void *value)
608 {
609   struct ProcessReplyClosure *prq = cls;
610   struct GSF_PendingRequest *pr = value;
611   struct PendingMessage *reply;
612   struct ClientResponseMessage *creply;
613   struct ClientList *cl;
614   struct PutMessage *pm;
615   struct ConnectedPeer *cp;
616   size_t msize;
617   GNUNET_HashCode chash;
618
619 #if DEBUG_FS
620   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621               "Matched result (type %u) for query `%s' with pending request\n",
622               (unsigned int) prq->type,
623               GNUNET_h2s (key));
624 #endif  
625   GNUNET_STATISTICS_update (stats,
626                             gettext_noop ("# replies received and matched"),
627                             1,
628                             GNUNET_NO);
629   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
630                                      prq->type,
631                                      key,
632                                      &pr->bf,
633                                      pr->mingle,
634                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
635                                      prq->data,
636                                      prq->size);
637   switch (prq->eval)
638     {
639     case GNUNET_BLOCK_EVALUATION_OK_MORE:
640       update_request_performance_data (prq, pr);
641       break;
642     case GNUNET_BLOCK_EVALUATION_OK_LAST:
643       /* short cut: stop processing early, no BF-update, etc. */
644       update_request_performance_data (prq, pr);
645       GNUNET_LOAD_update (rt_entry_lifetime,
646                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
647       /* pass on to other peers / local clients */
648       pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO);
649       /* destroy request, we're done */
650       GSF_pending_request_cancel_ (pr);
651       return GNUNET_YES;
652     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
653       GNUNET_STATISTICS_update (stats,
654                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
655                                 1,
656                                 GNUNET_NO);
657 #if DEBUG_FS && 0
658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659                   "Duplicate response `%s', discarding.\n",
660                   GNUNET_h2s (&mhash));
661 #endif
662       return GNUNET_YES; /* duplicate */
663     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
664       return GNUNET_YES; /* wrong namespace */  
665     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
666       GNUNET_break (0);
667       return GNUNET_YES;
668     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
669       GNUNET_break (0);
670       return GNUNET_YES;
671     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
672       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
673                   _("Unsupported block type %u\n"),
674                   prq->type);
675       return GNUNET_NO;
676     }
677   /* update bloomfilter */
678   GNUNET_CRYPTO_hash (prq->data,
679                       prq->size,
680                       &chash);
681   GSF_pending_request_update_ (pr, &chash, 1);
682   if (NULL == prq->sender)
683     {
684 #if DEBUG_FS
685       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686                   "Found result for query `%s' in local datastore\n",
687                   GNUNET_h2s (key));
688 #endif
689       GNUNET_STATISTICS_update (stats,
690                                 gettext_noop ("# results found locally"),
691                                 1,
692                                 GNUNET_NO);      
693     }
694   prq->priority += pr->public_data.original_priority;
695   pr->public_data.remaining_priority = 0;
696   pr->public_data.original_priority = 0;
697   pr->public_data.results_found++;
698   prq->request_found = GNUNET_YES;
699   /* finally, pass on to other peer / local client */
700   pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
701   return GNUNET_YES;
702 }
703
704
705 /**
706  * Continuation called to notify client about result of the
707  * operation.
708  *
709  * @param cls closure
710  * @param success GNUNET_SYSERR on failure
711  * @param msg NULL on success, otherwise an error message
712  */
713 static void 
714 put_migration_continuation (void *cls,
715                             int success,
716                             const char *msg)
717 {
718   struct GNUNET_TIME_Absolute *start = cls;
719   struct GNUNET_TIME_Relative delay;
720   
721   delay = GNUNET_TIME_absolute_get_duration (*start);
722   GNUNET_free (start);
723   /* FIXME: should we really update the load value on failure? */
724   GNUNET_LOAD_update (datastore_put_load,
725                       delay.rel_value);
726   if (GNUNET_OK == success)
727     return;
728   GNUNET_STATISTICS_update (stats,
729                             gettext_noop ("# datastore 'put' failures"),
730                             1,
731                             GNUNET_NO);
732 }
733
734
735 /**
736  * Test if the DATABASE (PUT) load on this peer is too high
737  * to even consider processing the query at
738  * all.  
739  * 
740  * @return GNUNET_YES if the load is too high to do anything (load high)
741  *         GNUNET_NO to process normally (load normal or low)
742  */
743 static int
744 test_put_load_too_high (uint32_t priority)
745 {
746   double ld;
747
748   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
749     return GNUNET_NO; /* very fast */
750   ld = GNUNET_LOAD_get_load (datastore_put_load);
751   if (ld < 2.0 * (1 + priority))
752     return GNUNET_NO;
753   GNUNET_STATISTICS_update (stats,
754                             gettext_noop ("# storage requests dropped due to high load"),
755                             1,
756                             GNUNET_NO);
757   return GNUNET_YES;
758 }
759
760
761 /**
762  * Iterator called on each result obtained for a DHT
763  * operation that expects a reply
764  *
765  * @param cls closure
766  * @param exp when will this value expire
767  * @param key key of the result
768  * @param get_path NULL-terminated array of pointers
769  *                 to the peers on reverse GET path (or NULL if not recorded)
770  * @param put_path NULL-terminated array of pointers
771  *                 to the peers on the PUT path (or NULL if not recorded)
772  * @param type type of the result
773  * @param size number of bytes in data
774  * @param data pointer to the result data
775  */
776 void
777 GSF_handle_dht_reply_ (void *cls,
778                        struct GNUNET_TIME_Absolute exp,
779                        const GNUNET_HashCode * key,
780                        const struct GNUNET_PeerIdentity * const *get_path,
781                        const struct GNUNET_PeerIdentity * const *put_path,
782                        enum GNUNET_BLOCK_Type type,
783                        size_t size,
784                        const void *data)
785 {
786   struct GSF_PendingRequest *pr = cls;
787   struct ProcessReplyClosure prq;
788
789   memset (&prq, 0, sizeof (prq));
790   prq.data = data;
791   prq.expiration = exp;
792   prq.size = size;  
793   prq.type = type;
794   process_reply (&prq, key, pr);
795   if ( (GNUNET_YES == active_to_migration) &&
796        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
797     {      
798 #if DEBUG_FS
799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                   "Replicating result for query `%s' with priority %u\n",
801                   GNUNET_h2s (&query),
802                   prq.priority);
803 #endif
804       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
805       *start = GNUNET_TIME_absolute_get ();
806       GNUNET_DATASTORE_put (dsh,
807                             0, &query, dsize, &put[1],
808                             type, prq.priority, 1 /* anonymity */, 
809                             expiration, 
810                             1 + prq.priority, MAX_DATASTORE_QUEUE,
811                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
812                             &put_migration_continuation, 
813                             start);
814     }
815 }
816
817
818 /**
819  * Handle P2P "CONTENT" message.  Checks that the message is
820  * well-formed and then checks if there are any pending requests for
821  * this content and possibly passes it on (to local clients or other
822  * peers).  Does NOT perform migration (content caching at this peer).
823  *
824  * @param cp the other peer involved (sender or receiver, NULL
825  *        for loopback messages where we are both sender and receiver)
826  * @param message the actual message
827  * @return GNUNET_OK if the message was well-formed,
828  *         GNUNET_SYSERR if the message was malformed (close connection,
829  *         do not cache under any circumstances)
830  */
831 int
832 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
833                          const struct GNUNET_MessageHeader *message)
834 {
835   const struct PutMessage *put;
836   uint16_t msize;
837   size_t dsize;
838   enum GNUNET_BLOCK_Type type;
839   struct GNUNET_TIME_Absolute expiration;
840   GNUNET_HashCode query;
841   struct ProcessReplyClosure prq;
842   struct GNUNET_TIME_Relative block_time;  
843   double putl;
844   struct GNUNET_TIME_Absolute *start;
845
846   msize = ntohs (message->size);
847   if (msize < sizeof (struct PutMessage))
848     {
849       GNUNET_break_op(0);
850       return GNUNET_SYSERR;
851     }
852   put = (const struct PutMessage*) message;
853   dsize = msize - sizeof (struct PutMessage);
854   type = ntohl (put->type);
855   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
856   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
857     return GNUNET_SYSERR;
858   if (GNUNET_OK !=
859       GNUNET_BLOCK_get_key (block_ctx,
860                             type,
861                             &put[1],
862                             dsize,
863                             &query))
864     {
865       GNUNET_break_op (0);
866       return GNUNET_SYSERR;
867     }
868   /* now, lookup 'query' */
869   prq.data = (const void*) &put[1];
870   if (NULL != cp)
871     prq.sender = cp;
872   else
873     prq.sender = NULL;
874   prq.size = dsize;
875   prq.type = type;
876   prq.expiration = expiration;
877   prq.priority = 0;
878   prq.anonymity_level = 1;
879   prq.finished = GNUNET_NO;
880   prq.request_found = GNUNET_NO;
881   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
882                                               &query,
883                                               &process_reply,
884                                               &prq);
885   if (NULL != cp)
886     {
887       GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
888       GSF_get_peer_performance_data (cp)->trust += prq.priority;
889     }
890   if ( (GNUNET_YES == active_to_migration) &&
891        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
892     {      
893 #if DEBUG_FS
894       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895                   "Replicating result for query `%s' with priority %u\n",
896                   GNUNET_h2s (&query),
897                   prq.priority);
898 #endif
899       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
900       *start = GNUNET_TIME_absolute_get ();
901       GNUNET_DATASTORE_put (dsh,
902                             0, &query, dsize, &put[1],
903                             type, prq.priority, 1 /* anonymity */, 
904                             expiration, 
905                             1 + prq.priority, MAX_DATASTORE_QUEUE,
906                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
907                             &put_migration_continuation, 
908                             start);
909     }
910   putl = GNUNET_LOAD_get_load (datastore_put_load);
911   if ( (NULL != (cp = prq.sender)) &&
912        (GNUNET_NO == prq.request_found) &&
913        ( (GNUNET_YES != active_to_migration) ||
914          (putl > 2.5 * (1 + prq.priority)) ) ) 
915     {
916       if (GNUNET_YES != active_to_migration) 
917         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
918       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
919                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
920                                                                                    (unsigned int) (60000 * putl * putl)));
921       GSF_block_peer_migration (cp, block_time);
922     }
923   return GNUNET_OK;
924 }
925
926
927 /**
928  * Setup the subsystem.
929  */
930 void
931 GSF_pending_request_init_ ()
932 {
933   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
934   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
935 }
936
937
938 /**
939  * Shutdown the subsystem.
940  */
941 void
942 GSF_pending_request_done_ ()
943 {
944   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
945                                          &clean_request,
946                                          NULL);
947   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
948   pr_map = NULL;
949   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
950   requests_by_expiration_heap = NULL;
951 }
952
953
954 /* end of gnunet-service-fs_pr.c */