wip
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33
34 /**
35  * How often do we flush trust values to disk?
36  */
37 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
38
39 /**
40  * After how long do we discard a reply?
41  */
42 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43
44
45 /**
46  * Handle to cancel a transmission request.
47  */
48 struct GSF_PeerTransmitHandle
49 {
50
51   /**
52    * Kept in a doubly-linked list.
53    */
54   struct GSF_PeerTransmitHandle *next;
55
56   /**
57    * Kept in a doubly-linked list.
58    */
59   struct GSF_PeerTransmitHandle *prev;
60
61   /**
62    * Handle for an active request for transmission to this
63    * peer, or NULL (if core queue was full).
64    */
65   struct GNUNET_CORE_TransmitHandle *cth;
66
67   /**
68    * Time when this transmission request was issued.
69    */
70   struct GNUNET_TIME_Absolute transmission_request_start_time;
71
72   /**
73    * Timeout for this request.
74    */
75   struct GNUNET_TIME_Absolute timeout;
76
77   /**
78    * Task called on timeout, or 0 for none.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
81
82   /**
83    * Function to call to get the actual message.
84    */
85   GSF_GetMessageCallback gmc;
86
87   /**
88    * Peer this request targets.
89    */
90   struct GSF_ConnectedPeer *cp;
91
92   /**
93    * Closure for 'gmc'.
94    */
95   void *gmc_cls;
96
97   /**
98    * Size of the message to be transmitted.
99    */
100   size_t size;
101
102   /**
103    * GNUNET_YES if this is a query, GNUNET_NO for content.
104    */
105   int is_query;
106
107   /**
108    * Did we get a reservation already?
109    */
110   int was_reserved;
111
112   /**
113    * Priority of this request.
114    */
115   uint32_t priority;
116
117 };
118
119
120 /**
121  * Information per peer and request.
122  */
123 struct PeerRequest
124 {
125
126   /**
127    * Handle to generic request.
128    */
129   struct GSF_PendingRequest *pr;
130   
131   /**
132    * Handle to specific peer.
133    */
134   struct GSF_ConnectedPeer *cp;
135
136   /**
137    * Task for asynchronous stopping of this request.
138    */
139   GNUNET_SCHEDULER_TaskIdentifier kill_task;
140
141 };
142
143
144 /**
145  * A connected peer.
146  */
147 struct GSF_ConnectedPeer 
148 {
149
150   /**
151    * Performance data for this peer.
152    */
153   struct GSF_PeerPerformanceData ppd;
154
155   /**
156    * Time until when we blocked this peer from migrating
157    * data to us.
158    */
159   struct GNUNET_TIME_Absolute last_migration_block;
160
161   /**
162    * Task scheduled to revive migration to this peer.
163    */
164   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
165
166   /**
167    * Messages (replies, queries, content migration) we would like to
168    * send to this peer in the near future.  Sorted by priority, head.
169    */
170   struct GSF_PeerTransmitHandle *pth_head;
171
172   /**
173    * Messages (replies, queries, content migration) we would like to
174    * send to this peer in the near future.  Sorted by priority, tail.
175    */
176   struct GSF_PeerTransmitHandle *pth_tail;
177
178   /**
179    * Migration stop message in our queue, or NULL if we have none pending.
180    */
181   struct GSF_PeerTransmitHandle *migration_pth;
182
183   /**
184    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
185    */
186   struct GNUNET_CORE_InformationRequestContext *irc;
187
188   /**
189    * Task scheduled if we need to retry bandwidth reservation later.
190    */
191   GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
192
193   /**
194    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
195    */
196   struct GNUNET_CONTAINER_MultiHashMap *request_map;
197
198   /**
199    * Increase in traffic preference still to be submitted
200    * to the core service for this peer.
201    */
202   uint64_t inc_preference;
203
204   /**
205    * Trust rating for this peer on disk.
206    */
207   uint32_t disk_trust;
208
209   /**
210    * Which offset in "last_p2p_replies" will be updated next?
211    * (we go round-robin).
212    */
213   unsigned int last_p2p_replies_woff;
214
215   /**
216    * Which offset in "last_client_replies" will be updated next?
217    * (we go round-robin).
218    */
219   unsigned int last_client_replies_woff;
220
221   /**
222    * Current offset into 'last_request_times' ring buffer.
223    */
224   unsigned int last_request_times_off;
225
226   /**
227    * GNUNET_YES if we did successfully reserve 32k bandwidth,
228    * GNUNET_NO if not.
229    */
230   int did_reserve;
231
232 };
233
234
235 /**
236  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
237  */
238 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
239
240 /**
241  * Where do we store trust information?
242  */
243 static char *trustDirectory;
244
245
246 /**
247  * Get the filename under which we would store the GNUNET_HELLO_Message
248  * for the given host and protocol.
249  * @return filename of the form DIRECTORY/HOSTID
250  */
251 static char *
252 get_trust_filename (const struct GNUNET_PeerIdentity *id)
253 {
254   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
255   char *fn;
256
257   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
258   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
259   return fn;
260 }
261
262
263 /**
264  * Find latency information in 'atsi'.
265  *
266  * @param atsi performance data
267  * @return connection latency
268  */
269 static struct GNUNET_TIME_Relative
270 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
271 {
272   if (atsi == NULL)
273     return GNUNET_TIME_UNIT_SECONDS;
274   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
275           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
276     atsi++;
277   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
278     {
279       GNUNET_break (0);
280       /* how can we not have latency data? */
281       return GNUNET_TIME_UNIT_SECONDS;
282     }
283   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
284                                         ntohl (atsi->value));
285 }
286
287
288 /**
289  * Update the performance information kept for the given peer.
290  *
291  * @param cp peer record to update
292  * @param atsi transport performance data
293  */
294 static void
295 update_atsi (struct GSF_ConnectedPeer *cp,
296              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
297 {
298   struct GNUNET_TIME_Relative latency;
299
300   latency = get_latency (atsi);
301   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
302                                  latency);
303   /* LATER: merge atsi into cp's performance data (if we ever care...) */
304 }
305
306
307 /**
308  * Return the performance data record for the given peer
309  * 
310  * @param cp peer to query
311  * @return performance data record for the peer
312  */
313 struct GSF_PeerPerformanceData *
314 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
315 {
316   return &cp->ppd;
317 }
318
319
320 /**
321  * Core is ready to transmit to a peer, get the message.
322  *
323  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
324  * @param size number of bytes core is willing to take
325  * @param buf where to copy the message
326  * @return number of bytes copied to buf
327  */
328 static size_t
329 peer_transmit_ready_cb (void *cls,
330                         size_t size,
331                         void *buf);
332
333
334
335
336 /**
337  * Function called by core upon success or failure of our bandwidth reservation request.
338  *
339  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
340  * @param peer identifies the peer
341  * @param bandwidth_out available amount of outbound bandwidth
342  * @param amount set to the amount that was actually reserved or unreserved;
343  *               either the full requested amount or zero (no partial reservations)
344  * @param res_delay if the reservation could not be satisfied (amount was 0), how
345  *        long should the client wait until re-trying?
346  * @param preference current traffic preference for the given peer
347  */
348 static void
349 core_reserve_callback (void *cls,
350                        const struct GNUNET_PeerIdentity *peer,
351                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
352                        int32_t amount,
353                        struct GNUNET_TIME_Relative res_delay,
354                        uint64_t preference);
355
356
357 /**
358  * If ready (bandwidth reserved), try to schedule transmission via
359  * core for the given handle.
360  *
361  * @param pth transmission handle to schedule
362  */
363 static void
364 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
365 {
366   struct GSF_ConnectedPeer *cp;
367   struct GNUNET_PeerIdentity target;
368   uint64_t ip;
369
370   if (NULL != pth->cth)
371     return; /* already done */
372   cp = pth->cp;
373   GNUNET_PEER_resolve (cp->ppd.pid,
374                        &target);
375   if ( (GNUNET_YES == pth->is_query) &&
376        (GNUNET_YES != pth->was_reserved) )
377     {
378       /* query, need reservation */
379       if (GNUNET_YES != cp->did_reserve)
380         return; /* not ready */
381       cp->did_reserve = GNUNET_NO;
382       /* reservation already done! */
383       pth->was_reserved = GNUNET_YES;
384       ip = cp->inc_preference;
385       cp->inc_preference = 0;
386       cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
387                                                     &target,
388                                                     GNUNET_TIME_UNIT_FOREVER_REL,
389                                                     GNUNET_BANDWIDTH_VALUE_MAX,
390                                                     DBLOCK_SIZE,
391                                                     ip,
392                                                     &core_reserve_callback,
393                                                     cp);          
394     }
395   pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
396                                                 GNUNET_YES,
397                                                 pth->priority,
398                                                 GNUNET_TIME_absolute_get_remaining (pth->timeout),
399                                                 &target,
400                                                 pth->size,
401                                                 &peer_transmit_ready_cb,
402                                                 pth);
403 }
404
405
406 /**
407  * Core is ready to transmit to a peer, get the message.
408  *
409  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
410  * @param size number of bytes core is willing to take
411  * @param buf where to copy the message
412  * @return number of bytes copied to buf
413  */
414 static size_t
415 peer_transmit_ready_cb (void *cls,
416                         size_t size,
417                         void *buf)
418 {
419   struct GSF_PeerTransmitHandle *pth = cls;
420   struct GSF_ConnectedPeer *cp;
421   size_t ret;
422
423   pth->cth = NULL;
424   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
425     {
426       GNUNET_SCHEDULER_cancel (pth->timeout_task);
427       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
428     }
429   cp = pth->cp;
430   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
431                                cp->pth_tail,
432                                pth);
433   if (GNUNET_YES == pth->is_query)
434     {
435       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
436       GNUNET_assert (0 < cp->ppd.pending_queries--);    
437     }
438   else if (GNUNET_NO == pth->is_query)
439     {
440       GNUNET_assert (0 < cp->ppd.pending_replies--);
441     }
442   GNUNET_LOAD_update (cp->ppd.transmission_delay,
443                       GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
444   ret = pth->gmc (pth->gmc_cls, 
445                   size, buf);
446   GNUNET_free (pth);
447   for (pth = cp->pth_head; pth != NULL; pth = pth->next)
448     schedule_transmission (pth);
449   return ret;
450 }
451
452
453 /**
454  * (re)try to reserve bandwidth from the given peer.
455  *
456  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
457  * @param tc scheduler context
458  */
459 static void
460 retry_reservation (void *cls,
461                    const struct GNUNET_SCHEDULER_TaskContext *tc)
462 {
463   struct GSF_ConnectedPeer *cp = cls;
464   uint64_t ip;
465   struct GNUNET_PeerIdentity target;
466
467   GNUNET_PEER_resolve (cp->ppd.pid,
468                        &target);
469   cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
470   ip = cp->inc_preference;
471   cp->inc_preference = 0;
472   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
473                                                 &target,
474                                                 GNUNET_TIME_UNIT_FOREVER_REL,
475                                                 GNUNET_BANDWIDTH_VALUE_MAX,
476                                                 DBLOCK_SIZE,
477                                                 ip,
478                                                 &core_reserve_callback,
479                                                 cp);
480 }
481
482
483 /**
484  * Function called by core upon success or failure of our bandwidth reservation request.
485  *
486  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
487  * @param peer identifies the peer
488  * @param bandwidth_out available amount of outbound bandwidth
489  * @param amount set to the amount that was actually reserved or unreserved;
490  *               either the full requested amount or zero (no partial reservations)
491  * @param res_delay if the reservation could not be satisfied (amount was 0), how
492  *        long should the client wait until re-trying?
493  * @param preference current traffic preference for the given peer
494  */
495 static void
496 core_reserve_callback (void *cls,
497                        const struct GNUNET_PeerIdentity *peer,
498                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
499                        int32_t amount,
500                        struct GNUNET_TIME_Relative res_delay,
501                        uint64_t preference)
502 {
503   struct GSF_ConnectedPeer *cp = cls;
504   struct GSF_PeerTransmitHandle *pth;
505
506   cp->irc = NULL;
507   if (0 == amount)
508     {
509       cp->irc_delay_task = GNUNET_SCHEDULER_add_delayed (res_delay,
510                                                          &retry_reservation,
511                                                          cp);
512       return;
513     }
514   cp->did_reserve = GNUNET_YES;
515   pth = cp->pth_head;
516   if ( (NULL != pth) &&
517        (NULL == pth->cth) )
518     {
519       /* reservation success, try transmission now! */
520       pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
521                                                     GNUNET_YES,
522                                                     pth->priority,
523                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
524                                                     peer,
525                                                     pth->size,
526                                                     &peer_transmit_ready_cb,
527                                                     pth);
528     }
529 }
530
531
532 /**
533  * A peer connected to us.  Setup the connected peer
534  * records.
535  *
536  * @param peer identity of peer that connected
537  * @param atsi performance data for the connection
538  * @return handle to connected peer entry
539  */
540 struct GSF_ConnectedPeer *
541 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
542                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
543 {
544   struct GSF_ConnectedPeer *cp;
545   char *fn;
546   uint32_t trust;
547
548   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
549   cp->ppd.pid = GNUNET_PEER_intern (peer);
550   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
551   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
552                                                 peer,
553                                                 GNUNET_TIME_UNIT_FOREVER_REL,
554                                                 GNUNET_BANDWIDTH_VALUE_MAX,
555                                                 DBLOCK_SIZE,
556                                                 0,
557                                                 &core_reserve_callback,
558                                                 cp);
559   fn = get_trust_filename (peer);
560   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
561       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
562     cp->disk_trust = cp->ppd.trust = ntohl (trust);
563   GNUNET_free (fn);
564   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
565   GNUNET_break (GNUNET_OK ==
566                 GNUNET_CONTAINER_multihashmap_put (cp_map,
567                                                    &peer->hashPubKey,
568                                                    cp,
569                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
570   update_atsi (cp, atsi);
571   GSF_push_start_ (cp);
572   return cp;
573 }
574
575
576 /**
577  * It may be time to re-start migrating content to this
578  * peer.  Check, and if so, restart migration.
579  *
580  * @param cls the 'struct GSF_ConnectedPeer'
581  * @param tc scheduler context
582  */
583 static void
584 revive_migration (void *cls,
585                   const struct GNUNET_SCHEDULER_TaskContext *tc)
586 {
587   struct GSF_ConnectedPeer *cp = cls;
588   struct GNUNET_TIME_Relative bt;
589   
590   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
591   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
592   if (0 != bt.rel_value)
593     {
594       /* still time left... */
595       cp->mig_revive_task 
596         = GNUNET_SCHEDULER_add_delayed (bt,
597                                         &revive_migration,
598                                         cp);
599       return;
600     }
601   GSF_push_start_ (cp);
602 }
603
604
605 /**
606  * Get a handle for a connected peer.
607  *
608  * @param peer peer's identity
609  * @return NULL if the peer is not currently connected
610  */
611 struct GSF_ConnectedPeer *
612 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
613 {
614   return GNUNET_CONTAINER_multihashmap_get (cp_map,
615                                             &peer->hashPubKey);
616 }
617
618
619 /**
620  * Handle P2P "MIGRATION_STOP" message.
621  *
622  * @param cls closure, always NULL
623  * @param other the other peer involved (sender or receiver, NULL
624  *        for loopback messages where we are both sender and receiver)
625  * @param message the actual message
626  * @param atsi performance information
627  * @return GNUNET_OK to keep the connection open,
628  *         GNUNET_SYSERR to close it (signal serious error)
629  */
630 int
631 GSF_handle_p2p_migration_stop_ (void *cls,
632                                 const struct GNUNET_PeerIdentity *other,
633                                 const struct GNUNET_MessageHeader *message,
634                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
635 {
636   struct GSF_ConnectedPeer *cp; 
637   const struct MigrationStopMessage *msm;
638   struct GNUNET_TIME_Relative bt;
639
640   msm = (const struct MigrationStopMessage*) message;
641   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
642                                           &other->hashPubKey);
643   if (cp == NULL)
644     {
645       GNUNET_break (0);
646       return GNUNET_OK;
647     }
648   bt = GNUNET_TIME_relative_ntoh (msm->duration);
649   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
650               _("Migration of content to peer `%s' blocked for %llu ms\n"),
651               GNUNET_i2s (other),
652               (unsigned long long) bt.rel_value);
653   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
654   if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
655     {
656       GSF_push_stop_ (cp);
657       cp->mig_revive_task 
658         = GNUNET_SCHEDULER_add_delayed (bt,
659                                         &revive_migration,
660                                         cp);
661     }
662   update_atsi (cp, atsi);
663   return GNUNET_OK;
664 }
665
666
667 /**
668  * Copy reply and free put message.
669  *
670  * @param cls the 'struct PutMessage'
671  * @param buf_size number of bytes available in buf
672  * @param buf where to copy the message, NULL on error (peer disconnect)
673  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
674  */
675 static size_t 
676 copy_reply (void *cls,
677             size_t buf_size,
678             void *buf)
679 {
680   struct PutMessage *pm = cls;
681   size_t size;
682
683   if (buf != NULL)
684     {
685       GNUNET_assert (buf_size >= ntohs (pm->header.size));
686       size = ntohs (pm->header.size);
687       memcpy (buf, pm, size); 
688       GNUNET_STATISTICS_update (GSF_stats,
689                                 gettext_noop ("# replies transmitted to other peers"),
690                                 1,
691                                 GNUNET_NO); 
692     }
693   else
694     {
695       size = 0;
696       GNUNET_STATISTICS_update (GSF_stats,
697                                 gettext_noop ("# replies dropped"),
698                                 1,
699                                 GNUNET_NO); 
700     }
701   GNUNET_free (pm);
702   return size;
703 }
704
705
706 /**
707  * Free the given client request.
708  *
709  * @param cls the client request to free
710  * @param tc task context
711  */ 
712 static void
713 peer_request_destroy (void *cls,
714                       const struct GNUNET_SCHEDULER_TaskContext *tc)
715 {
716   struct PeerRequest *peerreq = cls;
717   struct GSF_PendingRequest *pr = peerreq->pr;
718   struct GSF_ConnectedPeer *cp = peerreq->cp;
719   struct GSF_PendingRequestData *prd;
720
721   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
722   prd = GSF_pending_request_get_data_ (pr);
723   GNUNET_STATISTICS_update (GSF_stats,
724                             gettext_noop ("# P2P searches active"),
725                             -1,
726                             GNUNET_NO);
727   GNUNET_break (GNUNET_OK ==
728                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
729                                                       &prd->query,
730                                                       peerreq));
731   GSF_pending_request_cancel_ (pr);
732   GNUNET_free (peerreq);
733 }
734
735
736 /**
737  * Handle a reply to a pending request.  Also called if a request
738  * expires (then with data == NULL).  The handler may be called
739  * many times (depending on the request type), but will not be
740  * called during or after a call to GSF_pending_request_cancel 
741  * and will also not be called anymore after a call signalling
742  * expiration.
743  *
744  * @param cls 'struct PeerRequest' this is an answer for
745  * @param eval evaluation of the result
746  * @param pr handle to the original pending request
747  * @param expiration when does 'data' expire?
748  * @param type type of the block
749  * @param data response data, NULL on request expiration
750  * @param data_len number of bytes in data
751  */
752 static void
753 handle_p2p_reply (void *cls,
754                   enum GNUNET_BLOCK_EvaluationResult eval,
755                   struct GSF_PendingRequest *pr,
756                   struct GNUNET_TIME_Absolute expiration,
757                   enum GNUNET_BLOCK_Type type,
758                   const void *data,
759                   size_t data_len)
760 {
761   struct PeerRequest *peerreq = cls;
762   struct GSF_ConnectedPeer *cp = peerreq->cp;
763   struct GSF_PendingRequestData *prd;
764   struct PutMessage *pm;
765   size_t msize;
766
767   GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE);
768   GNUNET_assert (peerreq->pr == pr);
769   prd = GSF_pending_request_get_data_ (pr);
770   if (NULL == data)
771     {
772       GNUNET_STATISTICS_update (GSF_stats,
773                                 gettext_noop ("# P2P searches active"),
774                                 -1,
775                                 GNUNET_NO);
776       GNUNET_break (GNUNET_OK ==
777                     GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
778                                                           &prd->query,
779                                                           peerreq));
780       GNUNET_free (peerreq);
781       return;
782     }  
783   GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
784   if ( (prd->type != type) &&
785        (prd->type != GNUNET_BLOCK_TYPE_ANY) )
786     {
787       GNUNET_break (0);
788       return;
789     }
790 #if DEBUG_FS
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "Transmitting result for query `%s' to peer\n",
793               GNUNET_h2s (&prd->query));
794 #endif  
795   GNUNET_STATISTICS_update (GSF_stats,
796                             gettext_noop ("# replies received for other peers"),
797                             1,
798                             GNUNET_NO); 
799   msize = sizeof (struct PutMessage) + data_len;
800   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
801     {
802       GNUNET_break (0);
803       return;
804     }
805   pm = GNUNET_malloc (msize);
806   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
807   pm->header.size = htons (msize);
808   pm->type = htonl (type);
809   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
810   memcpy (&pm[1], data, data_len);
811   (void) GSF_peer_transmit_ (cp, GNUNET_NO,
812                              UINT32_MAX,
813                              REPLY_TIMEOUT,
814                              msize,
815                              &copy_reply,
816                              pm);
817   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
818     return;
819   peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy,
820                                                  peerreq);
821 }
822
823
824 /**
825  * Increase the host credit by a value.
826  *
827  * @param cp which peer to change the trust value on
828  * @param value is the int value by which the
829  *  host credit is to be increased or decreased
830  * @returns the actual change in trust (positive or negative)
831  */
832 static int
833 change_host_trust (struct GSF_ConnectedPeer *cp, int value)
834 {
835   if (value == 0)
836     return 0;
837   GNUNET_assert (cp != NULL);
838   if (value > 0)
839     {
840       if (cp->ppd.trust + value < cp->ppd.trust)
841         {
842           value = UINT32_MAX - cp->ppd.trust;
843           cp->ppd.trust = UINT32_MAX;
844         }
845       else
846         cp->ppd.trust += value;
847     }
848   else
849     {
850       if (cp->ppd.trust < -value)
851         {
852           value = -cp->ppd.trust;
853           cp->ppd.trust = 0;
854         }
855       else
856         cp->ppd.trust += value;
857     }
858   return value;
859 }
860
861
862 /**
863  * We've received a request with the specified priority.  Bound it
864  * according to how much we trust the given peer.
865  * 
866  * @param prio_in requested priority
867  * @param cp the peer making the request
868  * @return effective priority
869  */
870 static int32_t
871 bound_priority (uint32_t prio_in,
872                 struct GSF_ConnectedPeer *cp)
873 {
874 #define N ((double)128.0)
875   uint32_t ret;
876   double rret;
877   int ld;
878
879   ld = GSF_test_get_load_too_high_ (0);
880   if (ld == GNUNET_SYSERR)
881     {
882       GNUNET_STATISTICS_update (GSF_stats,
883                                 gettext_noop ("# requests done for free (low load)"),
884                                 1,
885                                 GNUNET_NO);
886       return 0; /* excess resources */
887     }
888   if (prio_in > INT32_MAX)
889     prio_in = INT32_MAX;
890   ret = - change_host_trust (cp, - (int) prio_in);
891   if (ret > 0)
892     {
893       if (ret > GSF_current_priorities + N)
894         rret = GSF_current_priorities + N;
895       else
896         rret = ret;
897       GSF_current_priorities 
898         = (GSF_current_priorities * (N-1) + rret)/N;
899     }
900   if ( (ld == GNUNET_YES) && (ret > 0) )
901     {
902       /* try with charging */
903       ld = GSF_test_get_load_too_high_ (ret);
904     }
905   if (ld == GNUNET_YES)
906     {
907       GNUNET_STATISTICS_update (GSF_stats,
908                                 gettext_noop ("# request dropped, priority insufficient"),
909                                 1,
910                                 GNUNET_NO);
911       /* undo charge */
912       change_host_trust (cp, (int) ret);
913       return -1; /* not enough resources */
914     }
915   else
916     {
917       GNUNET_STATISTICS_update (GSF_stats,
918                                 gettext_noop ("# requests done for a price (normal load)"),
919                                 1,
920                                 GNUNET_NO);
921     }
922 #undef N
923   return ret;
924 }
925
926
927 /**
928  * The priority level imposes a bound on the maximum
929  * value for the ttl that can be requested.
930  *
931  * @param ttl_in requested ttl
932  * @param prio given priority
933  * @return ttl_in if ttl_in is below the limit,
934  *         otherwise the ttl-limit for the given priority
935  */
936 static int32_t
937 bound_ttl (int32_t ttl_in, uint32_t prio)
938 {
939   unsigned long long allowed;
940
941   if (ttl_in <= 0)
942     return ttl_in;
943   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
944   if (ttl_in > allowed)      
945     {
946       if (allowed >= (1 << 30))
947         return 1 << 30;
948       return allowed;
949     }
950   return ttl_in;
951 }
952
953
954 /**
955  * Handle P2P "QUERY" message.  Creates the pending request entry
956  * and sets up all of the data structures to that we will
957  * process replies properly.  Does not initiate forwarding or
958  * local database lookups.
959  *
960  * @param other the other peer involved (sender or receiver, NULL
961  *        for loopback messages where we are both sender and receiver)
962  * @param message the actual message
963  * @return pending request handle, NULL on error
964  */
965 struct GSF_PendingRequest *
966 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
967                        const struct GNUNET_MessageHeader *message)
968 {
969   struct PeerRequest *peerreq;
970   struct GSF_PendingRequest *pr;
971   struct GSF_PendingRequestData *prd;
972   struct GSF_ConnectedPeer *cp;
973   struct GSF_ConnectedPeer *cps;
974   const GNUNET_HashCode *namespace;
975   const struct GNUNET_PeerIdentity *target;
976   enum GSF_PendingRequestOptions options;                            
977   uint16_t msize;
978   const struct GetMessage *gm;
979   unsigned int bits;
980   const GNUNET_HashCode *opt;
981   uint32_t bm;
982   size_t bfsize;
983   uint32_t ttl_decrement;
984   int32_t priority;
985   int32_t ttl;
986   enum GNUNET_BLOCK_Type type;
987   GNUNET_PEER_Id spid;
988
989   msize = ntohs(message->size);
990   if (msize < sizeof (struct GetMessage))
991     {
992       GNUNET_break_op (0);
993       return NULL;
994     }
995   gm = (const struct GetMessage*) message;
996   type = ntohl (gm->type);
997   bm = ntohl (gm->hash_bitmap);
998   bits = 0;
999   while (bm > 0)
1000     {
1001       if (1 == (bm & 1))
1002         bits++;
1003       bm >>= 1;
1004     }
1005   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
1006     {
1007       GNUNET_break_op (0);
1008       return NULL;
1009     }  
1010   opt = (const GNUNET_HashCode*) &gm[1];
1011   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
1012   /* bfsize must be power of 2, check! */
1013   if (0 != ( (bfsize - 1) & bfsize))
1014     {
1015       GNUNET_break_op (0);
1016       return NULL;
1017     }
1018   GSF_cover_query_count++;
1019   bm = ntohl (gm->hash_bitmap);
1020   bits = 0;
1021   cps = GNUNET_CONTAINER_multihashmap_get (cp_map,
1022                                            &other->hashPubKey);
1023   if (NULL == cps)
1024     {
1025       /* peer must have just disconnected */
1026       GNUNET_STATISTICS_update (GSF_stats,
1027                                 gettext_noop ("# requests dropped due to initiator not being connected"),
1028                                 1,
1029                                 GNUNET_NO);
1030       return NULL;
1031     }
1032   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1033     cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1034                                             &opt[bits++]);
1035   else
1036     cp = cps;
1037   if (cp == NULL)
1038     {
1039 #if DEBUG_FS
1040       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1041         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1043                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
1044       
1045       else
1046         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
1048                     GNUNET_i2s (other));
1049 #endif
1050       GNUNET_STATISTICS_update (GSF_stats,
1051                                 gettext_noop ("# requests dropped due to missing reverse route"),
1052                                 1,
1053                                 GNUNET_NO);
1054       return NULL;
1055     }
1056   /* note that we can really only check load here since otherwise
1057      peers could find out that we are overloaded by not being
1058      disconnected after sending us a malformed query... */
1059   priority = bound_priority (ntohl (gm->priority), cps);
1060   if (priority < 0)
1061     {
1062 #if DEBUG_FS
1063       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                   "Dropping query from `%s', this peer is too busy.\n",
1065                   GNUNET_i2s (other));
1066 #endif
1067       return NULL;
1068     }
1069 #if DEBUG_FS 
1070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1072               GNUNET_h2s (&gm->query),
1073               (unsigned int) type,
1074               GNUNET_i2s (other),
1075               (unsigned int) bm);
1076 #endif
1077   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
1078   if ( (type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
1079        (namespace == NULL) )
1080     {
1081       GNUNET_break_op (0);
1082       return NULL;
1083     }
1084   if ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
1085        (namespace != NULL) )
1086     {
1087       GNUNET_break_op (0);
1088       return NULL;
1089     }
1090   target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
1091   options = 0;
1092   spid = 0;
1093   if ( (GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority)) ||
1094        (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) > 
1095         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)) )
1096     {
1097       /* don't have BW to send to peer, or would likely take longer than we have for it,
1098          so at best indirect the query */
1099       priority = 0;
1100       options |= GSF_PRO_FORWARD_ONLY;
1101       spid = GNUNET_PEER_intern (other);
1102     }
1103   ttl = bound_ttl (ntohl (gm->ttl), priority);
1104   /* decrement ttl (always) */
1105   ttl_decrement = 2 * TTL_DECREMENT +
1106     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1107                               TTL_DECREMENT);
1108   if ( (ttl < 0) &&
1109        (((int32_t)(ttl - ttl_decrement)) > 0) )
1110     {
1111 #if DEBUG_FS
1112       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1114                   GNUNET_i2s (other),
1115                   ttl,
1116                   ttl_decrement);
1117 #endif
1118       GNUNET_STATISTICS_update (GSF_stats,
1119                                 gettext_noop ("# requests dropped due TTL underflow"),
1120                                 1,
1121                                 GNUNET_NO);
1122       /* integer underflow => drop (should be very rare)! */      
1123       return NULL;
1124     } 
1125   ttl -= ttl_decrement;
1126
1127   /* test if the request already exists */
1128   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
1129                                                &gm->query);
1130   if (peerreq != NULL) 
1131     {      
1132       pr = peerreq->pr;
1133       prd = GSF_pending_request_get_data_ (pr);
1134       if ( (prd->type == type) &&
1135            ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
1136              (0 == memcmp (&prd->namespace,
1137                            namespace,
1138                            sizeof (GNUNET_HashCode))) ) )
1139         {
1140           if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
1141             {
1142               /* existing request has higher TTL, drop new one! */
1143               prd->priority += priority;
1144 #if DEBUG_FS
1145               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146                           "Have existing request with higher TTL, dropping new request.\n",
1147                           GNUNET_i2s (other));
1148 #endif
1149               GNUNET_STATISTICS_update (GSF_stats,
1150                                         gettext_noop ("# requests dropped due to higher-TTL request"),
1151                                         1,
1152                                         GNUNET_NO);
1153               return NULL;
1154             }
1155           /* existing request has lower TTL, drop old one! */
1156           priority += prd->priority;
1157           GSF_pending_request_cancel_ (pr);
1158           GNUNET_assert (GNUNET_YES ==
1159                          GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
1160                                                                &gm->query,
1161                                                                peerreq));
1162           if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1163             GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1164           GNUNET_free (peerreq);
1165         }
1166     }
1167   
1168   peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1169   peerreq->cp = cp; 
1170   pr = GSF_pending_request_create_ (options,
1171                                     type,
1172                                     &gm->query,
1173                                     namespace,
1174                                     target,
1175                                     (bfsize > 0) ? (const char*)&opt[bits] : NULL,
1176                                     bfsize,
1177                                     ntohl (gm->filter_mutator),
1178                                     1 /* anonymity */,
1179                                     (uint32_t) priority,
1180                                     ttl,
1181                                     spid,
1182                                     NULL, 0, /* replies_seen */
1183                                     &handle_p2p_reply,
1184                                     peerreq);
1185   peerreq->pr = pr;
1186   GNUNET_break (GNUNET_OK ==
1187                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1188                                                    &gm->query,
1189                                                    peerreq,
1190                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1191   GNUNET_STATISTICS_update (GSF_stats,
1192                             gettext_noop ("# P2P searches received"),
1193                             1,
1194                             GNUNET_NO);
1195   GNUNET_STATISTICS_update (GSF_stats,
1196                             gettext_noop ("# P2P searches active"),
1197                             1,
1198                             GNUNET_NO);
1199   return pr;
1200 }
1201
1202
1203 /**
1204  * Function called if there has been a timeout trying to satisfy
1205  * a transmission request.
1206  *
1207  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
1208  * @param tc scheduler context
1209  */
1210 static void
1211 peer_transmit_timeout (void *cls,
1212                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1213 {
1214   struct GSF_PeerTransmitHandle *pth = cls;
1215   struct GSF_ConnectedPeer *cp;
1216
1217 #if DEBUG_FS
1218   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219               "Timeout trying to transmit to other peer\n");
1220 #endif  
1221   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1222   cp = pth->cp;
1223   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1224                                cp->pth_tail,
1225                                pth);
1226   if (GNUNET_YES == pth->is_query)
1227     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1228   else if (GNUNET_NO == pth->is_query)
1229     GNUNET_assert (0 < cp->ppd.pending_replies--);
1230   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1231                       UINT64_MAX);
1232   pth->gmc (pth->gmc_cls, 
1233             0, NULL);
1234   GNUNET_free (pth);
1235 }
1236
1237
1238 /**
1239  * Transmit a message to the given peer as soon as possible.
1240  * If the peer disconnects before the transmission can happen,
1241  * the callback is invoked with a 'NULL' buffer.
1242  *
1243  * @param cp target peer
1244  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1245  * @param priority how important is this request?
1246  * @param timeout when does this request timeout (call gmc with error)
1247  * @param size number of bytes we would like to send to the peer
1248  * @param gmc function to call to get the message
1249  * @param gmc_cls closure for gmc
1250  * @return handle to cancel request
1251  */
1252 struct GSF_PeerTransmitHandle *
1253 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1254                     int is_query,
1255                     uint32_t priority,
1256                     struct GNUNET_TIME_Relative timeout,
1257                     size_t size,
1258                     GSF_GetMessageCallback gmc,
1259                     void *gmc_cls)
1260 {
1261   struct GSF_PeerTransmitHandle *pth;
1262   struct GSF_PeerTransmitHandle *pos;
1263   struct GSF_PeerTransmitHandle *prev;
1264
1265   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1266   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1267   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1268   pth->gmc = gmc;
1269   pth->gmc_cls = gmc_cls;
1270   pth->size = size;
1271   pth->is_query = is_query;
1272   pth->priority = priority;
1273   pth->cp = cp;
1274   /* insertion sort (by priority, descending) */
1275   prev = NULL;
1276   pos = cp->pth_head;
1277   while ( (pos != NULL) &&
1278           (pos->priority > priority) )
1279     {
1280       prev = pos;
1281       pos = pos->next;
1282     }
1283   if (prev == NULL)
1284     GNUNET_CONTAINER_DLL_insert (cp->pth_head,
1285                                  cp->pth_tail,
1286                                  pth);
1287   else
1288     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1289                                        cp->pth_tail,
1290                                        prev,
1291                                        pth);
1292   if (GNUNET_YES == is_query)
1293     cp->ppd.pending_queries++;
1294   else if (GNUNET_NO == is_query)
1295     cp->ppd.pending_replies++;
1296
1297   pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1298                                                     &peer_transmit_timeout,
1299                                                     pth);
1300   schedule_transmission (pth);
1301   return pth;
1302 }
1303
1304
1305 /**
1306  * Cancel an earlier request for transmission.
1307  *
1308  * @param pth request to cancel
1309  */
1310 void
1311 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1312 {
1313   struct GSF_ConnectedPeer *cp;
1314
1315   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1316     {
1317       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1318       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1319     }
1320   if (NULL != pth->cth)
1321     {
1322       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1323       pth->cth = NULL;
1324     }
1325   cp = pth->cp;
1326   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1327                                cp->pth_tail,
1328                                pth);
1329   if (GNUNET_YES == pth->is_query)
1330     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1331   else if (GNUNET_NO == pth->is_query)
1332     GNUNET_assert (0 < cp->ppd.pending_replies--);
1333   GNUNET_free (pth);
1334 }
1335
1336
1337 /**
1338  * Report on receiving a reply; update the performance record of the given peer.
1339  *
1340  * @param cp responding peer (will be updated)
1341  * @param request_time time at which the original query was transmitted
1342  * @param request_priority priority of the original request
1343  */
1344 void
1345 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1346                               struct GNUNET_TIME_Absolute request_time,
1347                               uint32_t request_priority)
1348 {
1349   struct GNUNET_TIME_Relative delay;
1350
1351   delay = GNUNET_TIME_absolute_get_duration (request_time);  
1352   cp->ppd.avg_reply_delay.rel_value = (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
1353   cp->ppd.avg_priority = (cp->ppd.avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
1354 }
1355
1356
1357 /**
1358  * Report on receiving a reply in response to an initiating client.
1359  * Remember that this peer is good for this client.
1360  *
1361  * @param cp responding peer (will be updated)
1362  * @param initiator_client local client on responsible for query
1363  */
1364 void
1365 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1366                                    struct GSF_LocalClient *initiator_client)
1367 {
1368   cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1369 }
1370
1371
1372 /**
1373  * Report on receiving a reply in response to an initiating peer.
1374  * Remember that this peer is good for this initiating peer.
1375  *
1376  * @param cp responding peer (will be updated)
1377  * @param initiator_peer other peer responsible for query
1378  */
1379 void
1380 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1381                                  const struct GSF_ConnectedPeer *initiator_peer)
1382 {
1383   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
1384   cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->ppd.pid;
1385   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1386 }
1387
1388
1389 /**
1390  * Method called whenever a given peer has a status change.
1391  *
1392  * @param cls closure
1393  * @param peer peer identity this notification is about
1394  * @param bandwidth_in available amount of inbound bandwidth
1395  * @param bandwidth_out available amount of outbound bandwidth
1396  * @param timeout absolute time when this peer will time out
1397  *        unless we see some further activity from it
1398  * @param atsi status information
1399  */
1400 void
1401 GSF_peer_status_handler_ (void *cls,
1402                           const struct GNUNET_PeerIdentity *peer,
1403                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1404                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1405                           struct GNUNET_TIME_Absolute timeout,
1406                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1407 {
1408   struct GSF_ConnectedPeer *cp;
1409
1410   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1411                                           &peer->hashPubKey);
1412   GNUNET_assert (NULL != cp);
1413   update_atsi (cp, atsi);
1414 }
1415
1416
1417 /**
1418  * Cancel all requests associated with the peer.
1419  *
1420  * @param cls unused
1421  * @param query hash code of the request
1422  * @param value the 'struct GSF_PendingRequest'
1423  * @return GNUNET_YES (continue to iterate)
1424  */
1425 static int
1426 cancel_pending_request (void *cls,
1427                         const GNUNET_HashCode *query,
1428                         void *value)
1429 {
1430   struct PeerRequest *peerreq = cls;
1431   struct GSF_PendingRequest *pr = peerreq->pr;
1432
1433   GSF_pending_request_cancel_ (pr);
1434   if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1435     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1436   GNUNET_free (peerreq);
1437   return GNUNET_OK;
1438 }
1439
1440
1441 /**
1442  * A peer disconnected from us.  Tear down the connected peer
1443  * record.
1444  *
1445  * @param cls unused
1446  * @param peer identity of peer that connected
1447  */
1448 void
1449 GSF_peer_disconnect_handler_ (void *cls,
1450                               const struct GNUNET_PeerIdentity *peer)
1451 {
1452   struct GSF_ConnectedPeer *cp;
1453   struct GSF_PeerTransmitHandle *pth;
1454
1455   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1456                                           &peer->hashPubKey);
1457   if (NULL == cp)
1458     return; /* must have been disconnect from core with
1459                'peer' == my_id, ignore */
1460   GNUNET_CONTAINER_multihashmap_remove (cp_map,
1461                                         &peer->hashPubKey,
1462                                         cp);
1463   if (NULL != cp->migration_pth)
1464     {
1465       GSF_peer_transmit_cancel_ (cp->migration_pth);
1466       cp->migration_pth = NULL;
1467     }
1468   if (NULL != cp->irc)
1469     {
1470       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1471       cp->irc = NULL;
1472     }
1473   if (GNUNET_SCHEDULER_NO_TASK != cp->irc_delay_task)
1474     {
1475       GNUNET_SCHEDULER_cancel (cp->irc_delay_task);
1476       cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1477     }
1478   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1479                                          &cancel_pending_request,
1480                                          cp);
1481   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1482   cp->request_map = NULL;
1483   GSF_plan_notify_peer_disconnect_ (cp);
1484   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1485   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1486   while (NULL != (pth = cp->pth_head))
1487     {
1488       if (NULL != pth->cth)
1489         {
1490           GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1491           pth->cth = NULL;
1492         }
1493       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1494                                    cp->pth_tail,
1495                                    pth);
1496       GNUNET_free (pth);
1497     }
1498   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1499   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1500     {
1501       GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1502       cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1503     }
1504   GNUNET_free (cp);
1505 }
1506
1507
1508 /**
1509  * Closure for 'call_iterator'.
1510  */
1511 struct IterationContext
1512 {
1513   /**
1514    * Function to call on each entry.
1515    */
1516   GSF_ConnectedPeerIterator it;
1517
1518   /**
1519    * Closure for 'it'.
1520    */
1521   void *it_cls;
1522 };
1523
1524
1525 /**
1526  * Function that calls the callback for each peer.
1527  *
1528  * @param cls the 'struct IterationContext*'
1529  * @param key identity of the peer
1530  * @param value the 'struct GSF_ConnectedPeer*'
1531  * @return GNUNET_YES to continue iteration
1532  */
1533 static int
1534 call_iterator (void *cls,
1535                const GNUNET_HashCode *key,
1536                void *value)
1537 {
1538   struct IterationContext *ic = cls;
1539   struct GSF_ConnectedPeer *cp = value;
1540   
1541   ic->it (ic->it_cls,
1542           (const struct GNUNET_PeerIdentity*) key,
1543           cp,
1544           &cp->ppd);
1545   return GNUNET_YES;
1546 }
1547
1548
1549 /**
1550  * Iterate over all connected peers.
1551  *
1552  * @param it function to call for each peer
1553  * @param it_cls closure for it
1554  */
1555 void
1556 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1557                               void *it_cls)
1558 {
1559   struct IterationContext ic;
1560
1561   ic.it = it;
1562   ic.it_cls = it_cls;
1563   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1564                                          &call_iterator,
1565                                          &ic);
1566 }
1567
1568
1569 /**
1570  * Obtain the identity of a connected peer.
1571  *
1572  * @param cp peer to reserve bandwidth from
1573  * @param id identity to set (written to)
1574  */
1575 void
1576 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1577                                   struct GNUNET_PeerIdentity *id)
1578 {
1579   GNUNET_PEER_resolve (cp->ppd.pid,
1580                        id);
1581 }
1582
1583
1584 /**
1585  * Assemble a migration stop message for transmission.
1586  *
1587  * @param cls the 'struct GSF_ConnectedPeer' to use
1588  * @param size number of bytes we're allowed to write to buf
1589  * @param buf where to copy the message
1590  * @return number of bytes copied to buf
1591  */
1592 static size_t
1593 create_migration_stop_message (void *cls,
1594                                size_t size,
1595                                void *buf)
1596 {
1597   struct GSF_ConnectedPeer *cp = cls;
1598   struct MigrationStopMessage msm;
1599
1600   cp->migration_pth = NULL;
1601   if (NULL == buf)
1602     return 0;
1603   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1604   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1605   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1606   msm.reserved = htonl (0);
1607   msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1608   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1609   return sizeof (struct MigrationStopMessage);
1610 }
1611
1612
1613 /**
1614  * Ask a peer to stop migrating data to us until the given point
1615  * in time.
1616  * 
1617  * @param cp peer to ask
1618  * @param block_time until when to block
1619  */
1620 void
1621 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1622                            struct GNUNET_TIME_Relative block_time)
1623 {
1624   if (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value > block_time.rel_value)
1625     {
1626 #if DEBUG_FS && 0
1627       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628           "Migration already blocked for another %llu ms\n",
1629                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value);
1630 #endif
1631       return; /* already blocked */
1632     }
1633 #if DEBUG_FS && 0
1634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635               "Asking to stop migration for %llu ms\n",
1636               (unsigned long long) block_time.rel_value);
1637 #endif
1638   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1639   if (cp->migration_pth != NULL)
1640     GSF_peer_transmit_cancel_ (cp->migration_pth);
1641   cp->migration_pth 
1642     = GSF_peer_transmit_ (cp,
1643                           GNUNET_SYSERR,
1644                           UINT32_MAX,
1645                           GNUNET_TIME_UNIT_FOREVER_REL,
1646                           sizeof (struct MigrationStopMessage),
1647                           &create_migration_stop_message,
1648                           cp);
1649 }
1650
1651
1652 /**
1653  * Write host-trust information to a file - flush the buffer entry!
1654  *
1655  * @param cls closure, not used
1656  * @param key host identity
1657  * @param value the 'struct GSF_ConnectedPeer' to flush
1658  * @return GNUNET_OK to continue iteration
1659  */
1660 static int
1661 flush_trust (void *cls,
1662              const GNUNET_HashCode *key,
1663              void *value)
1664 {
1665   struct GSF_ConnectedPeer *cp = value;
1666   char *fn;
1667   uint32_t trust;
1668   struct GNUNET_PeerIdentity pid;
1669
1670   if (cp->ppd.trust == cp->disk_trust)
1671     return GNUNET_OK;                     /* unchanged */
1672   GNUNET_PEER_resolve (cp->ppd.pid,
1673                        &pid);
1674   fn = get_trust_filename (&pid);
1675   if (cp->ppd.trust == 0)
1676     {
1677       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1678         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1679                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1680     }
1681   else
1682     {
1683       trust = htonl (cp->ppd.trust);
1684       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1685                                                     sizeof(uint32_t),
1686                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1687                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1688         cp->disk_trust = cp->ppd.trust;
1689     }
1690   GNUNET_free (fn);
1691   return GNUNET_OK;
1692 }
1693
1694
1695 /**
1696  * Notify core about a preference we have for the given peer
1697  * (to allocate more resources towards it).  The change will
1698  * be communicated the next time we reserve bandwidth with
1699  * core (not instantly).
1700  *
1701  * @param cp peer to reserve bandwidth from
1702  * @param pref preference change
1703  */
1704 void
1705 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1706                                        uint64_t pref)
1707 {
1708   cp->inc_preference += pref;
1709 }
1710
1711
1712 /**
1713  * Call this method periodically to flush trust information to disk.
1714  *
1715  * @param cls closure, not used
1716  * @param tc task context, not used
1717  */
1718 static void
1719 cron_flush_trust (void *cls,
1720                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1721 {
1722
1723   if (NULL == cp_map)
1724     return;
1725   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1726                                          &flush_trust,
1727                                          NULL);
1728   if (NULL == tc)
1729     return;
1730   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1731     return;
1732   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
1733                                 &cron_flush_trust, 
1734                                 NULL);
1735 }
1736
1737
1738 /**
1739  * Initialize peer management subsystem.
1740  */
1741 void
1742 GSF_connected_peer_init_ ()
1743 {
1744   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1745   GNUNET_assert (GNUNET_OK ==
1746                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg,
1747                                                           "fs",
1748                                                           "TRUST",
1749                                                           &trustDirectory));
1750   GNUNET_DISK_directory_create (trustDirectory);
1751   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1752                                       &cron_flush_trust, NULL);
1753 }
1754
1755
1756 /**
1757  * Iterator to free peer entries.
1758  *
1759  * @param cls closure, unused
1760  * @param key current key code
1761  * @param value value in the hash map (peer entry)
1762  * @return GNUNET_YES (we should continue to iterate)
1763  */
1764 static int 
1765 clean_peer (void *cls,
1766             const GNUNET_HashCode * key,
1767             void *value)
1768 {
1769   GSF_peer_disconnect_handler_ (NULL, 
1770                                 (const struct GNUNET_PeerIdentity*) key);
1771   return GNUNET_YES;
1772 }
1773
1774
1775 /**
1776  * Shutdown peer management subsystem.
1777  */
1778 void
1779 GSF_connected_peer_done_ ()
1780 {
1781   cron_flush_trust (NULL, NULL);
1782   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1783                                          &clean_peer,
1784                                          NULL);
1785   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1786   cp_map = NULL;
1787   GNUNET_free (trustDirectory);
1788   trustDirectory = NULL;
1789 }
1790
1791
1792 /**
1793  * Iterator to remove references to LC entry.
1794  *
1795  * @param cls the 'struct GSF_LocalClient*' to look for
1796  * @param key current key code
1797  * @param value value in the hash map (peer entry)
1798  * @return GNUNET_YES (we should continue to iterate)
1799  */
1800 static int 
1801 clean_local_client (void *cls,
1802                     const GNUNET_HashCode * key,
1803                     void *value)
1804 {
1805   const struct GSF_LocalClient *lc = cls;
1806   struct GSF_ConnectedPeer *cp = value;
1807   unsigned int i;
1808
1809   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1810     if (cp->ppd.last_client_replies[i] == lc)
1811       cp->ppd.last_client_replies[i] = NULL;
1812   return GNUNET_YES;
1813 }
1814
1815
1816 /**
1817  * Notification that a local client disconnected.  Clean up all of our
1818  * references to the given handle.
1819  *
1820  * @param lc handle to the local client (henceforth invalid)
1821  */
1822 void
1823 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1824 {
1825   if (NULL == cp_map)
1826     return; /* already cleaned up */
1827   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1828                                          &clean_local_client,
1829                                          (void*) lc);
1830 }
1831
1832
1833 /* end of gnunet-service-fs_cp.c */