903549cb7d537d87ad5f07876abacfba250bb624
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29
30 /**
31  * How often do we flush trust values to disk?
32  */
33 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34
35
36
37 /**
38  * Handle to cancel a transmission request.
39  */
40 struct GSF_PeerTransmitHandle
41 {
42
43   /**
44    * Handle for an active request for transmission to this
45    * peer, or NULL (if core queue was full).
46    */
47   struct GNUNET_CORE_TransmitHandle *cth;
48
49   /**
50    * Time when this transmission request was issued.
51    */
52   struct GNUNET_TIME_Absolute transmission_request_start_time;
53
54   /**
55    * Timeout for this request.
56    */
57   struct GNUNET_TIME_Absolute timeout;
58
59   /**
60    * Task called on timeout, or 0 for none.
61    */
62   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
63
64   /**
65    * Function to call to get the actual message.
66    */
67   GSF_GetMessageCallback gmc;
68
69   /**
70    * Peer this request targets.
71    */
72   struct GSF_ConnectedPeer *cp;
73
74   /**
75    * Closure for 'gmc'.
76    */
77   void *gmc_cls;
78
79   /**
80    * Size of the message to be transmitted.
81    */
82   size_t size;
83
84   /**
85    * GNUNET_YES if this is a query, GNUNET_NO for content.
86    */
87   int is_query;
88
89   /**
90    * Priority of this request.
91    */
92   uint32_t priority;
93
94 };
95
96
97 /**
98  * A connected peer.
99  */
100 struct GSF_ConnectedPeer 
101 {
102
103   /**
104    * Performance data for this peer.
105    */
106   struct GSF_PeerPerformanceData ppd;
107
108   /**
109    * Time until when we blocked this peer from migrating
110    * data to us.
111    */
112   struct GNUNET_TIME_Absolute last_migration_block;
113
114   /**
115    * Messages (replies, queries, content migration) we would like to
116    * send to this peer in the near future.  Sorted by priority, head.
117    */
118   struct GSF_PeerTransmitHandle *pth_head;
119
120   /**
121    * Messages (replies, queries, content migration) we would like to
122    * send to this peer in the near future.  Sorted by priority, tail.
123    */
124   struct GSF_PeerTransmitHandle *pth_tail;
125
126   /**
127    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
128    * NULL if we have successfully reserved 32k, otherwise non-NULL.
129    */
130   struct GNUNET_CORE_InformationRequestContext *irc;
131
132   /**
133    * ID of delay task for scheduling transmission.
134    */
135   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
136
137   /**
138    * Increase in traffic preference still to be submitted
139    * to the core service for this peer.
140    */
141   uint64_t inc_preference;
142
143   /**
144    * Trust rating for this peer on disk.
145    */
146   uint32_t disk_trust;
147
148   /**
149    * The peer's identity.
150    */
151   GNUNET_PEER_Id pid;
152
153   /**
154    * Which offset in "last_p2p_replies" will be updated next?
155    * (we go round-robin).
156    */
157   unsigned int last_p2p_replies_woff;
158
159   /**
160    * Which offset in "last_client_replies" will be updated next?
161    * (we go round-robin).
162    */
163   unsigned int last_client_replies_woff;
164
165   /**
166    * Current offset into 'last_request_times' ring buffer.
167    */
168   unsigned int last_request_times_off;
169
170 };
171
172
173 /**
174  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
175  */
176 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
177
178
179 /**
180  * Where do we store trust information?
181  */
182 static char *trustDirectory;
183
184
185 /**
186  * Get the filename under which we would store the GNUNET_HELLO_Message
187  * for the given host and protocol.
188  * @return filename of the form DIRECTORY/HOSTID
189  */
190 static char *
191 get_trust_filename (const struct GNUNET_PeerIdentity *id)
192 {
193   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
194   char *fn;
195
196   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
197   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
198   return fn;
199 }
200
201
202 /**
203  * Find latency information in 'atsi'.
204  *
205  * @param atsi performance data
206  * @return connection latency
207  */
208 static struct GNUNET_TIME_Relative
209 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
210 {
211   if (atsi == NULL)
212     return GNUNET_TIME_UNIT_SECONDS;
213   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
214           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
215     atsi++;
216   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
217     {
218       GNUNET_break (0);
219       /* how can we not have latency data? */
220       return GNUNET_TIME_UNIT_SECONDS;
221     }
222   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
223                                         ntohl (atsi->value));
224 }
225
226
227 /**
228  * Update the performance information kept for the given peer.
229  *
230  * @param cp peer record to update
231  * @param atsi transport performance data
232  */
233 static void
234 update_atsi (struct GSF_ConnectedPeer *cp,
235              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
236 {
237   struct GNUNET_TIME_Relative latency;
238
239   latency = get_latency (atsi);
240   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
241                                  latency);
242   /* LATER: merge atsi into cp's performance data (if we ever care...) */
243 }
244
245
246 /**
247  * Return the performance data record for the given peer
248  * 
249  * @param cp peer to query
250  * @return performance data record for the peer
251  */
252 struct GSF_PeerPerformanceData *
253 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
254 {
255   return &cp->ppd;
256 }
257
258
259 /**
260  * Core is ready to transmit to a peer, get the message.
261  *
262  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
263  * @param size number of bytes core is willing to take
264  * @param buf where to copy the message
265  * @return number of bytes copied to buf
266  */
267 static size_t
268 peer_transmit_ready_cb (void *cls,
269                         size_t size,
270                         void *buf)
271 {
272   struct GSF_PeerTransmitHandle *pth = cls;
273   struct GSF_ConnectedPeer *cp;
274   size_t ret;
275
276   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
277     {
278       GNUNET_SCHEDULER_cancel (pth->timeout_task);
279       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
280     }
281   cp = pth->cp;
282   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
283                                cp->pth_tail,
284                                pth);
285   if (pth->is_query)
286     {
287       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
288       GNUNET_assert (0 < cp->ppd.pending_queries--);    
289     }
290   else
291     {
292       GNUNET_assert (0 < cp->ppd.pending_replies--);
293     }
294   GNUNET_LOAD_update (cp->ppd.transmission_delay,
295                       GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);  
296   ret = pth->gmc (pth->gmc_cls, 
297                   0, NULL);
298   GNUNET_free (pth);  
299   return ret;
300 }
301
302
303 /**
304  * Function called by core upon success or failure of our bandwidth reservation request.
305  *
306  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
307  * @param peer identifies the peer
308  * @param bandwidth_out available amount of outbound bandwidth
309  * @param amount set to the amount that was actually reserved or unreserved;
310  *               either the full requested amount or zero (no partial reservations)
311  * @param preference current traffic preference for the given peer
312  */
313 static void
314 core_reserve_callback (void *cls,
315                        const struct GNUNET_PeerIdentity * peer,
316                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
317                        int amount,
318                        uint64_t preference)
319 {
320   struct GSF_ConnectedPeer *cp = cls;
321   uint64_t ip;
322
323   cp->irc = NULL;
324   if (0 == amount)
325     {
326       /* failed; retry! (how did we get here!?) */
327       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
328                   _("Failed to reserve bandwidth to peer `%s'\n"),
329                   GNUNET_i2s (peer));
330       ip = cp->inc_preference;
331       cp->inc_preference = 0;
332       cp->irc = GNUNET_CORE_peer_change_preference (core,
333                                                     peer,
334                                                     GNUNET_TIME_UNIT_FOREVER_REL,
335                                                     GNUNET_BANDWIDTH_VALUE_MAX,
336                                                     GNUNET_FS_DBLOCK_SIZE,
337                                                     ip,
338                                                     &core_reserve_callback,
339                                                     cp);
340       return;
341     }
342   pth = cp->pth_head;
343   if ( (NULL != pth) &&
344        (NULL == pth->cth) )
345     {
346       /* reservation success, try transmission now! */
347       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
348                                                     priority,
349                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
350                                                     &target,
351                                                     size,
352                                                     &peer_transmit_ready_cb,
353                                                     pth);
354     }
355 }
356
357
358 /**
359  * A peer connected to us.  Setup the connected peer
360  * records.
361  *
362  * @param peer identity of peer that connected
363  * @param atsi performance data for the connection
364  * @return handle to connected peer entry
365  */
366 struct GSF_ConnectedPeer *
367 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
368                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
369 {
370   struct GSF_ConnectedPeer *cp;
371   char *fn;
372   uint32_t trust;
373   struct GNUNET_TIME_Relative latency;
374
375   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
376   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
377   cp->pid = GNUNET_PEER_intern (peer);
378   cp->transmission_delay = GNUNET_LOAD_value_init (0);
379   cp->irc = GNUNET_CORE_peer_change_preference (core,
380                                                 peer,
381                                                 GNUNET_TIME_UNIT_FOREVER_REL,
382                                                 GNUNET_BANDWIDTH_VALUE_MAX,
383                                                 GNUNET_FS_DBLOCK_SIZE,
384                                                 0,
385                                                 &core_reserve_callback,
386                                                 cp);
387   fn = get_trust_filename (peer);
388   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
389       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
390     cp->disk_trust = cp->trust = ntohl (trust);
391   GNUNET_free (fn);
392   GNUNET_break (GNUNET_OK ==
393                 GNUNET_CONTAINER_multihashmap_put (cp_map,
394                                                    &peer->hashPubKey,
395                                                    cp,
396                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
397   update_atsi (cp, atsi);
398   GSF_plan_notify_new_peer_ (cp);
399   return cp;
400 }
401
402
403 /**
404  * Handle P2P "MIGRATION_STOP" message.
405  *
406  * @param cls closure, always NULL
407  * @param other the other peer involved (sender or receiver, NULL
408  *        for loopback messages where we are both sender and receiver)
409  * @param message the actual message
410  * @param atsi performance information
411  * @return GNUNET_OK to keep the connection open,
412  *         GNUNET_SYSERR to close it (signal serious error)
413  */
414 int
415 GSF_handle_p2p_migration_stop_ (void *cls,
416                                 const struct GNUNET_PeerIdentity *other,
417                                 const struct GNUNET_MessageHeader *message,
418                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
419 {
420   struct GSF_ConnectedPeer *cp; 
421   const struct MigrationStopMessage *msm;
422
423   msm = (const struct MigrationStopMessage*) message;
424   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
425                                           &other->hashPubKey);
426   if (cp == NULL)
427     {
428       GNUNET_break (0);
429       return GNUNET_OK;
430     }
431   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
432   update_atsi (cp, atsi);
433   return GNUNET_OK;
434 }
435
436
437 /**
438  * Handle a reply to a pending request.  Also called if a request
439  * expires (then with data == NULL).  The handler may be called
440  * many times (depending on the request type), but will not be
441  * called during or after a call to GSF_pending_request_cancel 
442  * and will also not be called anymore after a call signalling
443  * expiration.
444  *
445  * @param cls user-specified closure
446  * @param pr handle to the original pending request
447  * @param data response data, NULL on request expiration
448  * @param data_len number of bytes in data
449  */
450 static void
451 handle_p2p_reply (void *cls,
452                   struct GSF_PendingRequest *pr,
453                   const void *data,
454                   size_t data_len)
455 {
456 #if SUPPORT_DELAYS  
457   struct GNUNET_TIME_Relative art_delay;
458 #endif
459
460   /* FIXME: adapt code fragments below to new API! */
461
462
463   /* reply will go over the network, check for cover traffic */
464   if ( (prq->anonymity_level >  1) &&
465        (cover_content_count < prq->anonymity_level - 1) )
466     {
467       /* insufficient cover traffic, skip */
468       GNUNET_STATISTICS_update (stats,
469                                 gettext_noop ("# replies suppressed due to lack of cover traffic"),
470                                 1,
471                                 GNUNET_NO);
472       return GNUNET_YES;
473     }   
474   if (prq->anonymity_level >  1) 
475     cover_content_count -= prq->anonymity_level - 1;
476
477
478       cp = pr->cp;
479 #if DEBUG_FS
480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
482                   GNUNET_h2s (key),
483                   (unsigned int) cp->pid);
484 #endif  
485       GNUNET_STATISTICS_update (stats,
486                                 gettext_noop ("# replies received for other peers"),
487                                 1,
488                                 GNUNET_NO);
489       msize = sizeof (struct PutMessage) + prq->size;
490       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
491       reply->cont = &transmit_reply_continuation;
492       reply->cont_cls = pr;
493 #if SUPPORT_DELAYS
494       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
495                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
496                                                                            TTL_DECREMENT));
497       reply->delay_until 
498         = GNUNET_TIME_relative_to_absolute (art_delay);
499       GNUNET_STATISTICS_update (stats,
500                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
501                                 art_delay.abs_value,
502                                 GNUNET_NO);
503 #endif
504       reply->msize = msize;
505       reply->priority = UINT32_MAX; /* send replies first! */
506       pm = (struct PutMessage*) &reply[1];
507       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
508       pm->header.size = htons (msize);
509       pm->type = htonl (prq->type);
510       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
511       memcpy (&pm[1], prq->data, prq->size);
512       add_to_pending_messages_for_peer (cp, reply, pr);
513
514
515 }
516
517
518
519 /**
520  * Handle P2P "QUERY" message.
521  *
522  * @param other the other peer involved (sender or receiver, NULL
523  *        for loopback messages where we are both sender and receiver)
524  * @param message the actual message
525  * @return pending request handle, NULL on error
526  */
527 struct GSF_PendingRequest *
528 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
529                        const struct GNUNET_MessageHeader *message)
530 {
531   /* FIXME: adapt old code to new API! */
532   struct PendingRequest *pr;
533   struct ConnectedPeer *cp;
534   struct ConnectedPeer *cps;
535   struct CheckDuplicateRequestClosure cdc;
536   struct GNUNET_TIME_Relative timeout;
537   uint16_t msize;
538   const struct GetMessage *gm;
539   unsigned int bits;
540   const GNUNET_HashCode *opt;
541   uint32_t bm;
542   size_t bfsize;
543   uint32_t ttl_decrement;
544   int32_t priority;
545   enum GNUNET_BLOCK_Type type;
546   int have_ns;
547
548   msize = ntohs(message->size);
549   if (msize < sizeof (struct GetMessage))
550     {
551       GNUNET_break_op (0);
552       return GNUNET_SYSERR;
553     }
554   gm = (const struct GetMessage*) message;
555 #if DEBUG_FS
556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557               "Received request for `%s'\n",
558               GNUNET_h2s (&gm->query));
559 #endif
560   type = ntohl (gm->type);
561   bm = ntohl (gm->hash_bitmap);
562   bits = 0;
563   while (bm > 0)
564     {
565       if (1 == (bm & 1))
566         bits++;
567       bm >>= 1;
568     }
569   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
570     {
571       GNUNET_break_op (0);
572       return GNUNET_SYSERR;
573     }  
574   opt = (const GNUNET_HashCode*) &gm[1];
575   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
576   /* bfsize must be power of 2, check! */
577   if (0 != ( (bfsize - 1) & bfsize))
578     {
579       GNUNET_break_op (0);
580       return GNUNET_SYSERR;
581     }
582   cover_query_count++;
583   bm = ntohl (gm->hash_bitmap);
584   bits = 0;
585   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
586                                            &other->hashPubKey);
587   if (NULL == cps)
588     {
589       /* peer must have just disconnected */
590       GNUNET_STATISTICS_update (stats,
591                                 gettext_noop ("# requests dropped due to initiator not being connected"),
592                                 1,
593                                 GNUNET_NO);
594       return GNUNET_SYSERR;
595     }
596   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
597     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
598                                             &opt[bits++]);
599   else
600     cp = cps;
601   if (cp == NULL)
602     {
603 #if DEBUG_FS
604       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
605         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
607                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
608       
609       else
610         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
611                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
612                     GNUNET_i2s (other));
613 #endif
614       GNUNET_STATISTICS_update (stats,
615                                 gettext_noop ("# requests dropped due to missing reverse route"),
616                                 1,
617                                 GNUNET_NO);
618      /* FIXME: try connect? */
619       return GNUNET_OK;
620     }
621   /* note that we can really only check load here since otherwise
622      peers could find out that we are overloaded by not being
623      disconnected after sending us a malformed query... */
624   priority = bound_priority (ntohl (gm->priority), cps);
625   if (priority < 0)
626     {
627 #if DEBUG_FS
628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629                   "Dropping query from `%s', this peer is too busy.\n",
630                   GNUNET_i2s (other));
631 #endif
632       return GNUNET_OK;
633     }
634 #if DEBUG_FS 
635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
637               GNUNET_h2s (&gm->query),
638               (unsigned int) type,
639               GNUNET_i2s (other),
640               (unsigned int) bm);
641 #endif
642   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
643   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
644                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
645   if (have_ns)
646     {
647       pr->namespace = (GNUNET_HashCode*) &pr[1];
648       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
649     }
650   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
651        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
652         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
653     {
654       /* don't have BW to send to peer, or would likely take longer than we have for it,
655          so at best indirect the query */
656       priority = 0;
657       pr->forward_only = GNUNET_YES;
658     }
659   pr->type = type;
660   pr->mingle = ntohl (gm->filter_mutator);
661   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
662     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
663   pr->anonymity_level = 1;
664   pr->priority = (uint32_t) priority;
665   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
666   pr->query = gm->query;
667   /* decrement ttl (always) */
668   ttl_decrement = 2 * TTL_DECREMENT +
669     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
670                               TTL_DECREMENT);
671   if ( (pr->ttl < 0) &&
672        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
673     {
674 #if DEBUG_FS
675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
677                   GNUNET_i2s (other),
678                   pr->ttl,
679                   ttl_decrement);
680 #endif
681       GNUNET_STATISTICS_update (stats,
682                                 gettext_noop ("# requests dropped due TTL underflow"),
683                                 1,
684                                 GNUNET_NO);
685       /* integer underflow => drop (should be very rare)! */      
686       GNUNET_free (pr);
687       return GNUNET_OK;
688     } 
689   pr->ttl -= ttl_decrement;
690   pr->start_time = GNUNET_TIME_absolute_get ();
691
692   /* get bloom filter */
693   if (bfsize > 0)
694     {
695       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
696                                                   bfsize,
697                                                   BLOOMFILTER_K);
698       pr->bf_size = bfsize;
699     }
700   cdc.have = NULL;
701   cdc.pr = pr;
702   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
703                                               &gm->query,
704                                               &check_duplicate_request_peer,
705                                               &cdc);
706   if (cdc.have != NULL)
707     {
708       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
709           pr->start_time.abs_value + pr->ttl)
710         {
711           /* existing request has higher TTL, drop new one! */
712           cdc.have->priority += pr->priority;
713           destroy_pending_request (pr);
714 #if DEBUG_FS
715           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716                       "Have existing request with higher TTL, dropping new request.\n",
717                       GNUNET_i2s (other));
718 #endif
719           GNUNET_STATISTICS_update (stats,
720                                     gettext_noop ("# requests dropped due to higher-TTL request"),
721                                     1,
722                                     GNUNET_NO);
723           return GNUNET_OK;
724         }
725       else
726         {
727           /* existing request has lower TTL, drop old one! */
728           pr->priority += cdc.have->priority;
729           /* Possible optimization: if we have applicable pending
730              replies in 'cdc.have', we might want to move those over
731              (this is a really rare special-case, so it is not clear
732              that this would be worth it) */
733           destroy_pending_request (cdc.have);
734           /* keep processing 'pr'! */
735         }
736     }
737
738   pr->cp = cp;
739   GNUNET_break (GNUNET_OK ==
740                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
741                                                    &gm->query,
742                                                    pr,
743                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
744   GNUNET_break (GNUNET_OK ==
745                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
746                                                    &other->hashPubKey,
747                                                    pr,
748                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
749   
750   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
751                                             pr,
752                                             pr->start_time.abs_value + pr->ttl);
753
754   GNUNET_STATISTICS_update (stats,
755                             gettext_noop ("# P2P searches received"),
756                             1,
757                             GNUNET_NO);
758   GNUNET_STATISTICS_update (stats,
759                             gettext_noop ("# P2P searches active"),
760                             1,
761                             GNUNET_NO);
762
763   /* calculate change in traffic preference */
764   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
765   /* process locally */
766   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
767     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
768   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
769                                            (pr->priority + 1)); 
770   if (GNUNET_YES != pr->forward_only)
771     {
772 #if DEBUG_FS
773       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774                   "Handing request for `%s' to datastore\n",
775                   GNUNET_h2s (&gm->query));
776 #endif
777       pr->qe = GNUNET_DATASTORE_get (dsh,
778                                      &gm->query,
779                                      type,                             
780                                      pr->priority + 1,
781                                      MAX_DATASTORE_QUEUE,                                
782                                      timeout,
783                                      &process_local_reply,
784                                      pr);
785       if (NULL == pr->qe)
786         {
787           GNUNET_STATISTICS_update (stats,
788                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
789                                     1,
790                                     GNUNET_NO);
791         }
792     }
793   else
794     {
795       GNUNET_STATISTICS_update (stats,
796                                 gettext_noop ("# requests forwarded due to high load"),
797                                 1,
798                                 GNUNET_NO);
799     }
800
801   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
802   switch (pr->type)
803     {
804     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
805     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
806       /* only one result, wait for datastore */
807       if (GNUNET_YES != pr->forward_only)
808         {
809           GNUNET_STATISTICS_update (stats,
810                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
811                                     1,
812                                     GNUNET_NO);
813           break;
814         }
815     default:
816       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
817         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
818                                              pr);
819     }
820
821   /* make sure we don't track too many requests */
822   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
823     {
824       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
825       GNUNET_assert (pr != NULL);
826       destroy_pending_request (pr);
827     }
828   return GNUNET_OK;
829
830
831
832   // FIXME!
833   // parse request
834   // setup pending request (use 'handle_p2p_reply')
835   // track pending request to cancel it on peer disconnect (!)
836   // return it!
837   // (actual planning & execution up to caller!)
838   return NULL;
839 }
840
841
842 /**
843  * Function called if there has been a timeout trying to satisfy
844  * a transmission request.
845  *
846  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
847  * @param tc scheduler context
848  */
849 static void
850 peer_transmit_timeout (void *cls,
851                        const struct GNUNET_SCHEDULER_TaskContext *tc)
852 {
853   struct GSF_PeerTransmitHandle *pth = cls;
854   struct GSF_ConnectedPeer *cp;
855   
856   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
857   cp = pth->cp;
858   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
859                                cp->pth_tail,
860                                pth);
861   if (pth->is_query)
862     GNUNET_assert (0 < cp->ppd.pending_queries--);    
863   else
864     GNUNET_assert (0 < cp->ppd.pending_replies--);
865   GNUNET_LOAD_update (cp->ppd.transmission_delay,
866                       UINT64_MAX);
867   pth->gmc (pth->gmc_cls, 
868             0, NULL);
869   GNUNET_free (pth);
870 }
871
872
873 /**
874  * Transmit a message to the given peer as soon as possible.
875  * If the peer disconnects before the transmission can happen,
876  * the callback is invoked with a 'NULL' buffer.
877  *
878  * @param peer target peer
879  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
880  * @param priority how important is this request?
881  * @param timeout when does this request timeout (call gmc with error)
882  * @param size number of bytes we would like to send to the peer
883  * @param gmc function to call to get the message
884  * @param gmc_cls closure for gmc
885  * @return handle to cancel request
886  */
887 struct GSF_PeerTransmitHandle *
888 GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
889                     int is_query,
890                     uint32_t priority,
891                     struct GNUNET_TIME_Relative timeout,
892                     size_t size,
893                     GSF_GetMessageCallback gmc,
894                     void *gmc_cls)
895 {
896   struct GSF_ConnectedPeer *cp;
897   struct GSF_PeerTransmitHandle *pth;
898   struct GSF_PeerTransmitHandle *pos;
899   struct GSF_PeerTransmitHandle *prev;
900   struct GNUNET_PeerIdentity target;
901   uint64_t ip;
902   int is_ready;
903
904   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
905                                           &peer->hashPubKey);
906   GNUNET_assert (NULL != cp);
907   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
908   pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
909   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
910   pth->gmc = gmc;
911   pth->gmc_cls = gmc_cls;
912   pth->size = size;
913   pth->is_query = is_query;
914   pth->priority = priority;
915   pth->cp = cp;
916   /* insertion sort (by priority, descending) */
917   prev = NULL;
918   pos = cp->pth_head;
919   while ( (pos != NULL) &&
920           (pos->priority > priority) )
921     {
922       prev = pos;
923       pos = pos->next;
924     }
925   if (prev == NULL)
926     GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
927                                       cp->pth_tail,
928                                       pth);
929   else
930     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
931                                        cp->pth_tail,
932                                        prev,
933                                        pth);
934   GNUNET_PEER_resolve (cp->pid,
935                        &target);
936   if (is_query)
937     {
938       /* query, need reservation */
939       if (NULL == cp->irc)
940         {
941           /* reservation already done! */
942           is_ready = GNUNET_YES;
943           ip = cp->inc_preference;
944           cp->inc_preference = 0;
945           cp->irc = GNUNET_CORE_peer_change_preference (core,
946                                                         peer,
947                                                         GNUNET_TIME_UNIT_FOREVER_REL,
948                                                         GNUNET_BANDWIDTH_VALUE_MAX,
949                                                         GNUNET_FS_DBLOCK_SIZE,
950                                                         ip,
951                                                         &core_reserve_callback,
952                                                         cp);      
953         }
954       else
955         {
956           /* still waiting for reservation */
957           is_ready = GNUNET_NO;
958         }
959     }
960   else
961     {
962       /* no reservation needed for content */
963       is_ready = GNUNET_YES;
964     }
965   if (is_ready)
966     {
967       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
968                                                     priority,
969                                                     timeout,
970                                                     &target,
971                                                     size,
972                                                     &peer_transmit_ready_cb,
973                                                     pth);
974       /* pth->cth could be NULL here, that's OK, we'll try again
975          later... */
976     }
977   if (pth->cth == NULL)
978     {
979       /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
980          install a timeout task to be on the safe side */
981       pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
982                                                         &peer_transmit_timeout,
983                                                         pth);
984     }
985   return pth;
986 }
987
988
989 /**
990  * Cancel an earlier request for transmission.
991  *
992  * @param pth request to cancel
993  */
994 void
995 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
996 {
997   struct GSF_PeerTransmitHandle *pth = cls;
998   struct GSF_ConnectedPeer *cp;
999
1000   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1001     {
1002       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1003       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1004     }
1005   if (NULL != pth->cth)
1006     {
1007       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1008       pth->cth = NULL;
1009     }
1010   cp = pth->cp;
1011   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1012                                cp->pth_tail,
1013                                pth);
1014   if (pth->is_query)
1015     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1016   else
1017     GNUNET_assert (0 < cp->ppd.pending_replies--);
1018   GNUNET_free (pth);
1019 }
1020
1021
1022 /**
1023  * Report on receiving a reply; update the performance record of the given peer.
1024  *
1025  * @param cp responding peer (will be updated)
1026  * @param request_time time at which the original query was transmitted
1027  * @param request_priority priority of the original request
1028  * @param initiator_client local client on responsible for query (or NULL)
1029  * @param initiator_peer other peer responsible for query (or NULL)
1030  */
1031 void
1032 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1033                               struct GNUNET_TIME_Absolute request_time,
1034                               uint32_t request_priority,
1035                               const struct GSF_LocalClient *initiator_client,
1036                               const struct GSF_ConnectedPeer *initiator_peer)
1037 {
1038   struct GNUNET_TIME_Relative delay;
1039   unsigned int i;
1040
1041   delay = GNUNET_TIME_absolute_get_duration (request_time);  
1042   cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
1043   cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
1044   if (NULL != initiator_client)
1045     {
1046       cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1047     }
1048   else if (NULL != initiator_peer)
1049     {
1050       GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
1051       cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
1052       GNUNET_PEER_change_rc (initiator_peer->pid, 1);
1053     }
1054   else
1055     GNUNET_break (0);
1056 }
1057
1058
1059 /**
1060  * Method called whenever a given peer has a status change.
1061  *
1062  * @param cls closure
1063  * @param peer peer identity this notification is about
1064  * @param bandwidth_in available amount of inbound bandwidth
1065  * @param bandwidth_out available amount of outbound bandwidth
1066  * @param timeout absolute time when this peer will time out
1067  *        unless we see some further activity from it
1068  * @param atsi status information
1069  */
1070 void
1071 GSF_peer_status_handler_ (void *cls,
1072                           const struct GNUNET_PeerIdentity *peer,
1073                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1074                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1075                           struct GNUNET_TIME_Absolute timeout,
1076                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1077 {
1078   struct GSF_ConnectedPeer *cp;
1079
1080   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1081                                           &peer->hashPubKey);
1082   GNUNET_assert (NULL != cp);
1083   update_atsi (cp, atsi);
1084 }
1085
1086
1087 /**
1088  * A peer disconnected from us.  Tear down the connected peer
1089  * record.
1090  *
1091  * @param cls unused
1092  * @param peer identity of peer that connected
1093  */
1094 void
1095 GSF_peer_disconnect_handler_ (void *cls,
1096                               const struct GNUNET_PeerIdentity *peer)
1097 {
1098   struct GSF_ConnectedPeer *cp;
1099   struct GSF_PeerTransmitHandle *pth;
1100
1101   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1102                                           &peer->hashPubKey);
1103   GNUNET_assert (NULL != cp);
1104   GNUNET_CONTAINER_multihashmap_remove (cp_map,
1105                                         &peer->hashPubKey,
1106                                         cp);
1107   if (NULL != cp->irc)
1108     {
1109       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1110       cp->irc = NULL;
1111     }
1112   GSF_plan_notify_peer_disconnect_ (cp);
1113   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1114   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1115   while (NULL != (pth = cp->pth_head))
1116     {
1117       if (NULL != pth->th)
1118         {
1119           GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
1120           pth->th = NULL;
1121         }
1122       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1123                                    cp->pth_tail,
1124                                    pth);
1125       GNUNET_free (pth);
1126     }
1127   GNUNET_PEER_change_rc (cp->pid, -1);
1128   GNUNET_free (cp);
1129 }
1130
1131
1132 /**
1133  * Closure for 'call_iterator'.
1134  */
1135 struct IterationContext
1136 {
1137   /**
1138    * Function to call on each entry.
1139    */
1140   GSF_ConnectedPeerIterator it;
1141
1142   /**
1143    * Closure for 'it'.
1144    */
1145   void *it_cls;
1146 };
1147
1148
1149 /**
1150  * Function that calls the callback for each peer.
1151  *
1152  * @param cls the 'struct IterationContext*'
1153  * @param key identity of the peer
1154  * @param value the 'struct GSF_ConnectedPeer*'
1155  * @return GNUNET_YES to continue iteration
1156  */
1157 static int
1158 call_iterator (void *cls,
1159                const GNUNET_HashCode *key,
1160                void *value)
1161 {
1162   struct IterationContext *ic = cls;
1163   struct GSF_ConnectedPeer *cp = value;
1164   
1165   ic->it (ic->it_cls,
1166           (const struct GNUNET_PeerIdentity*) key,
1167           cp,
1168           &cp->ppd);
1169   return GNUNET_YES;
1170 }
1171
1172
1173 /**
1174  * Iterate over all connected peers.
1175  *
1176  * @param it function to call for each peer
1177  * @param it_cls closure for it
1178  */
1179 void
1180 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1181                               void *it_cls)
1182 {
1183   struct IterationContext ic;
1184
1185   ic.it = it;
1186   ic.it_cls = it_cls;
1187   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1188                                          &call_iterator,
1189                                          &ic);
1190 }
1191
1192
1193 /**
1194  * Obtain the identity of a connected peer.
1195  *
1196  * @param cp peer to reserve bandwidth from
1197  * @param id identity to set (written to)
1198  */
1199 void
1200 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1201                                   struct GNUNET_PeerIdentity *id)
1202 {
1203   GNUNET_PEER_resolve (cp->pid,
1204                        &id);
1205 }
1206
1207
1208 /**
1209  * Ask a peer to stop migrating data to us until the given point
1210  * in time.
1211  * 
1212  * @param cp peer to ask
1213  * @param block_time until when to block
1214  */
1215 void
1216 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1217                            struct GNUNET_TIME_Relative block_time)
1218 {
1219   struct PendingMessage *pm;
1220   struct MigrationStopMessage *msm;
1221
1222   if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1223     return; /* already blocked */
1224   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1225
1226   /* FIXME: adapt old code below to new API! */
1227   pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
1228                       sizeof (struct MigrationStopMessage));
1229   pm->msize = sizeof (struct MigrationStopMessage);
1230   pm->priority = UINT32_MAX;
1231   msm = (struct MigrationStopMessage*) &pm[1];
1232   msm->header.size = htons (sizeof (struct MigrationStopMessage));
1233   msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1234   msm->duration = GNUNET_TIME_relative_hton (block_time);
1235   add_to_pending_messages_for_peer (cp,
1236                                     pm,
1237                                     NULL);
1238 }
1239
1240
1241
1242
1243 /**
1244  * Write host-trust information to a file - flush the buffer entry!
1245  *
1246  * @param cls closure, not used
1247  * @param key host identity
1248  * @param value the 'struct GSF_ConnectedPeer' to flush
1249  * @return GNUNET_OK to continue iteration
1250  */
1251 static int
1252 flush_trust (void *cls,
1253              const GNUNET_HashCode *key,
1254              void *value)
1255 {
1256   struct GSF_ConnectedPeer *cp = value;
1257   char *fn;
1258   uint32_t trust;
1259   struct GNUNET_PeerIdentity pid;
1260
1261   if (cp->trust == cp->disk_trust)
1262     return GNUNET_OK;                     /* unchanged */
1263   GNUNET_PEER_resolve (cp->pid,
1264                        &pid);
1265   fn = get_trust_filename (&pid);
1266   if (cp->trust == 0)
1267     {
1268       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1269         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1270                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1271     }
1272   else
1273     {
1274       trust = htonl (cp->trust);
1275       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1276                                                     sizeof(uint32_t),
1277                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1278                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1279         cp->disk_trust = cp->trust;
1280     }
1281   GNUNET_free (fn);
1282   return GNUNET_OK;
1283 }
1284
1285
1286 /**
1287  * Notify core about a preference we have for the given peer
1288  * (to allocate more resources towards it).  The change will
1289  * be communicated the next time we reserve bandwidth with
1290  * core (not instantly).
1291  *
1292  * @param cp peer to reserve bandwidth from
1293  * @param pref preference change
1294  */
1295 void
1296 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1297                                        uint64_t pref)
1298 {
1299   cp->inc_preference += pref;
1300 }
1301
1302
1303 /**
1304  * Call this method periodically to flush trust information to disk.
1305  *
1306  * @param cls closure, not used
1307  * @param tc task context, not used
1308  */
1309 static void
1310 cron_flush_trust (void *cls,
1311                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1312 {
1313
1314   if (NULL == cp_map)
1315     return;
1316   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1317                                          &flush_trust,
1318                                          NULL);
1319   if (NULL == tc)
1320     return;
1321   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1322     return;
1323   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
1324                                 &cron_flush_trust, 
1325                                 NULL);
1326 }
1327
1328
1329 /**
1330  * Initialize peer management subsystem.
1331  *
1332  * @param cfg configuration to use
1333  */
1334 void
1335 GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
1336 {
1337   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1338   GNUNET_assert (GNUNET_OK ==
1339                  GNUNET_CONFIGURATION_get_value_filename (cfg,
1340                                                           "fs",
1341                                                           "TRUST",
1342                                                           &trustDirectory));
1343   GNUNET_DISK_directory_create (trustDirectory);
1344   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1345                                       &cron_flush_trust, NULL);
1346 }
1347
1348
1349 /**
1350  * Iterator to free peer entries.
1351  *
1352  * @param cls closure, unused
1353  * @param key current key code
1354  * @param value value in the hash map (peer entry)
1355  * @return GNUNET_YES (we should continue to iterate)
1356  */
1357 static int 
1358 clean_peer (void *cls,
1359             const GNUNET_HashCode * key,
1360             void *value)
1361 {
1362   GSF_peer_disconnect_handler_ (NULL, 
1363                                 (const struct GNUNET_PeerIdentity*) key);
1364   return GNUNET_YES;
1365 }
1366
1367
1368 /**
1369  * Shutdown peer management subsystem.
1370  */
1371 void
1372 GSF_connected_peer_done_ ()
1373 {
1374   cron_flush_trust (NULL, NULL);
1375   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1376                                          &clean_peer,
1377                                          NULL);
1378   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1379   cp_map = NULL;
1380   GNUNET_free (trustDirectory);
1381   trustDirectory = NULL;
1382 }
1383
1384
1385 /**
1386  * Iterator to remove references to LC entry.
1387  *
1388  * @param the 'struct GSF_LocalClient*' to look for
1389  * @param key current key code
1390  * @param value value in the hash map (peer entry)
1391  * @return GNUNET_YES (we should continue to iterate)
1392  */
1393 static int 
1394 clean_local_client (void *cls,
1395                     const GNUNET_HashCode * key,
1396                     void *value)
1397 {
1398   const struct GSF_LocalClient *lc = cls;
1399   struct GSF_ConnectedPeer *cp = value;
1400   unsigned int i;
1401
1402   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1403     if (cp->ppd.last_client_replies[i] == lc)
1404       cp->ppd.last_client_replies[i] = NULL;
1405   return GNUNET_YES;
1406 }
1407
1408
1409 /**
1410  * Notification that a local client disconnected.  Clean up all of our
1411  * references to the given handle.
1412  *
1413  * @param lc handle to the local client (henceforth invalid)
1414  */
1415 void
1416 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1417 {
1418   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1419                                          &clean_local_client,
1420                                          (void*) lc);
1421 }
1422
1423
1424 /* end of gnunet-service-fs_cp.c */