stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29
30 /**
31  * How often do we flush trust values to disk?
32  */
33 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34
35
36 /**
37  * Handle to cancel a transmission request.
38  */
39 struct GSF_PeerTransmitHandle
40 {
41
42   /**
43    * Handle for an active request for transmission to this
44    * peer, or NULL (if core queue was full).
45    */
46   struct GNUNET_CORE_TransmitHandle *cth;
47
48   /**
49    * Time when this transmission request was issued.
50    */
51   struct GNUNET_TIME_Absolute transmission_request_start_time;
52
53   /**
54    * Timeout for this request.
55    */
56   struct GNUNET_TIME_Absolute timeout;
57
58   /**
59    * Task called on timeout, or 0 for none.
60    */
61   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
62
63   /**
64    * Function to call to get the actual message.
65    */
66   GSF_GetMessageCallback gmc;
67
68   /**
69    * Peer this request targets.
70    */
71   struct GSF_ConnectedPeer *cp;
72
73   /**
74    * Closure for 'gmc'.
75    */
76   void *gmc_cls;
77
78   /**
79    * Size of the message to be transmitted.
80    */
81   size_t size;
82
83   /**
84    * GNUNET_YES if this is a query, GNUNET_NO for content.
85    */
86   int is_query;
87
88   /**
89    * Priority of this request.
90    */
91   uint32_t priority;
92
93 };
94
95
96 /**
97  * A connected peer.
98  */
99 struct GSF_ConnectedPeer 
100 {
101
102   /**
103    * Performance data for this peer.
104    */
105   struct GSF_PeerPerformanceData ppd;
106
107   /**
108    * Time until when we blocked this peer from migrating
109    * data to us.
110    */
111   struct GNUNET_TIME_Absolute last_migration_block;
112
113   /**
114    * Messages (replies, queries, content migration) we would like to
115    * send to this peer in the near future.  Sorted by priority, head.
116    */
117   struct GSF_PeerTransmitHandle *pth_head;
118
119   /**
120    * Messages (replies, queries, content migration) we would like to
121    * send to this peer in the near future.  Sorted by priority, tail.
122    */
123   struct GSF_PeerTransmitHandle *pth_tail;
124
125   /**
126    * Migration stop message in our queue, or NULL if we have none pending.
127    */
128   struct GSF_PeerTransmitHandle *migration_pth;
129
130   /**
131    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
132    * NULL if we have successfully reserved 32k, otherwise non-NULL.
133    */
134   struct GNUNET_CORE_InformationRequestContext *irc;
135
136   /**
137    * Active requests from this neighbour.
138    */
139   struct GNUNET_CONTAINER_MulitHashMap *request_map;
140
141   /**
142    * ID of delay task for scheduling transmission.
143    */
144   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
145
146   /**
147    * Increase in traffic preference still to be submitted
148    * to the core service for this peer.
149    */
150   uint64_t inc_preference;
151
152   /**
153    * Trust rating for this peer on disk.
154    */
155   uint32_t disk_trust;
156
157   /**
158    * The peer's identity.
159    */
160   GNUNET_PEER_Id pid;
161
162   /**
163    * Which offset in "last_p2p_replies" will be updated next?
164    * (we go round-robin).
165    */
166   unsigned int last_p2p_replies_woff;
167
168   /**
169    * Which offset in "last_client_replies" will be updated next?
170    * (we go round-robin).
171    */
172   unsigned int last_client_replies_woff;
173
174   /**
175    * Current offset into 'last_request_times' ring buffer.
176    */
177   unsigned int last_request_times_off;
178
179 };
180
181
182 /**
183  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
184  */
185 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
186
187
188 /**
189  * Where do we store trust information?
190  */
191 static char *trustDirectory;
192
193
194 /**
195  * Get the filename under which we would store the GNUNET_HELLO_Message
196  * for the given host and protocol.
197  * @return filename of the form DIRECTORY/HOSTID
198  */
199 static char *
200 get_trust_filename (const struct GNUNET_PeerIdentity *id)
201 {
202   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
203   char *fn;
204
205   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
206   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
207   return fn;
208 }
209
210
211 /**
212  * Find latency information in 'atsi'.
213  *
214  * @param atsi performance data
215  * @return connection latency
216  */
217 static struct GNUNET_TIME_Relative
218 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
219 {
220   if (atsi == NULL)
221     return GNUNET_TIME_UNIT_SECONDS;
222   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
223           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
224     atsi++;
225   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
226     {
227       GNUNET_break (0);
228       /* how can we not have latency data? */
229       return GNUNET_TIME_UNIT_SECONDS;
230     }
231   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
232                                         ntohl (atsi->value));
233 }
234
235
236 /**
237  * Update the performance information kept for the given peer.
238  *
239  * @param cp peer record to update
240  * @param atsi transport performance data
241  */
242 static void
243 update_atsi (struct GSF_ConnectedPeer *cp,
244              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
245 {
246   struct GNUNET_TIME_Relative latency;
247
248   latency = get_latency (atsi);
249   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
250                                  latency);
251   /* LATER: merge atsi into cp's performance data (if we ever care...) */
252 }
253
254
255 /**
256  * Return the performance data record for the given peer
257  * 
258  * @param cp peer to query
259  * @return performance data record for the peer
260  */
261 struct GSF_PeerPerformanceData *
262 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
263 {
264   return &cp->ppd;
265 }
266
267
268 /**
269  * Core is ready to transmit to a peer, get the message.
270  *
271  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
272  * @param size number of bytes core is willing to take
273  * @param buf where to copy the message
274  * @return number of bytes copied to buf
275  */
276 static size_t
277 peer_transmit_ready_cb (void *cls,
278                         size_t size,
279                         void *buf)
280 {
281   struct GSF_PeerTransmitHandle *pth = cls;
282   struct GSF_ConnectedPeer *cp;
283   size_t ret;
284
285   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
286     {
287       GNUNET_SCHEDULER_cancel (pth->timeout_task);
288       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
289     }
290   cp = pth->cp;
291   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
292                                cp->pth_tail,
293                                pth);
294   if (GNUNET_YES == pth->is_query)
295     {
296       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
297       GNUNET_assert (0 < cp->ppd.pending_queries--);    
298     }
299   else if (GNUNET_NO == pth->is_query)
300     {
301       GNUNET_assert (0 < cp->ppd.pending_replies--);
302     }
303   GNUNET_LOAD_update (cp->ppd.transmission_delay,
304                       GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);  
305   ret = pth->gmc (pth->gmc_cls, 
306                   0, NULL);
307   GNUNET_free (pth);  
308   return ret;
309 }
310
311
312 /**
313  * Function called by core upon success or failure of our bandwidth reservation request.
314  *
315  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
316  * @param peer identifies the peer
317  * @param bandwidth_out available amount of outbound bandwidth
318  * @param amount set to the amount that was actually reserved or unreserved;
319  *               either the full requested amount or zero (no partial reservations)
320  * @param preference current traffic preference for the given peer
321  */
322 static void
323 core_reserve_callback (void *cls,
324                        const struct GNUNET_PeerIdentity * peer,
325                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
326                        int amount,
327                        uint64_t preference)
328 {
329   struct GSF_ConnectedPeer *cp = cls;
330   uint64_t ip;
331
332   cp->irc = NULL;
333   if (0 == amount)
334     {
335       /* failed; retry! (how did we get here!?) */
336       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
337                   _("Failed to reserve bandwidth to peer `%s'\n"),
338                   GNUNET_i2s (peer));
339       ip = cp->inc_preference;
340       cp->inc_preference = 0;
341       cp->irc = GNUNET_CORE_peer_change_preference (core,
342                                                     peer,
343                                                     GNUNET_TIME_UNIT_FOREVER_REL,
344                                                     GNUNET_BANDWIDTH_VALUE_MAX,
345                                                     GNUNET_FS_DBLOCK_SIZE,
346                                                     ip,
347                                                     &core_reserve_callback,
348                                                     cp);
349       return;
350     }
351   pth = cp->pth_head;
352   if ( (NULL != pth) &&
353        (NULL == pth->cth) )
354     {
355       /* reservation success, try transmission now! */
356       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
357                                                     priority,
358                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
359                                                     &target,
360                                                     size,
361                                                     &peer_transmit_ready_cb,
362                                                     pth);
363     }
364 }
365
366
367 /**
368  * A peer connected to us.  Setup the connected peer
369  * records.
370  *
371  * @param peer identity of peer that connected
372  * @param atsi performance data for the connection
373  * @return handle to connected peer entry
374  */
375 struct GSF_ConnectedPeer *
376 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
377                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
378 {
379   struct GSF_ConnectedPeer *cp;
380   char *fn;
381   uint32_t trust;
382   struct GNUNET_TIME_Relative latency;
383
384   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
385   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
386   cp->pid = GNUNET_PEER_intern (peer);
387   cp->transmission_delay = GNUNET_LOAD_value_init (0);
388   cp->irc = GNUNET_CORE_peer_change_preference (core,
389                                                 peer,
390                                                 GNUNET_TIME_UNIT_FOREVER_REL,
391                                                 GNUNET_BANDWIDTH_VALUE_MAX,
392                                                 GNUNET_FS_DBLOCK_SIZE,
393                                                 0,
394                                                 &core_reserve_callback,
395                                                 cp);
396   fn = get_trust_filename (peer);
397   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
398       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
399     cp->disk_trust = cp->trust = ntohl (trust);
400   GNUNET_free (fn);
401   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
402   GNUNET_break (GNUNET_OK ==
403                 GNUNET_CONTAINER_multihashmap_put (cp_map,
404                                                    &peer->hashPubKey,
405                                                    cp,
406                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
407   update_atsi (cp, atsi);
408   GSF_plan_notify_new_peer_ (cp);
409   return cp;
410 }
411
412
413 /**
414  * Handle P2P "MIGRATION_STOP" message.
415  *
416  * @param cls closure, always NULL
417  * @param other the other peer involved (sender or receiver, NULL
418  *        for loopback messages where we are both sender and receiver)
419  * @param message the actual message
420  * @param atsi performance information
421  * @return GNUNET_OK to keep the connection open,
422  *         GNUNET_SYSERR to close it (signal serious error)
423  */
424 int
425 GSF_handle_p2p_migration_stop_ (void *cls,
426                                 const struct GNUNET_PeerIdentity *other,
427                                 const struct GNUNET_MessageHeader *message,
428                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
429 {
430   struct GSF_ConnectedPeer *cp; 
431   const struct MigrationStopMessage *msm;
432
433   msm = (const struct MigrationStopMessage*) message;
434   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
435                                           &other->hashPubKey);
436   if (cp == NULL)
437     {
438       GNUNET_break (0);
439       return GNUNET_OK;
440     }
441   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
442   update_atsi (cp, atsi);
443   return GNUNET_OK;
444 }
445
446
447 /**
448  * Handle a reply to a pending request.  Also called if a request
449  * expires (then with data == NULL).  The handler may be called
450  * many times (depending on the request type), but will not be
451  * called during or after a call to GSF_pending_request_cancel 
452  * and will also not be called anymore after a call signalling
453  * expiration.
454  *
455  * @param cls 'struct GSF_ConnectedPeer' of the peer that would
456  *            have liked an answer to the request
457  * @param pr handle to the original pending request
458  * @param data response data, NULL on request expiration
459  * @param data_len number of bytes in data
460  */
461 static void
462 handle_p2p_reply (void *cls,
463                   struct GSF_PendingRequest *pr,
464                   const void *data,
465                   size_t data_len)
466 {
467   struct GSF_ConnectedPeer *cp = cls;
468
469 #if SUPPORT_DELAYS  
470   struct GNUNET_TIME_Relative art_delay;
471 #endif
472
473   /* FIXME: adapt code fragments below to new API! */
474   if (NULL == data)
475     {
476       /* FIXME: request expired! clean up! */
477       GNUNET_STATISTICS_update (stats,
478                                 gettext_noop ("# P2P searches active"),
479                                 -1,
480                                 GNUNET_NO);
481       return;
482     }
483
484   /* reply will go over the network, check for cover traffic */
485   if ( (prq->anonymity_level >  1) &&
486        (cover_content_count < prq->anonymity_level - 1) )
487     {
488       /* insufficient cover traffic, skip */
489       GNUNET_STATISTICS_update (stats,
490                                 gettext_noop ("# replies suppressed due to lack of cover traffic"),
491                                 1,
492                                 GNUNET_NO);
493       return GNUNET_YES;
494     }   
495   if (prq->anonymity_level >  1) 
496     cover_content_count -= prq->anonymity_level - 1;
497
498
499       cp = pr->cp;
500 #if DEBUG_FS
501       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
503                   GNUNET_h2s (key),
504                   (unsigned int) cp->pid);
505 #endif  
506       GNUNET_STATISTICS_update (stats,
507                                 gettext_noop ("# replies received for other peers"),
508                                 1,
509                                 GNUNET_NO);
510       msize = sizeof (struct PutMessage) + prq->size;
511       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
512       reply->cont = &transmit_reply_continuation;
513       reply->cont_cls = pr;
514 #if SUPPORT_DELAYS
515       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
516                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
517                                                                            TTL_DECREMENT));
518       reply->delay_until 
519         = GNUNET_TIME_relative_to_absolute (art_delay);
520       GNUNET_STATISTICS_update (stats,
521                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
522                                 art_delay.abs_value,
523                                 GNUNET_NO);
524 #endif
525       reply->msize = msize;
526       reply->priority = UINT32_MAX; /* send replies first! */
527       pm = (struct PutMessage*) &reply[1];
528       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
529       pm->header.size = htons (msize);
530       pm->type = htonl (prq->type);
531       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
532       memcpy (&pm[1], prq->data, prq->size);
533       add_to_pending_messages_for_peer (cp, reply, pr);
534
535
536 }
537
538
539 /**
540  * Handle P2P "QUERY" message.  Creates the pending request entry
541  * and sets up all of the data structures to that we will
542  * process replies properly.  Does not initiate forwarding or
543  * local database lookups.
544  *
545  * @param other the other peer involved (sender or receiver, NULL
546  *        for loopback messages where we are both sender and receiver)
547  * @param message the actual message
548  * @return pending request handle, NULL on error
549  */
550 struct GSF_PendingRequest *
551 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
552                        const struct GNUNET_MessageHeader *message)
553 {
554   struct GSF_PendingRequest *pr;
555   struct GSF_PendingRequestData *prd;
556   struct GSF_ConnectedPeer *cp;
557   struct GSF_ConnectedPeer *cps;
558   GNUNET_HashCode *namespace;
559   struct GNUNET_PeerIdentity *target;
560   enum GSF_PendingRequestOptions options;                            
561   struct GNUNET_TIME_Relative timeout;
562   uint16_t msize;
563   const struct GetMessage *gm;
564   unsigned int bits;
565   const GNUNET_HashCode *opt;
566   uint32_t bm;
567   size_t bfsize;
568   uint32_t ttl_decrement;
569   int32_t priority;
570   int32_t ttl;
571   enum GNUNET_BLOCK_Type type;
572
573
574   msize = ntohs(message->size);
575   if (msize < sizeof (struct GetMessage))
576     {
577       GNUNET_break_op (0);
578       return GNUNET_SYSERR;
579     }
580   gm = (const struct GetMessage*) message;
581 #if DEBUG_FS
582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583               "Received request for `%s'\n",
584               GNUNET_h2s (&gm->query));
585 #endif
586   type = ntohl (gm->type);
587   bm = ntohl (gm->hash_bitmap);
588   bits = 0;
589   while (bm > 0)
590     {
591       if (1 == (bm & 1))
592         bits++;
593       bm >>= 1;
594     }
595   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
596     {
597       GNUNET_break_op (0);
598       return GNUNET_SYSERR;
599     }  
600   opt = (const GNUNET_HashCode*) &gm[1];
601   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
602   /* bfsize must be power of 2, check! */
603   if (0 != ( (bfsize - 1) & bfsize))
604     {
605       GNUNET_break_op (0);
606       return GNUNET_SYSERR;
607     }
608   cover_query_count++;
609   bm = ntohl (gm->hash_bitmap);
610   bits = 0;
611   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
612                                            &other->hashPubKey);
613   if (NULL == cps)
614     {
615       /* peer must have just disconnected */
616       GNUNET_STATISTICS_update (stats,
617                                 gettext_noop ("# requests dropped due to initiator not being connected"),
618                                 1,
619                                 GNUNET_NO);
620       return GNUNET_SYSERR;
621     }
622   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
623     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
624                                             &opt[bits++]);
625   else
626     cp = cps;
627   if (cp == NULL)
628     {
629 #if DEBUG_FS
630       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
631         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
633                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
634       
635       else
636         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
638                     GNUNET_i2s (other));
639 #endif
640       GNUNET_STATISTICS_update (stats,
641                                 gettext_noop ("# requests dropped due to missing reverse route"),
642                                 1,
643                                 GNUNET_NO);
644       return GNUNET_OK;
645     }
646   /* note that we can really only check load here since otherwise
647      peers could find out that we are overloaded by not being
648      disconnected after sending us a malformed query... */
649   priority = bound_priority (ntohl (gm->priority), cps);
650   if (priority < 0)
651     {
652 #if DEBUG_FS
653       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654                   "Dropping query from `%s', this peer is too busy.\n",
655                   GNUNET_i2s (other));
656 #endif
657       return GNUNET_OK;
658     }
659 #if DEBUG_FS 
660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
662               GNUNET_h2s (&gm->query),
663               (unsigned int) type,
664               GNUNET_i2s (other),
665               (unsigned int) bm);
666 #endif
667   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
668   target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
669   options = 0;
670   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
671        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
672         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
673     {
674       /* don't have BW to send to peer, or would likely take longer than we have for it,
675          so at best indirect the query */
676       priority = 0;
677       options |= GSF_PRO_FORWARD_ONLY;
678     }
679   ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
680   /* decrement ttl (always) */
681   ttl_decrement = 2 * TTL_DECREMENT +
682     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
683                               TTL_DECREMENT);
684   if ( (ttl < 0) &&
685        (((int32_t)(ttl - ttl_decrement)) > 0) )
686     {
687 #if DEBUG_FS
688       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
690                   GNUNET_i2s (other),
691                   ttl,
692                   ttl_decrement);
693 #endif
694       GNUNET_STATISTICS_update (stats,
695                                 gettext_noop ("# requests dropped due TTL underflow"),
696                                 1,
697                                 GNUNET_NO);
698       /* integer underflow => drop (should be very rare)! */      
699       return GNUNET_OK;
700     } 
701   ttl -= ttl_decrement;
702
703   /* test if the request already exists */
704   pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
705                                           &gm->query);
706   if (pr != NULL) 
707     {      
708       prd = GSF_pending_request_get_data_ (pr);
709       if ( (prd->type == type) &&
710            ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
711              (0 == memcmp (prd->namespace,
712                            namespace,
713                            sizeof (GNUNET_HashCode))) ) )
714         {
715           if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
716             {
717               /* existing request has higher TTL, drop new one! */
718               prd->priority += priority;
719 #if DEBUG_FS
720               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721                           "Have existing request with higher TTL, dropping new request.\n",
722                           GNUNET_i2s (other));
723 #endif
724               GNUNET_STATISTICS_update (stats,
725                                         gettext_noop ("# requests dropped due to higher-TTL request"),
726                                         1,
727                                         GNUNET_NO);
728               return GNUNET_OK;
729             }
730           /* existing request has lower TTL, drop old one! */
731           pr->priority += prd->priority;
732           GSF_pending_request_cancel_ (pr);
733           GNUNET_assert (GNUNET_YES ==
734                          GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
735                                                                &gm->query,
736                                                                pr));
737         }
738     }
739   
740   pr = GSF_pending_request_create (options,
741                                    type,
742                                    &gm->query,
743                                    namespace,
744                                    target,
745                                    (bf_size > 0) ? (const char*)&opt[bits] : NULL,
746                                    bf_size,
747                                    ntohl (gm->filter_mutator),
748                                    1 /* anonymity */
749                                    (uint32_t) priority,
750                                    ttl,
751                                    NULL, 0, /* replies_seen */
752                                    &handle_p2p_reply,
753                                    cp);
754   GNUNET_break (GNUNET_OK ==
755                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
756                                                    &gm->query,
757                                                    pr,
758                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
759   GNUNET_STATISTICS_update (stats,
760                             gettext_noop ("# P2P searches received"),
761                             1,
762                             GNUNET_NO);
763   GNUNET_STATISTICS_update (stats,
764                             gettext_noop ("# P2P searches active"),
765                             1,
766                             GNUNET_NO);
767   return pr;
768 }
769
770
771 /**
772  * Function called if there has been a timeout trying to satisfy
773  * a transmission request.
774  *
775  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
776  * @param tc scheduler context
777  */
778 static void
779 peer_transmit_timeout (void *cls,
780                        const struct GNUNET_SCHEDULER_TaskContext *tc)
781 {
782   struct GSF_PeerTransmitHandle *pth = cls;
783   struct GSF_ConnectedPeer *cp;
784   
785   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
786   cp = pth->cp;
787   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
788                                cp->pth_tail,
789                                pth);
790   if (GNUNET_YES == pth->is_query)
791     GNUNET_assert (0 < cp->ppd.pending_queries--);    
792   else if (GNUNET_NO == pth->is_query)
793     GNUNET_assert (0 < cp->ppd.pending_replies--);
794   GNUNET_LOAD_update (cp->ppd.transmission_delay,
795                       UINT64_MAX);
796   pth->gmc (pth->gmc_cls, 
797             0, NULL);
798   GNUNET_free (pth);
799 }
800
801
802 /**
803  * Transmit a message to the given peer as soon as possible.
804  * If the peer disconnects before the transmission can happen,
805  * the callback is invoked with a 'NULL' buffer.
806  *
807  * @param peer target peer
808  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
809  * @param priority how important is this request?
810  * @param timeout when does this request timeout (call gmc with error)
811  * @param size number of bytes we would like to send to the peer
812  * @param gmc function to call to get the message
813  * @param gmc_cls closure for gmc
814  * @return handle to cancel request
815  */
816 struct GSF_PeerTransmitHandle *
817 GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
818                     int is_query,
819                     uint32_t priority,
820                     struct GNUNET_TIME_Relative timeout,
821                     size_t size,
822                     GSF_GetMessageCallback gmc,
823                     void *gmc_cls)
824 {
825   struct GSF_ConnectedPeer *cp;
826   struct GSF_PeerTransmitHandle *pth;
827   struct GSF_PeerTransmitHandle *pos;
828   struct GSF_PeerTransmitHandle *prev;
829   struct GNUNET_PeerIdentity target;
830   uint64_t ip;
831   int is_ready;
832
833   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
834                                           &peer->hashPubKey);
835   GNUNET_assert (NULL != cp);
836   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
837   pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
838   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
839   pth->gmc = gmc;
840   pth->gmc_cls = gmc_cls;
841   pth->size = size;
842   pth->is_query = is_query;
843   pth->priority = priority;
844   pth->cp = cp;
845   /* insertion sort (by priority, descending) */
846   prev = NULL;
847   pos = cp->pth_head;
848   while ( (pos != NULL) &&
849           (pos->priority > priority) )
850     {
851       prev = pos;
852       pos = pos->next;
853     }
854   if (prev == NULL)
855     GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
856                                       cp->pth_tail,
857                                       pth);
858   else
859     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
860                                        cp->pth_tail,
861                                        prev,
862                                        pth);
863   GNUNET_PEER_resolve (cp->pid,
864                        &target);
865   if (GNUNET_YES == is_query)
866     {
867       /* query, need reservation */
868       cp->ppd.pending_queries++;
869       if (NULL == cp->irc)
870         {
871           /* reservation already done! */
872           is_ready = GNUNET_YES;
873           ip = cp->inc_preference;
874           cp->inc_preference = 0;
875           cp->irc = GNUNET_CORE_peer_change_preference (core,
876                                                         peer,
877                                                         GNUNET_TIME_UNIT_FOREVER_REL,
878                                                         GNUNET_BANDWIDTH_VALUE_MAX,
879                                                         GNUNET_FS_DBLOCK_SIZE,
880                                                         ip,
881                                                         &core_reserve_callback,
882                                                         cp);      
883         }
884       else
885         {
886           /* still waiting for reservation */
887           is_ready = GNUNET_NO;
888         }
889     }
890   else if (GNUNET_NO == is_query)
891     {
892       /* no reservation needed for content */
893       cp->ppd.pending_replies++;
894       is_ready = GNUNET_YES;
895     }
896   else
897     {
898       /* not a query or content, no reservation needed */
899       is_ready = GNUNET_YES;
900     }
901   if (is_ready)
902     {
903       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
904                                                     priority,
905                                                     timeout,
906                                                     &target,
907                                                     size,
908                                                     &peer_transmit_ready_cb,
909                                                     pth);
910       /* pth->cth could be NULL here, that's OK, we'll try again
911          later... */
912     }
913   if (pth->cth == NULL)
914     {
915       /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
916          install a timeout task to be on the safe side */
917       pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
918                                                         &peer_transmit_timeout,
919                                                         pth);
920     }
921   return pth;
922 }
923
924
925 /**
926  * Cancel an earlier request for transmission.
927  *
928  * @param pth request to cancel
929  */
930 void
931 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
932 {
933   struct GSF_PeerTransmitHandle *pth = cls;
934   struct GSF_ConnectedPeer *cp;
935
936   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
937     {
938       GNUNET_SCHEDULER_cancel (pth->timeout_task);
939       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
940     }
941   if (NULL != pth->cth)
942     {
943       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
944       pth->cth = NULL;
945     }
946   cp = pth->cp;
947   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
948                                cp->pth_tail,
949                                pth);
950   if (GNUNET_YES == pth->is_query)
951     GNUNET_assert (0 < cp->ppd.pending_queries--);    
952   else if (GNUNET_NO == pth->is_query)
953     GNUNET_assert (0 < cp->ppd.pending_replies--);
954   GNUNET_free (pth);
955 }
956
957
958 /**
959  * Report on receiving a reply; update the performance record of the given peer.
960  *
961  * @param cp responding peer (will be updated)
962  * @param request_time time at which the original query was transmitted
963  * @param request_priority priority of the original request
964  * @param initiator_client local client on responsible for query (or NULL)
965  * @param initiator_peer other peer responsible for query (or NULL)
966  */
967 void
968 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
969                               struct GNUNET_TIME_Absolute request_time,
970                               uint32_t request_priority,
971                               const struct GSF_LocalClient *initiator_client,
972                               const struct GSF_ConnectedPeer *initiator_peer)
973 {
974   struct GNUNET_TIME_Relative delay;
975   unsigned int i;
976
977   delay = GNUNET_TIME_absolute_get_duration (request_time);  
978   cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
979   cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
980   if (NULL != initiator_client)
981     {
982       cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
983     }
984   else if (NULL != initiator_peer)
985     {
986       GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
987       cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
988       GNUNET_PEER_change_rc (initiator_peer->pid, 1);
989     }
990   else
991     GNUNET_break (0);
992 }
993
994
995 /**
996  * Method called whenever a given peer has a status change.
997  *
998  * @param cls closure
999  * @param peer peer identity this notification is about
1000  * @param bandwidth_in available amount of inbound bandwidth
1001  * @param bandwidth_out available amount of outbound bandwidth
1002  * @param timeout absolute time when this peer will time out
1003  *        unless we see some further activity from it
1004  * @param atsi status information
1005  */
1006 void
1007 GSF_peer_status_handler_ (void *cls,
1008                           const struct GNUNET_PeerIdentity *peer,
1009                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1010                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1011                           struct GNUNET_TIME_Absolute timeout,
1012                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1013 {
1014   struct GSF_ConnectedPeer *cp;
1015
1016   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1017                                           &peer->hashPubKey);
1018   GNUNET_assert (NULL != cp);
1019   update_atsi (cp, atsi);
1020 }
1021
1022
1023 /**
1024  * Cancel all requests associated with the peer.
1025  *
1026  * @param cls unused
1027  * @param query hash code of the request
1028  * @param value the 'struct GSF_PendingRequest'
1029  * @return GNUNET_YES (continue to iterate)
1030  */
1031 static int
1032 cancel_pending_request (void *cls,
1033                         const GNUNET_HashCode *query,
1034                         void *value)
1035 {
1036   struct GSF_PendingRequest *pr = value;
1037
1038   GSF_pending_request_cancel_ (pr);
1039   return GNUNET_OK;
1040 }
1041
1042
1043 /**
1044  * A peer disconnected from us.  Tear down the connected peer
1045  * record.
1046  *
1047  * @param cls unused
1048  * @param peer identity of peer that connected
1049  */
1050 void
1051 GSF_peer_disconnect_handler_ (void *cls,
1052                               const struct GNUNET_PeerIdentity *peer)
1053 {
1054   struct GSF_ConnectedPeer *cp;
1055   struct GSF_PeerTransmitHandle *pth;
1056
1057   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1058                                           &peer->hashPubKey);
1059   GNUNET_assert (NULL != cp);
1060   GNUNET_CONTAINER_multihashmap_remove (cp_map,
1061                                         &peer->hashPubKey,
1062                                         cp);
1063   if (NULL != cp->migration_pth)
1064     {
1065       GSF_peer_transmit_cancel_ (cp->migration_pth);
1066       cp->migration_pth = NULL;
1067     }
1068   if (NULL != cp->irc)
1069     {
1070       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1071       cp->irc = NULL;
1072     }
1073   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1074                                          &cancel_pending_request,
1075                                          cp);
1076   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1077   cp->request_map = NULL;
1078   GSF_plan_notify_peer_disconnect_ (cp);
1079   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1080   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1081   while (NULL != (pth = cp->pth_head))
1082     {
1083       if (NULL != pth->th)
1084         {
1085           GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
1086           pth->th = NULL;
1087         }
1088       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1089                                    cp->pth_tail,
1090                                    pth);
1091       GNUNET_free (pth);
1092     }
1093   GNUNET_PEER_change_rc (cp->pid, -1);
1094   GNUNET_free (cp);
1095 }
1096
1097
1098 /**
1099  * Closure for 'call_iterator'.
1100  */
1101 struct IterationContext
1102 {
1103   /**
1104    * Function to call on each entry.
1105    */
1106   GSF_ConnectedPeerIterator it;
1107
1108   /**
1109    * Closure for 'it'.
1110    */
1111   void *it_cls;
1112 };
1113
1114
1115 /**
1116  * Function that calls the callback for each peer.
1117  *
1118  * @param cls the 'struct IterationContext*'
1119  * @param key identity of the peer
1120  * @param value the 'struct GSF_ConnectedPeer*'
1121  * @return GNUNET_YES to continue iteration
1122  */
1123 static int
1124 call_iterator (void *cls,
1125                const GNUNET_HashCode *key,
1126                void *value)
1127 {
1128   struct IterationContext *ic = cls;
1129   struct GSF_ConnectedPeer *cp = value;
1130   
1131   ic->it (ic->it_cls,
1132           (const struct GNUNET_PeerIdentity*) key,
1133           cp,
1134           &cp->ppd);
1135   return GNUNET_YES;
1136 }
1137
1138
1139 /**
1140  * Iterate over all connected peers.
1141  *
1142  * @param it function to call for each peer
1143  * @param it_cls closure for it
1144  */
1145 void
1146 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1147                               void *it_cls)
1148 {
1149   struct IterationContext ic;
1150
1151   ic.it = it;
1152   ic.it_cls = it_cls;
1153   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1154                                          &call_iterator,
1155                                          &ic);
1156 }
1157
1158
1159 /**
1160  * Obtain the identity of a connected peer.
1161  *
1162  * @param cp peer to reserve bandwidth from
1163  * @param id identity to set (written to)
1164  */
1165 void
1166 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1167                                   struct GNUNET_PeerIdentity *id)
1168 {
1169   GNUNET_PEER_resolve (cp->pid,
1170                        &id);
1171 }
1172
1173
1174 /**
1175  * Assemble a migration stop message for transmission.
1176  *
1177  * @param cls the 'struct GSF_ConnectedPeer' to use
1178  * @param size number of bytes we're allowed to write to buf
1179  * @param buf where to copy the message
1180  * @return number of bytes copied to buf
1181  */
1182 static size_t
1183 create_migration_stop_message (void *cls,
1184                                size_t size,
1185                                void *buf)
1186 {
1187   struct GSF_ConnectedPeer *cp = cls;
1188   struct MigrationStopMessage msm;
1189
1190   cp->migration_pth = NULL;
1191   if (NULL == buf)
1192     return 0;
1193   GNUNET_assert (size > sizeof (struct MigrationStopMessage));
1194   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1195   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1196   msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1197   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1198   return sizeof (struct MigrationStopMessage);
1199 }
1200
1201
1202 /**
1203  * Ask a peer to stop migrating data to us until the given point
1204  * in time.
1205  * 
1206  * @param cp peer to ask
1207  * @param block_time until when to block
1208  */
1209 void
1210 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1211                            struct GNUNET_TIME_Relative block_time)
1212 {
1213   if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1214     return; /* already blocked */
1215   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1216   if (cp->migration_pth != NULL)
1217     GSF_peer_transmit_cancel_ (cp->migration_pth);
1218   cp->migration_pth 
1219     = GSF_peer_transmit_ (cp,
1220                           GNUNET_SYSERR,
1221                           UINT32_MAX,
1222                           GNUNET_TIME_UNIT_FOREVER_REL,
1223                           sizeof (struct MigrationStopMessage),
1224                           &create_migration_stop_message,
1225                           cp);
1226 }
1227
1228
1229 /**
1230  * Write host-trust information to a file - flush the buffer entry!
1231  *
1232  * @param cls closure, not used
1233  * @param key host identity
1234  * @param value the 'struct GSF_ConnectedPeer' to flush
1235  * @return GNUNET_OK to continue iteration
1236  */
1237 static int
1238 flush_trust (void *cls,
1239              const GNUNET_HashCode *key,
1240              void *value)
1241 {
1242   struct GSF_ConnectedPeer *cp = value;
1243   char *fn;
1244   uint32_t trust;
1245   struct GNUNET_PeerIdentity pid;
1246
1247   if (cp->trust == cp->disk_trust)
1248     return GNUNET_OK;                     /* unchanged */
1249   GNUNET_PEER_resolve (cp->pid,
1250                        &pid);
1251   fn = get_trust_filename (&pid);
1252   if (cp->trust == 0)
1253     {
1254       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1255         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1256                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1257     }
1258   else
1259     {
1260       trust = htonl (cp->trust);
1261       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1262                                                     sizeof(uint32_t),
1263                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1264                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1265         cp->disk_trust = cp->trust;
1266     }
1267   GNUNET_free (fn);
1268   return GNUNET_OK;
1269 }
1270
1271
1272 /**
1273  * Notify core about a preference we have for the given peer
1274  * (to allocate more resources towards it).  The change will
1275  * be communicated the next time we reserve bandwidth with
1276  * core (not instantly).
1277  *
1278  * @param cp peer to reserve bandwidth from
1279  * @param pref preference change
1280  */
1281 void
1282 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1283                                        uint64_t pref)
1284 {
1285   cp->inc_preference += pref;
1286 }
1287
1288
1289 /**
1290  * Call this method periodically to flush trust information to disk.
1291  *
1292  * @param cls closure, not used
1293  * @param tc task context, not used
1294  */
1295 static void
1296 cron_flush_trust (void *cls,
1297                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1298 {
1299
1300   if (NULL == cp_map)
1301     return;
1302   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1303                                          &flush_trust,
1304                                          NULL);
1305   if (NULL == tc)
1306     return;
1307   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1308     return;
1309   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
1310                                 &cron_flush_trust, 
1311                                 NULL);
1312 }
1313
1314
1315 /**
1316  * Initialize peer management subsystem.
1317  *
1318  * @param cfg configuration to use
1319  */
1320 void
1321 GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
1322 {
1323   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1324   GNUNET_assert (GNUNET_OK ==
1325                  GNUNET_CONFIGURATION_get_value_filename (cfg,
1326                                                           "fs",
1327                                                           "TRUST",
1328                                                           &trustDirectory));
1329   GNUNET_DISK_directory_create (trustDirectory);
1330   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1331                                       &cron_flush_trust, NULL);
1332 }
1333
1334
1335 /**
1336  * Iterator to free peer entries.
1337  *
1338  * @param cls closure, unused
1339  * @param key current key code
1340  * @param value value in the hash map (peer entry)
1341  * @return GNUNET_YES (we should continue to iterate)
1342  */
1343 static int 
1344 clean_peer (void *cls,
1345             const GNUNET_HashCode * key,
1346             void *value)
1347 {
1348   GSF_peer_disconnect_handler_ (NULL, 
1349                                 (const struct GNUNET_PeerIdentity*) key);
1350   return GNUNET_YES;
1351 }
1352
1353
1354 /**
1355  * Shutdown peer management subsystem.
1356  */
1357 void
1358 GSF_connected_peer_done_ ()
1359 {
1360   cron_flush_trust (NULL, NULL);
1361   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1362                                          &clean_peer,
1363                                          NULL);
1364   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1365   cp_map = NULL;
1366   GNUNET_free (trustDirectory);
1367   trustDirectory = NULL;
1368 }
1369
1370
1371 /**
1372  * Iterator to remove references to LC entry.
1373  *
1374  * @param the 'struct GSF_LocalClient*' to look for
1375  * @param key current key code
1376  * @param value value in the hash map (peer entry)
1377  * @return GNUNET_YES (we should continue to iterate)
1378  */
1379 static int 
1380 clean_local_client (void *cls,
1381                     const GNUNET_HashCode * key,
1382                     void *value)
1383 {
1384   const struct GSF_LocalClient *lc = cls;
1385   struct GSF_ConnectedPeer *cp = value;
1386   unsigned int i;
1387
1388   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1389     if (cp->ppd.last_client_replies[i] == lc)
1390       cp->ppd.last_client_replies[i] = NULL;
1391   return GNUNET_YES;
1392 }
1393
1394
1395 /**
1396  * Notification that a local client disconnected.  Clean up all of our
1397  * references to the given handle.
1398  *
1399  * @param lc handle to the local client (henceforth invalid)
1400  */
1401 void
1402 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1403 {
1404   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1405                                          &clean_local_client,
1406                                          (void*) lc);
1407 }
1408
1409
1410 /* end of gnunet-service-fs_cp.c */