cleaning
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29
30 /**
31  * How often do we flush trust values to disk?
32  */
33 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34
35
36 struct GSF_PeerTransmitHandle
37 {
38
39   /**
40    * Handle for an active request for transmission to this
41    * peer, or NULL (if core queue was full).
42    */
43   struct GNUNET_CORE_TransmitHandle *cth;
44
45   /**
46    * Time when this transmission request was issued.
47    */
48   struct GNUNET_TIME_Absolute transmission_request_start_time;
49
50   /**
51    * Timeout for this request.
52    */
53   struct GNUNET_TIME_Absolute timeout;
54
55   /**
56    * Task called on timeout, or 0 for none.
57    */
58   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
59
60   /**
61    * Function to call to get the actual message.
62    */
63   GSF_GetMessageCallback gmc;
64
65   /**
66    * Peer this request targets.
67    */
68   struct GSF_ConnectedPeer *cp;
69
70   /**
71    * Closure for 'gmc'.
72    */
73   void *gmc_cls;
74
75   /**
76    * Size of the message to be transmitted.
77    */
78   size_t size;
79
80   /**
81    * GNUNET_YES if this is a query, GNUNET_NO for content.
82    */
83   int is_query;
84
85   /**
86    * Priority of this request.
87    */
88   uint32_t priority;
89
90 };
91
92
93 /**
94  * A connected peer.
95  */
96 struct GSF_ConnectedPeer 
97 {
98
99   /**
100    * Performance data for this peer.
101    */
102   struct GSF_PeerPerformanceData ppd;
103
104   /**
105    * Time until when we blocked this peer from migrating
106    * data to us.
107    */
108   struct GNUNET_TIME_Absolute last_migration_block;
109
110   /**
111    * Messages (replies, queries, content migration) we would like to
112    * send to this peer in the near future.  Sorted by priority, head.
113    */
114   struct GSF_PeerTransmitHandle *pth_head;
115
116   /**
117    * Messages (replies, queries, content migration) we would like to
118    * send to this peer in the near future.  Sorted by priority, tail.
119    */
120   struct GSF_PeerTransmitHandle *pth_tail;
121
122   /**
123    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
124    * NULL if we have successfully reserved 32k, otherwise non-NULL.
125    */
126   struct GNUNET_CORE_InformationRequestContext *irc;
127
128   /**
129    * ID of delay task for scheduling transmission.
130    */
131   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
132
133   /**
134    * Increase in traffic preference still to be submitted
135    * to the core service for this peer.
136    */
137   uint64_t inc_preference;
138
139   /**
140    * Trust rating for this peer
141    */
142   uint32_t trust;
143
144   /**
145    * Trust rating for this peer on disk.
146    */
147   uint32_t disk_trust;
148
149   /**
150    * The peer's identity.
151    */
152   GNUNET_PEER_Id pid;
153
154   /**
155    * Which offset in "last_p2p_replies" will be updated next?
156    * (we go round-robin).
157    */
158   unsigned int last_p2p_replies_woff;
159
160   /**
161    * Which offset in "last_client_replies" will be updated next?
162    * (we go round-robin).
163    */
164   unsigned int last_client_replies_woff;
165
166   /**
167    * Current offset into 'last_request_times' ring buffer.
168    */
169   unsigned int last_request_times_off;
170
171 };
172
173
174 /**
175  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
176  */
177 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
178
179
180 /**
181  * Where do we store trust information?
182  */
183 static char *trustDirectory;
184
185
186 /**
187  * Get the filename under which we would store the GNUNET_HELLO_Message
188  * for the given host and protocol.
189  * @return filename of the form DIRECTORY/HOSTID
190  */
191 static char *
192 get_trust_filename (const struct GNUNET_PeerIdentity *id)
193 {
194   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
195   char *fn;
196
197   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
198   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
199   return fn;
200 }
201
202
203 /**
204  * Find latency information in 'atsi'.
205  *
206  * @param atsi performance data
207  * @return connection latency
208  */
209 static struct GNUNET_TIME_Relative
210 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
211 {
212   if (atsi == NULL)
213     return GNUNET_TIME_UNIT_SECONDS;
214   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
215           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
216     atsi++;
217   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
218     {
219       GNUNET_break (0);
220       /* how can we not have latency data? */
221       return GNUNET_TIME_UNIT_SECONDS;
222     }
223   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
224                                         ntohl (atsi->value));
225 }
226
227
228 /**
229  * Update the performance information kept for the given peer.
230  *
231  * @param cp peer record to update
232  * @param atsi transport performance data
233  */
234 static void
235 update_atsi (struct GSF_ConnectedPeer *cp,
236              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
237 {
238   struct GNUNET_TIME_Relative latency;
239
240   latency = get_latency (atsi);
241   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
242                                  latency);
243   /* LATER: merge atsi into cp's performance data (if we ever care...) */
244 }
245
246
247 /**
248  * Core is ready to transmit to a peer, get the message.
249  *
250  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
251  * @param size number of bytes core is willing to take
252  * @param buf where to copy the message
253  * @return number of bytes copied to buf
254  */
255 static size_t
256 peer_transmit_ready_cb (void *cls,
257                         size_t size,
258                         void *buf)
259 {
260   struct GSF_PeerTransmitHandle *pth = cls;
261   struct GSF_ConnectedPeer *cp;
262   size_t ret;
263
264   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
265     {
266       GNUNET_SCHEDULER_cancel (pth->timeout_task);
267       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
268     }
269   cp = pth->cp;
270   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
271                                cp->pth_tail,
272                                pth);
273   if (pth->is_query)
274     {
275       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
276       GNUNET_assert (0 < cp->ppd.pending_queries--);    
277     }
278   else
279     {
280       GNUNET_assert (0 < cp->ppd.pending_replies--);
281     }
282   GNUNET_LOAD_update (cp->ppd.transmission_delay,
283                       GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);  
284   ret = pth->gmc (pth->gmc_cls, 
285                   0, NULL);
286   GNUNET_free (pth);  
287   return ret;
288 }
289
290
291 /**
292  * Function called by core upon success or failure of our bandwidth reservation request.
293  *
294  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
295  * @param peer identifies the peer
296  * @param bandwidth_out available amount of outbound bandwidth
297  * @param amount set to the amount that was actually reserved or unreserved;
298  *               either the full requested amount or zero (no partial reservations)
299  * @param preference current traffic preference for the given peer
300  */
301 static void
302 core_reserve_callback (void *cls,
303                        const struct GNUNET_PeerIdentity * peer,
304                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
305                        int amount,
306                        uint64_t preference)
307 {
308   struct GSF_ConnectedPeer *cp = cls;
309   uint64_t ip;
310
311   cp->irc = NULL;
312   if (0 == amount)
313     {
314       /* failed; retry! (how did we get here!?) */
315       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
316                   _("Failed to reserve bandwidth to peer `%s'\n"),
317                   GNUNET_i2s (peer));
318       ip = cp->inc_preference;
319       cp->inc_preference = 0;
320       cp->irc = GNUNET_CORE_peer_change_preference (core,
321                                                     peer,
322                                                     GNUNET_TIME_UNIT_FOREVER_REL,
323                                                     GNUNET_BANDWIDTH_VALUE_MAX,
324                                                     GNUNET_FS_DBLOCK_SIZE,
325                                                     ip,
326                                                     &core_reserve_callback,
327                                                     cp);
328       return;
329     }
330   pth = cp->pth_head;
331   if ( (NULL != pth) &&
332        (NULL == pth->cth) )
333     {
334       /* reservation success, try transmission now! */
335       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
336                                                     priority,
337                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
338                                                     &target,
339                                                     size,
340                                                     &peer_transmit_ready_cb,
341                                                     pth);
342     }
343 }
344
345
346 /**
347  * A peer connected to us.  Setup the connected peer
348  * records.
349  *
350  * @param peer identity of peer that connected
351  * @param atsi performance data for the connection
352  * @return handle to connected peer entry
353  */
354 struct GSF_ConnectedPeer *
355 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
356                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
357 {
358   struct GSF_ConnectedPeer *cp;
359   char *fn;
360   uint32_t trust;
361   struct GNUNET_TIME_Relative latency;
362
363   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
364   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
365   cp->pid = GNUNET_PEER_intern (peer);
366   cp->transmission_delay = GNUNET_LOAD_value_init (0);
367   cp->irc = GNUNET_CORE_peer_change_preference (core,
368                                                 peer,
369                                                 GNUNET_TIME_UNIT_FOREVER_REL,
370                                                 GNUNET_BANDWIDTH_VALUE_MAX,
371                                                 GNUNET_FS_DBLOCK_SIZE,
372                                                 0,
373                                                 &core_reserve_callback,
374                                                 cp);
375   fn = get_trust_filename (peer);
376   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
377       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
378     cp->disk_trust = cp->trust = ntohl (trust);
379   GNUNET_free (fn);
380   GNUNET_break (GNUNET_OK ==
381                 GNUNET_CONTAINER_multihashmap_put (cp_map,
382                                                    &peer->hashPubKey,
383                                                    cp,
384                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
385   update_atsi (cp, atsi);
386   GSF_plan_notify_new_peer_ (cp);
387   return cp;
388 }
389
390
391 /**
392  * Handle P2P "MIGRATION_STOP" message.
393  *
394  * @param cls closure, always NULL
395  * @param other the other peer involved (sender or receiver, NULL
396  *        for loopback messages where we are both sender and receiver)
397  * @param message the actual message
398  * @param atsi performance information
399  * @return GNUNET_OK to keep the connection open,
400  *         GNUNET_SYSERR to close it (signal serious error)
401  */
402 int
403 GSF_handle_p2p_migration_stop_ (void *cls,
404                                 const struct GNUNET_PeerIdentity *other,
405                                 const struct GNUNET_MessageHeader *message,
406                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
407 {
408   struct GSF_ConnectedPeer *cp; 
409   const struct MigrationStopMessage *msm;
410
411   msm = (const struct MigrationStopMessage*) message;
412   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
413                                           &other->hashPubKey);
414   if (cp == NULL)
415     {
416       GNUNET_break (0);
417       return GNUNET_OK;
418     }
419   cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
420   update_atsi (cp, atsi);
421   return GNUNET_OK;
422 }
423
424
425 /**
426  * Function called if there has been a timeout trying to satisfy
427  * a transmission request.
428  *
429  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
430  * @param tc scheduler context
431  */
432 static void
433 peer_transmit_timeout (void *cls,
434                        const struct GNUNET_SCHEDULER_TaskContext *tc)
435 {
436   struct GSF_PeerTransmitHandle *pth = cls;
437   struct GSF_ConnectedPeer *cp;
438   
439   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
440   cp = pth->cp;
441   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
442                                cp->pth_tail,
443                                pth);
444   if (pth->is_query)
445     GNUNET_assert (0 < cp->ppd.pending_queries--);    
446   else
447     GNUNET_assert (0 < cp->ppd.pending_replies--);
448   GNUNET_LOAD_update (cp->ppd.transmission_delay,
449                       UINT64_MAX);
450   pth->gmc (pth->gmc_cls, 
451             0, NULL);
452   GNUNET_free (pth);
453 }
454
455
456 /**
457  * Transmit a message to the given peer as soon as possible.
458  * If the peer disconnects before the transmission can happen,
459  * the callback is invoked with a 'NULL' buffer.
460  *
461  * @param peer target peer
462  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
463  * @param priority how important is this request?
464  * @param timeout when does this request timeout (call gmc with error)
465  * @param size number of bytes we would like to send to the peer
466  * @param gmc function to call to get the message
467  * @param gmc_cls closure for gmc
468  * @return handle to cancel request
469  */
470 struct GSF_PeerTransmitHandle *
471 GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
472                     int is_query,
473                     uint32_t priority,
474                     struct GNUNET_TIME_Relative timeout,
475                     size_t size,
476                     GSF_GetMessageCallback gmc,
477                     void *gmc_cls)
478 {
479   struct GSF_ConnectedPeer *cp;
480   struct GSF_PeerTransmitHandle *pth;
481   struct GSF_PeerTransmitHandle *pos;
482   struct GSF_PeerTransmitHandle *prev;
483   struct GNUNET_PeerIdentity target;
484   uint64_t ip;
485   int is_ready;
486
487   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
488                                           &peer->hashPubKey);
489   GNUNET_assert (NULL != cp);
490   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
491   pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
492   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
493   pth->gmc = gmc;
494   pth->gmc_cls = gmc_cls;
495   pth->size = size;
496   pth->is_query = is_query;
497   pth->priority = priority;
498   pth->cp = cp;
499   /* insertion sort (by priority, descending) */
500   prev = NULL;
501   pos = cp->pth_head;
502   while ( (pos != NULL) &&
503           (pos->priority > priority) )
504     {
505       prev = pos;
506       pos = pos->next;
507     }
508   if (prev == NULL)
509     GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
510                                       cp->pth_tail,
511                                       pth);
512   else
513     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
514                                        cp->pth_tail,
515                                        prev,
516                                        pth);
517   GNUNET_PEER_resolve (cp->pid,
518                        &target);
519   if (is_query)
520     {
521       /* query, need reservation */
522       if (NULL == cp->irc)
523         {
524           /* reservation already done! */
525           is_ready = GNUNET_YES;
526           ip = cp->inc_preference;
527           cp->inc_preference = 0;
528           cp->irc = GNUNET_CORE_peer_change_preference (core,
529                                                         peer,
530                                                         GNUNET_TIME_UNIT_FOREVER_REL,
531                                                         GNUNET_BANDWIDTH_VALUE_MAX,
532                                                         GNUNET_FS_DBLOCK_SIZE,
533                                                         ip,
534                                                         &core_reserve_callback,
535                                                         cp);      
536         }
537       else
538         {
539           /* still waiting for reservation */
540           is_ready = GNUNET_NO;
541         }
542     }
543   else
544     {
545       /* no reservation needed for content */
546       is_ready = GNUNET_YES;
547     }
548   if (is_ready)
549     {
550       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
551                                                     priority,
552                                                     timeout,
553                                                     &target,
554                                                     size,
555                                                     &peer_transmit_ready_cb,
556                                                     pth);
557       /* pth->cth could be NULL here, that's OK, we'll try again
558          later... */
559     }
560   if (pth->cth == NULL)
561     {
562       /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
563          install a timeout task to be on the safe side */
564       pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
565                                                         &peer_transmit_timeout,
566                                                         pth);
567     }
568   return pth;
569 }
570
571
572 /**
573  * Cancel an earlier request for transmission.
574  *
575  * @param pth request to cancel
576  */
577 void
578 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
579 {
580   struct GSF_PeerTransmitHandle *pth = cls;
581   struct GSF_ConnectedPeer *cp;
582
583   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
584     {
585       GNUNET_SCHEDULER_cancel (pth->timeout_task);
586       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
587     }
588   if (NULL != pth->cth)
589     {
590       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
591       pth->cth = NULL;
592     }
593   cp = pth->cp;
594   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
595                                cp->pth_tail,
596                                pth);
597   if (pth->is_query)
598     GNUNET_assert (0 < cp->ppd.pending_queries--);    
599   else
600     GNUNET_assert (0 < cp->ppd.pending_replies--);
601   GNUNET_free (pth);
602 }
603
604
605 /**
606  * Report on receiving a reply; update the performance record of the given peer.
607  *
608  * @param cp responding peer (will be updated)
609  * @param request_time time at which the original query was transmitted
610  * @param request_priority priority of the original request
611  * @param initiator_client local client on responsible for query (or NULL)
612  * @param initiator_peer other peer responsible for query (or NULL)
613  */
614 void
615 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
616                               struct GNUNET_TIME_Absolute request_time,
617                               uint32_t request_priority,
618                               const struct GSF_LocalClient *initiator_client,
619                               const struct GSF_ConnectedPeer *initiator_peer)
620 {
621   struct GNUNET_TIME_Relative delay;
622   unsigned int i;
623
624   delay = GNUNET_TIME_absolute_get_duration (request_time);  
625   cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
626   cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
627   if (NULL != initiator_client)
628     {
629       cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
630     }
631   else if (NULL != initiator_peer)
632     {
633       GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
634       cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
635       GNUNET_PEER_change_rc (initiator_peer->pid, 1);
636     }
637   else
638     GNUNET_break (0);
639 }
640
641
642 /**
643  * Method called whenever a given peer has a status change.
644  *
645  * @param cls closure
646  * @param peer peer identity this notification is about
647  * @param bandwidth_in available amount of inbound bandwidth
648  * @param bandwidth_out available amount of outbound bandwidth
649  * @param timeout absolute time when this peer will time out
650  *        unless we see some further activity from it
651  * @param atsi status information
652  */
653 void
654 GSF_peer_status_handler_ (void *cls,
655                           const struct GNUNET_PeerIdentity *peer,
656                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
657                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
658                           struct GNUNET_TIME_Absolute timeout,
659                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
660 {
661   struct GSF_ConnectedPeer *cp;
662
663   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
664                                           &peer->hashPubKey);
665   GNUNET_assert (NULL != cp);
666   update_atsi (cp, atsi);
667 }
668
669
670 /**
671  * A peer disconnected from us.  Tear down the connected peer
672  * record.
673  *
674  * @param cls unused
675  * @param peer identity of peer that connected
676  */
677 void
678 GSF_peer_disconnect_handler_ (void *cls,
679                               const struct GNUNET_PeerIdentity *peer)
680 {
681   struct GSF_ConnectedPeer *cp;
682   struct GSF_PeerTransmitHandle *pth;
683
684   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
685                                           &peer->hashPubKey);
686   GNUNET_assert (NULL != cp);
687   GNUNET_CONTAINER_multihashmap_remove (cp_map,
688                                         &peer->hashPubKey,
689                                         cp);
690   if (NULL != cp->irc)
691     {
692       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
693       cp->irc = NULL;
694     }
695   GSF_plan_notify_peer_disconnect_ (cp);
696   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
697   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
698   while (NULL != (pth = cp->pth_head))
699     {
700       if (NULL != pth->th)
701         {
702           GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
703           pth->th = NULL;
704         }
705       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
706                                    cp->pth_tail,
707                                    pth);
708       GNUNET_free (pth);
709     }
710   GNUNET_PEER_change_rc (cp->pid, -1);
711   GNUNET_free (cp);
712 }
713
714
715 /**
716  * Closure for 'call_iterator'.
717  */
718 struct IterationContext
719 {
720   /**
721    * Function to call on each entry.
722    */
723   GSF_ConnectedPeerIterator it;
724
725   /**
726    * Closure for 'it'.
727    */
728   void *it_cls;
729 };
730
731
732 /**
733  * Function that calls the callback for each peer.
734  *
735  * @param cls the 'struct IterationContext*'
736  * @param key identity of the peer
737  * @param value the 'struct GSF_ConnectedPeer*'
738  * @return GNUNET_YES to continue iteration
739  */
740 static int
741 call_iterator (void *cls,
742                const GNUNET_HashCode *key,
743                void *value)
744 {
745   struct IterationContext *ic = cls;
746   struct GSF_ConnectedPeer *cp = value;
747   
748   ic->it (ic->it_cls,
749           (const struct GNUNET_PeerIdentity*) key,
750           cp,
751           &cp->ppd);
752   return GNUNET_YES;
753 }
754
755
756 /**
757  * Iterate over all connected peers.
758  *
759  * @param it function to call for each peer
760  * @param it_cls closure for it
761  */
762 void
763 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
764                               void *it_cls)
765 {
766   struct IterationContext ic;
767
768   ic.it = it;
769   ic.it_cls = it_cls;
770   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
771                                          &call_iterator,
772                                          &ic);
773 }
774
775
776 /**
777  * Write host-trust information to a file - flush the buffer entry!
778  *
779  * @param cls closure, not used
780  * @param key host identity
781  * @param value the 'struct GSF_ConnectedPeer' to flush
782  * @return GNUNET_OK to continue iteration
783  */
784 static int
785 flush_trust (void *cls,
786              const GNUNET_HashCode *key,
787              void *value)
788 {
789   struct GSF_ConnectedPeer *cp = value;
790   char *fn;
791   uint32_t trust;
792   struct GNUNET_PeerIdentity pid;
793
794   if (cp->trust == cp->disk_trust)
795     return GNUNET_OK;                     /* unchanged */
796   GNUNET_PEER_resolve (cp->pid,
797                        &pid);
798   fn = get_trust_filename (&pid);
799   if (cp->trust == 0)
800     {
801       if ((0 != UNLINK (fn)) && (errno != ENOENT))
802         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
803                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
804     }
805   else
806     {
807       trust = htonl (cp->trust);
808       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
809                                                     sizeof(uint32_t),
810                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
811                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
812         cp->disk_trust = cp->trust;
813     }
814   GNUNET_free (fn);
815   return GNUNET_OK;
816 }
817
818
819 /**
820  * Notify core about a preference we have for the given peer
821  * (to allocate more resources towards it).  The change will
822  * be communicated the next time we reserve bandwidth with
823  * core (not instantly).
824  *
825  * @param cp peer to reserve bandwidth from
826  * @param pref preference change
827  */
828 void
829 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
830                                        uint64_t pref)
831 {
832   cp->inc_preference += pref;
833 }
834
835
836 /**
837  * Call this method periodically to flush trust information to disk.
838  *
839  * @param cls closure, not used
840  * @param tc task context, not used
841  */
842 static void
843 cron_flush_trust (void *cls,
844                   const struct GNUNET_SCHEDULER_TaskContext *tc)
845 {
846
847   if (NULL == cp_map)
848     return;
849   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
850                                          &flush_trust,
851                                          NULL);
852   if (NULL == tc)
853     return;
854   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
855     return;
856   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
857                                 &cron_flush_trust, 
858                                 NULL);
859 }
860
861
862 /**
863  * Initialize peer management subsystem.
864  *
865  * @param cfg configuration to use
866  */
867 void
868 GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
869 {
870   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
871   GNUNET_assert (GNUNET_OK ==
872                  GNUNET_CONFIGURATION_get_value_filename (cfg,
873                                                           "fs",
874                                                           "TRUST",
875                                                           &trustDirectory));
876   GNUNET_DISK_directory_create (trustDirectory);
877   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
878                                       &cron_flush_trust, NULL);
879 }
880
881
882 /**
883  * Iterator to free peer entries.
884  *
885  * @param cls closure, unused
886  * @param key current key code
887  * @param value value in the hash map (peer entry)
888  * @return GNUNET_YES (we should continue to iterate)
889  */
890 static int 
891 clean_peer (void *cls,
892             const GNUNET_HashCode * key,
893             void *value)
894 {
895   GSF_peer_disconnect_handler_ (NULL, 
896                                 (const struct GNUNET_PeerIdentity*) key);
897   return GNUNET_YES;
898 }
899
900
901 /**
902  * Shutdown peer management subsystem.
903  */
904 void
905 GSF_connected_peer_done_ ()
906 {
907   cron_flush_trust (NULL, NULL);
908   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
909                                          &clean_peer,
910                                          NULL);
911   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
912   cp_map = NULL;
913   GNUNET_free (trustDirectory);
914   trustDirectory = NULL;
915 }
916
917
918 /**
919  * Iterator to remove references to LC entry.
920  *
921  * @param the 'struct GSF_LocalClient*' to look for
922  * @param key current key code
923  * @param value value in the hash map (peer entry)
924  * @return GNUNET_YES (we should continue to iterate)
925  */
926 static int 
927 clean_peer (void *cls,
928             const GNUNET_HashCode * key,
929             void *value)
930 {
931   const struct GSF_LocalClient *lc = cls;
932   struct GSF_ConnectedPeer *cp = value;
933   unsigned int i;
934
935   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
936     if (cp->ppd.last_client_replies[i] == lc)
937       cp->ppd.last_client_replies[i] = NULL;
938   return GNUNET_YES;
939 }
940
941
942 /**
943  * Notification that a local client disconnected.  Clean up all of our
944  * references to the given handle.
945  *
946  * @param lc handle to the local client (henceforth invalid)
947  */
948 void
949 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
950 {
951   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
952                                          &clean_local_client,
953                                          (void*) lc);
954 }
955
956
957 #endif
958 /* end of gnunet-service-fs_cp.h */