stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29
30 /**
31  * How often do we flush trust values to disk?
32  */
33 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34
35
36
37 /**
38  * Handle to cancel a transmission request.
39  */
40 struct GSF_PeerTransmitHandle
41 {
42
43   /**
44    * Handle for an active request for transmission to this
45    * peer, or NULL (if core queue was full).
46    */
47   struct GNUNET_CORE_TransmitHandle *cth;
48
49   /**
50    * Time when this transmission request was issued.
51    */
52   struct GNUNET_TIME_Absolute transmission_request_start_time;
53
54   /**
55    * Timeout for this request.
56    */
57   struct GNUNET_TIME_Absolute timeout;
58
59   /**
60    * Task called on timeout, or 0 for none.
61    */
62   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
63
64   /**
65    * Function to call to get the actual message.
66    */
67   GSF_GetMessageCallback gmc;
68
69   /**
70    * Peer this request targets.
71    */
72   struct GSF_ConnectedPeer *cp;
73
74   /**
75    * Closure for 'gmc'.
76    */
77   void *gmc_cls;
78
79   /**
80    * Size of the message to be transmitted.
81    */
82   size_t size;
83
84   /**
85    * GNUNET_YES if this is a query, GNUNET_NO for content.
86    */
87   int is_query;
88
89   /**
90    * Priority of this request.
91    */
92   uint32_t priority;
93
94 };
95
96
97 /**
98  * A connected peer.
99  */
100 struct GSF_ConnectedPeer 
101 {
102
103   /**
104    * Performance data for this peer.
105    */
106   struct GSF_PeerPerformanceData ppd;
107
108   /**
109    * Time until when we blocked this peer from migrating
110    * data to us.
111    */
112   struct GNUNET_TIME_Absolute last_migration_block;
113
114   /**
115    * Messages (replies, queries, content migration) we would like to
116    * send to this peer in the near future.  Sorted by priority, head.
117    */
118   struct GSF_PeerTransmitHandle *pth_head;
119
120   /**
121    * Messages (replies, queries, content migration) we would like to
122    * send to this peer in the near future.  Sorted by priority, tail.
123    */
124   struct GSF_PeerTransmitHandle *pth_tail;
125
126   /**
127    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
128    * NULL if we have successfully reserved 32k, otherwise non-NULL.
129    */
130   struct GNUNET_CORE_InformationRequestContext *irc;
131
132   /**
133    * ID of delay task for scheduling transmission.
134    */
135   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
136
137   /**
138    * Increase in traffic preference still to be submitted
139    * to the core service for this peer.
140    */
141   uint64_t inc_preference;
142
143   /**
144    * Trust rating for this peer
145    */
146   uint32_t trust;
147
148   /**
149    * Trust rating for this peer on disk.
150    */
151   uint32_t disk_trust;
152
153   /**
154    * The peer's identity.
155    */
156   GNUNET_PEER_Id pid;
157
158   /**
159    * Which offset in "last_p2p_replies" will be updated next?
160    * (we go round-robin).
161    */
162   unsigned int last_p2p_replies_woff;
163
164   /**
165    * Which offset in "last_client_replies" will be updated next?
166    * (we go round-robin).
167    */
168   unsigned int last_client_replies_woff;
169
170   /**
171    * Current offset into 'last_request_times' ring buffer.
172    */
173   unsigned int last_request_times_off;
174
175 };
176
177
178 /**
179  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
180  */
181 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
182
183
184 /**
185  * Where do we store trust information?
186  */
187 static char *trustDirectory;
188
189
190 /**
191  * Get the filename under which we would store the GNUNET_HELLO_Message
192  * for the given host and protocol.
193  * @return filename of the form DIRECTORY/HOSTID
194  */
195 static char *
196 get_trust_filename (const struct GNUNET_PeerIdentity *id)
197 {
198   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
199   char *fn;
200
201   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
202   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
203   return fn;
204 }
205
206
207 /**
208  * Find latency information in 'atsi'.
209  *
210  * @param atsi performance data
211  * @return connection latency
212  */
213 static struct GNUNET_TIME_Relative
214 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
215 {
216   if (atsi == NULL)
217     return GNUNET_TIME_UNIT_SECONDS;
218   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
219           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
220     atsi++;
221   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
222     {
223       GNUNET_break (0);
224       /* how can we not have latency data? */
225       return GNUNET_TIME_UNIT_SECONDS;
226     }
227   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
228                                         ntohl (atsi->value));
229 }
230
231
232 /**
233  * Update the performance information kept for the given peer.
234  *
235  * @param cp peer record to update
236  * @param atsi transport performance data
237  */
238 static void
239 update_atsi (struct GSF_ConnectedPeer *cp,
240              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
241 {
242   struct GNUNET_TIME_Relative latency;
243
244   latency = get_latency (atsi);
245   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
246                                  latency);
247   /* LATER: merge atsi into cp's performance data (if we ever care...) */
248 }
249
250
251 /**
252  * Core is ready to transmit to a peer, get the message.
253  *
254  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
255  * @param size number of bytes core is willing to take
256  * @param buf where to copy the message
257  * @return number of bytes copied to buf
258  */
259 static size_t
260 peer_transmit_ready_cb (void *cls,
261                         size_t size,
262                         void *buf)
263 {
264   struct GSF_PeerTransmitHandle *pth = cls;
265   struct GSF_ConnectedPeer *cp;
266   size_t ret;
267
268   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
269     {
270       GNUNET_SCHEDULER_cancel (pth->timeout_task);
271       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
272     }
273   cp = pth->cp;
274   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
275                                cp->pth_tail,
276                                pth);
277   if (pth->is_query)
278     {
279       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
280       GNUNET_assert (0 < cp->ppd.pending_queries--);    
281     }
282   else
283     {
284       GNUNET_assert (0 < cp->ppd.pending_replies--);
285     }
286   GNUNET_LOAD_update (cp->ppd.transmission_delay,
287                       GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);  
288   ret = pth->gmc (pth->gmc_cls, 
289                   0, NULL);
290   GNUNET_free (pth);  
291   return ret;
292 }
293
294
295 /**
296  * Function called by core upon success or failure of our bandwidth reservation request.
297  *
298  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
299  * @param peer identifies the peer
300  * @param bandwidth_out available amount of outbound bandwidth
301  * @param amount set to the amount that was actually reserved or unreserved;
302  *               either the full requested amount or zero (no partial reservations)
303  * @param preference current traffic preference for the given peer
304  */
305 static void
306 core_reserve_callback (void *cls,
307                        const struct GNUNET_PeerIdentity * peer,
308                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
309                        int amount,
310                        uint64_t preference)
311 {
312   struct GSF_ConnectedPeer *cp = cls;
313   uint64_t ip;
314
315   cp->irc = NULL;
316   if (0 == amount)
317     {
318       /* failed; retry! (how did we get here!?) */
319       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
320                   _("Failed to reserve bandwidth to peer `%s'\n"),
321                   GNUNET_i2s (peer));
322       ip = cp->inc_preference;
323       cp->inc_preference = 0;
324       cp->irc = GNUNET_CORE_peer_change_preference (core,
325                                                     peer,
326                                                     GNUNET_TIME_UNIT_FOREVER_REL,
327                                                     GNUNET_BANDWIDTH_VALUE_MAX,
328                                                     GNUNET_FS_DBLOCK_SIZE,
329                                                     ip,
330                                                     &core_reserve_callback,
331                                                     cp);
332       return;
333     }
334   pth = cp->pth_head;
335   if ( (NULL != pth) &&
336        (NULL == pth->cth) )
337     {
338       /* reservation success, try transmission now! */
339       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
340                                                     priority,
341                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
342                                                     &target,
343                                                     size,
344                                                     &peer_transmit_ready_cb,
345                                                     pth);
346     }
347 }
348
349
350 /**
351  * A peer connected to us.  Setup the connected peer
352  * records.
353  *
354  * @param peer identity of peer that connected
355  * @param atsi performance data for the connection
356  * @return handle to connected peer entry
357  */
358 struct GSF_ConnectedPeer *
359 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
360                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
361 {
362   struct GSF_ConnectedPeer *cp;
363   char *fn;
364   uint32_t trust;
365   struct GNUNET_TIME_Relative latency;
366
367   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
368   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
369   cp->pid = GNUNET_PEER_intern (peer);
370   cp->transmission_delay = GNUNET_LOAD_value_init (0);
371   cp->irc = GNUNET_CORE_peer_change_preference (core,
372                                                 peer,
373                                                 GNUNET_TIME_UNIT_FOREVER_REL,
374                                                 GNUNET_BANDWIDTH_VALUE_MAX,
375                                                 GNUNET_FS_DBLOCK_SIZE,
376                                                 0,
377                                                 &core_reserve_callback,
378                                                 cp);
379   fn = get_trust_filename (peer);
380   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
381       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
382     cp->disk_trust = cp->trust = ntohl (trust);
383   GNUNET_free (fn);
384   GNUNET_break (GNUNET_OK ==
385                 GNUNET_CONTAINER_multihashmap_put (cp_map,
386                                                    &peer->hashPubKey,
387                                                    cp,
388                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
389   update_atsi (cp, atsi);
390   GSF_plan_notify_new_peer_ (cp);
391   return cp;
392 }
393
394
395 /**
396  * Handle P2P "MIGRATION_STOP" message.
397  *
398  * @param cls closure, always NULL
399  * @param other the other peer involved (sender or receiver, NULL
400  *        for loopback messages where we are both sender and receiver)
401  * @param message the actual message
402  * @param atsi performance information
403  * @return GNUNET_OK to keep the connection open,
404  *         GNUNET_SYSERR to close it (signal serious error)
405  */
406 int
407 GSF_handle_p2p_migration_stop_ (void *cls,
408                                 const struct GNUNET_PeerIdentity *other,
409                                 const struct GNUNET_MessageHeader *message,
410                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
411 {
412   struct GSF_ConnectedPeer *cp; 
413   const struct MigrationStopMessage *msm;
414
415   msm = (const struct MigrationStopMessage*) message;
416   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
417                                           &other->hashPubKey);
418   if (cp == NULL)
419     {
420       GNUNET_break (0);
421       return GNUNET_OK;
422     }
423   cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
424   update_atsi (cp, atsi);
425   return GNUNET_OK;
426 }
427
428
429 /**
430  * Function called if there has been a timeout trying to satisfy
431  * a transmission request.
432  *
433  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
434  * @param tc scheduler context
435  */
436 static void
437 peer_transmit_timeout (void *cls,
438                        const struct GNUNET_SCHEDULER_TaskContext *tc)
439 {
440   struct GSF_PeerTransmitHandle *pth = cls;
441   struct GSF_ConnectedPeer *cp;
442   
443   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
444   cp = pth->cp;
445   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
446                                cp->pth_tail,
447                                pth);
448   if (pth->is_query)
449     GNUNET_assert (0 < cp->ppd.pending_queries--);    
450   else
451     GNUNET_assert (0 < cp->ppd.pending_replies--);
452   GNUNET_LOAD_update (cp->ppd.transmission_delay,
453                       UINT64_MAX);
454   pth->gmc (pth->gmc_cls, 
455             0, NULL);
456   GNUNET_free (pth);
457 }
458
459
460 /**
461  * Transmit a message to the given peer as soon as possible.
462  * If the peer disconnects before the transmission can happen,
463  * the callback is invoked with a 'NULL' buffer.
464  *
465  * @param peer target peer
466  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
467  * @param priority how important is this request?
468  * @param timeout when does this request timeout (call gmc with error)
469  * @param size number of bytes we would like to send to the peer
470  * @param gmc function to call to get the message
471  * @param gmc_cls closure for gmc
472  * @return handle to cancel request
473  */
474 struct GSF_PeerTransmitHandle *
475 GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
476                     int is_query,
477                     uint32_t priority,
478                     struct GNUNET_TIME_Relative timeout,
479                     size_t size,
480                     GSF_GetMessageCallback gmc,
481                     void *gmc_cls)
482 {
483   struct GSF_ConnectedPeer *cp;
484   struct GSF_PeerTransmitHandle *pth;
485   struct GSF_PeerTransmitHandle *pos;
486   struct GSF_PeerTransmitHandle *prev;
487   struct GNUNET_PeerIdentity target;
488   uint64_t ip;
489   int is_ready;
490
491   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
492                                           &peer->hashPubKey);
493   GNUNET_assert (NULL != cp);
494   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
495   pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
496   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
497   pth->gmc = gmc;
498   pth->gmc_cls = gmc_cls;
499   pth->size = size;
500   pth->is_query = is_query;
501   pth->priority = priority;
502   pth->cp = cp;
503   /* insertion sort (by priority, descending) */
504   prev = NULL;
505   pos = cp->pth_head;
506   while ( (pos != NULL) &&
507           (pos->priority > priority) )
508     {
509       prev = pos;
510       pos = pos->next;
511     }
512   if (prev == NULL)
513     GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
514                                       cp->pth_tail,
515                                       pth);
516   else
517     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
518                                        cp->pth_tail,
519                                        prev,
520                                        pth);
521   GNUNET_PEER_resolve (cp->pid,
522                        &target);
523   if (is_query)
524     {
525       /* query, need reservation */
526       if (NULL == cp->irc)
527         {
528           /* reservation already done! */
529           is_ready = GNUNET_YES;
530           ip = cp->inc_preference;
531           cp->inc_preference = 0;
532           cp->irc = GNUNET_CORE_peer_change_preference (core,
533                                                         peer,
534                                                         GNUNET_TIME_UNIT_FOREVER_REL,
535                                                         GNUNET_BANDWIDTH_VALUE_MAX,
536                                                         GNUNET_FS_DBLOCK_SIZE,
537                                                         ip,
538                                                         &core_reserve_callback,
539                                                         cp);      
540         }
541       else
542         {
543           /* still waiting for reservation */
544           is_ready = GNUNET_NO;
545         }
546     }
547   else
548     {
549       /* no reservation needed for content */
550       is_ready = GNUNET_YES;
551     }
552   if (is_ready)
553     {
554       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
555                                                     priority,
556                                                     timeout,
557                                                     &target,
558                                                     size,
559                                                     &peer_transmit_ready_cb,
560                                                     pth);
561       /* pth->cth could be NULL here, that's OK, we'll try again
562          later... */
563     }
564   if (pth->cth == NULL)
565     {
566       /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
567          install a timeout task to be on the safe side */
568       pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
569                                                         &peer_transmit_timeout,
570                                                         pth);
571     }
572   return pth;
573 }
574
575
576 /**
577  * Cancel an earlier request for transmission.
578  *
579  * @param pth request to cancel
580  */
581 void
582 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
583 {
584   struct GSF_PeerTransmitHandle *pth = cls;
585   struct GSF_ConnectedPeer *cp;
586
587   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
588     {
589       GNUNET_SCHEDULER_cancel (pth->timeout_task);
590       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
591     }
592   if (NULL != pth->cth)
593     {
594       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
595       pth->cth = NULL;
596     }
597   cp = pth->cp;
598   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
599                                cp->pth_tail,
600                                pth);
601   if (pth->is_query)
602     GNUNET_assert (0 < cp->ppd.pending_queries--);    
603   else
604     GNUNET_assert (0 < cp->ppd.pending_replies--);
605   GNUNET_free (pth);
606 }
607
608
609 /**
610  * Report on receiving a reply; update the performance record of the given peer.
611  *
612  * @param cp responding peer (will be updated)
613  * @param request_time time at which the original query was transmitted
614  * @param request_priority priority of the original request
615  * @param initiator_client local client on responsible for query (or NULL)
616  * @param initiator_peer other peer responsible for query (or NULL)
617  */
618 void
619 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
620                               struct GNUNET_TIME_Absolute request_time,
621                               uint32_t request_priority,
622                               const struct GSF_LocalClient *initiator_client,
623                               const struct GSF_ConnectedPeer *initiator_peer)
624 {
625   struct GNUNET_TIME_Relative delay;
626   unsigned int i;
627
628   delay = GNUNET_TIME_absolute_get_duration (request_time);  
629   cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
630   cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
631   if (NULL != initiator_client)
632     {
633       cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
634     }
635   else if (NULL != initiator_peer)
636     {
637       GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
638       cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
639       GNUNET_PEER_change_rc (initiator_peer->pid, 1);
640     }
641   else
642     GNUNET_break (0);
643 }
644
645
646 /**
647  * Method called whenever a given peer has a status change.
648  *
649  * @param cls closure
650  * @param peer peer identity this notification is about
651  * @param bandwidth_in available amount of inbound bandwidth
652  * @param bandwidth_out available amount of outbound bandwidth
653  * @param timeout absolute time when this peer will time out
654  *        unless we see some further activity from it
655  * @param atsi status information
656  */
657 void
658 GSF_peer_status_handler_ (void *cls,
659                           const struct GNUNET_PeerIdentity *peer,
660                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
661                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
662                           struct GNUNET_TIME_Absolute timeout,
663                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
664 {
665   struct GSF_ConnectedPeer *cp;
666
667   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
668                                           &peer->hashPubKey);
669   GNUNET_assert (NULL != cp);
670   update_atsi (cp, atsi);
671 }
672
673
674 /**
675  * A peer disconnected from us.  Tear down the connected peer
676  * record.
677  *
678  * @param cls unused
679  * @param peer identity of peer that connected
680  */
681 void
682 GSF_peer_disconnect_handler_ (void *cls,
683                               const struct GNUNET_PeerIdentity *peer)
684 {
685   struct GSF_ConnectedPeer *cp;
686   struct GSF_PeerTransmitHandle *pth;
687
688   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
689                                           &peer->hashPubKey);
690   GNUNET_assert (NULL != cp);
691   GNUNET_CONTAINER_multihashmap_remove (cp_map,
692                                         &peer->hashPubKey,
693                                         cp);
694   if (NULL != cp->irc)
695     {
696       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
697       cp->irc = NULL;
698     }
699   GSF_plan_notify_peer_disconnect_ (cp);
700   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
701   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
702   while (NULL != (pth = cp->pth_head))
703     {
704       if (NULL != pth->th)
705         {
706           GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
707           pth->th = NULL;
708         }
709       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
710                                    cp->pth_tail,
711                                    pth);
712       GNUNET_free (pth);
713     }
714   GNUNET_PEER_change_rc (cp->pid, -1);
715   GNUNET_free (cp);
716 }
717
718
719 /**
720  * Closure for 'call_iterator'.
721  */
722 struct IterationContext
723 {
724   /**
725    * Function to call on each entry.
726    */
727   GSF_ConnectedPeerIterator it;
728
729   /**
730    * Closure for 'it'.
731    */
732   void *it_cls;
733 };
734
735
736 /**
737  * Function that calls the callback for each peer.
738  *
739  * @param cls the 'struct IterationContext*'
740  * @param key identity of the peer
741  * @param value the 'struct GSF_ConnectedPeer*'
742  * @return GNUNET_YES to continue iteration
743  */
744 static int
745 call_iterator (void *cls,
746                const GNUNET_HashCode *key,
747                void *value)
748 {
749   struct IterationContext *ic = cls;
750   struct GSF_ConnectedPeer *cp = value;
751   
752   ic->it (ic->it_cls,
753           (const struct GNUNET_PeerIdentity*) key,
754           cp,
755           &cp->ppd);
756   return GNUNET_YES;
757 }
758
759
760 /**
761  * Iterate over all connected peers.
762  *
763  * @param it function to call for each peer
764  * @param it_cls closure for it
765  */
766 void
767 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
768                               void *it_cls)
769 {
770   struct IterationContext ic;
771
772   ic.it = it;
773   ic.it_cls = it_cls;
774   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
775                                          &call_iterator,
776                                          &ic);
777 }
778
779
780 /**
781  * Write host-trust information to a file - flush the buffer entry!
782  *
783  * @param cls closure, not used
784  * @param key host identity
785  * @param value the 'struct GSF_ConnectedPeer' to flush
786  * @return GNUNET_OK to continue iteration
787  */
788 static int
789 flush_trust (void *cls,
790              const GNUNET_HashCode *key,
791              void *value)
792 {
793   struct GSF_ConnectedPeer *cp = value;
794   char *fn;
795   uint32_t trust;
796   struct GNUNET_PeerIdentity pid;
797
798   if (cp->trust == cp->disk_trust)
799     return GNUNET_OK;                     /* unchanged */
800   GNUNET_PEER_resolve (cp->pid,
801                        &pid);
802   fn = get_trust_filename (&pid);
803   if (cp->trust == 0)
804     {
805       if ((0 != UNLINK (fn)) && (errno != ENOENT))
806         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
807                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
808     }
809   else
810     {
811       trust = htonl (cp->trust);
812       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
813                                                     sizeof(uint32_t),
814                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
815                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
816         cp->disk_trust = cp->trust;
817     }
818   GNUNET_free (fn);
819   return GNUNET_OK;
820 }
821
822
823 /**
824  * Notify core about a preference we have for the given peer
825  * (to allocate more resources towards it).  The change will
826  * be communicated the next time we reserve bandwidth with
827  * core (not instantly).
828  *
829  * @param cp peer to reserve bandwidth from
830  * @param pref preference change
831  */
832 void
833 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
834                                        uint64_t pref)
835 {
836   cp->inc_preference += pref;
837 }
838
839
840 /**
841  * Call this method periodically to flush trust information to disk.
842  *
843  * @param cls closure, not used
844  * @param tc task context, not used
845  */
846 static void
847 cron_flush_trust (void *cls,
848                   const struct GNUNET_SCHEDULER_TaskContext *tc)
849 {
850
851   if (NULL == cp_map)
852     return;
853   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
854                                          &flush_trust,
855                                          NULL);
856   if (NULL == tc)
857     return;
858   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
859     return;
860   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
861                                 &cron_flush_trust, 
862                                 NULL);
863 }
864
865
866 /**
867  * Initialize peer management subsystem.
868  *
869  * @param cfg configuration to use
870  */
871 void
872 GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
873 {
874   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
875   GNUNET_assert (GNUNET_OK ==
876                  GNUNET_CONFIGURATION_get_value_filename (cfg,
877                                                           "fs",
878                                                           "TRUST",
879                                                           &trustDirectory));
880   GNUNET_DISK_directory_create (trustDirectory);
881   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
882                                       &cron_flush_trust, NULL);
883 }
884
885
886 /**
887  * Iterator to free peer entries.
888  *
889  * @param cls closure, unused
890  * @param key current key code
891  * @param value value in the hash map (peer entry)
892  * @return GNUNET_YES (we should continue to iterate)
893  */
894 static int 
895 clean_peer (void *cls,
896             const GNUNET_HashCode * key,
897             void *value)
898 {
899   GSF_peer_disconnect_handler_ (NULL, 
900                                 (const struct GNUNET_PeerIdentity*) key);
901   return GNUNET_YES;
902 }
903
904
905 /**
906  * Shutdown peer management subsystem.
907  */
908 void
909 GSF_connected_peer_done_ ()
910 {
911   cron_flush_trust (NULL, NULL);
912   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
913                                          &clean_peer,
914                                          NULL);
915   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
916   cp_map = NULL;
917   GNUNET_free (trustDirectory);
918   trustDirectory = NULL;
919 }
920
921
922 /**
923  * Iterator to remove references to LC entry.
924  *
925  * @param the 'struct GSF_LocalClient*' to look for
926  * @param key current key code
927  * @param value value in the hash map (peer entry)
928  * @return GNUNET_YES (we should continue to iterate)
929  */
930 static int 
931 clean_peer (void *cls,
932             const GNUNET_HashCode * key,
933             void *value)
934 {
935   const struct GSF_LocalClient *lc = cls;
936   struct GSF_ConnectedPeer *cp = value;
937   unsigned int i;
938
939   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
940     if (cp->ppd.last_client_replies[i] == lc)
941       cp->ppd.last_client_replies[i] = NULL;
942   return GNUNET_YES;
943 }
944
945
946 /**
947  * Notification that a local client disconnected.  Clean up all of our
948  * references to the given handle.
949  *
950  * @param lc handle to the local client (henceforth invalid)
951  */
952 void
953 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
954 {
955   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
956                                          &clean_local_client,
957                                          (void*) lc);
958 }
959
960
961 /* end of gnunet-service-fs_cp.c */