fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33
34 /**
35  * How often do we flush trust values to disk?
36  */
37 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
38
39 /**
40  * After how long do we discard a reply?
41  */
42 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43
44
45 /**
46  * Handle to cancel a transmission request.
47  */
48 struct GSF_PeerTransmitHandle
49 {
50
51   /**
52    * Kept in a doubly-linked list.
53    */
54   struct GSF_PeerTransmitHandle *next;
55
56   /**
57    * Kept in a doubly-linked list.
58    */
59   struct GSF_PeerTransmitHandle *prev;
60
61   /**
62    * Handle for an active request for transmission to this
63    * peer, or NULL (if core queue was full).
64    */
65   struct GNUNET_CORE_TransmitHandle *cth;
66
67   /**
68    * Time when this transmission request was issued.
69    */
70   struct GNUNET_TIME_Absolute transmission_request_start_time;
71
72   /**
73    * Timeout for this request.
74    */
75   struct GNUNET_TIME_Absolute timeout;
76
77   /**
78    * Task called on timeout, or 0 for none.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
81
82   /**
83    * Function to call to get the actual message.
84    */
85   GSF_GetMessageCallback gmc;
86
87   /**
88    * Peer this request targets.
89    */
90   struct GSF_ConnectedPeer *cp;
91
92   /**
93    * Closure for 'gmc'.
94    */
95   void *gmc_cls;
96
97   /**
98    * Size of the message to be transmitted.
99    */
100   size_t size;
101
102   /**
103    * Set to 1 if we're currently in the process of calling
104    * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
105    * NULL, we should not call notify_transmit_ready for this
106    * handle right now).
107    */
108   unsigned int cth_in_progress;
109
110   /**
111    * GNUNET_YES if this is a query, GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Information per peer and request.
130  */
131 struct PeerRequest
132 {
133
134   /**
135    * Handle to generic request.
136    */
137   struct GSF_PendingRequest *pr;
138   
139   /**
140    * Handle to specific peer.
141    */
142   struct GSF_ConnectedPeer *cp;
143
144   /**
145    * Task for asynchronous stopping of this request.
146    */
147   GNUNET_SCHEDULER_TaskIdentifier kill_task;
148
149 };
150
151
152 /**
153  * A connected peer.
154  */
155 struct GSF_ConnectedPeer 
156 {
157
158   /**
159    * Performance data for this peer.
160    */
161   struct GSF_PeerPerformanceData ppd;
162
163   /**
164    * Time until when we blocked this peer from migrating
165    * data to us.
166    */
167   struct GNUNET_TIME_Absolute last_migration_block;
168
169   /**
170    * Task scheduled to revive migration to this peer.
171    */
172   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
173
174   /**
175    * Messages (replies, queries, content migration) we would like to
176    * send to this peer in the near future.  Sorted by priority, head.
177    */
178   struct GSF_PeerTransmitHandle *pth_head;
179
180   /**
181    * Messages (replies, queries, content migration) we would like to
182    * send to this peer in the near future.  Sorted by priority, tail.
183    */
184   struct GSF_PeerTransmitHandle *pth_tail;
185
186   /**
187    * Migration stop message in our queue, or NULL if we have none pending.
188    */
189   struct GSF_PeerTransmitHandle *migration_pth;
190
191   /**
192    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
193    */
194   struct GNUNET_CORE_InformationRequestContext *irc;
195
196   /**
197    * Task scheduled if we need to retry bandwidth reservation later.
198    */
199   GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
200
201   /**
202    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
203    */
204   struct GNUNET_CONTAINER_MultiHashMap *request_map;
205
206   /**
207    * Increase in traffic preference still to be submitted
208    * to the core service for this peer.
209    */
210   uint64_t inc_preference;
211
212   /**
213    * Trust rating for this peer on disk.
214    */
215   uint32_t disk_trust;
216
217   /**
218    * Which offset in "last_p2p_replies" will be updated next?
219    * (we go round-robin).
220    */
221   unsigned int last_p2p_replies_woff;
222
223   /**
224    * Which offset in "last_client_replies" will be updated next?
225    * (we go round-robin).
226    */
227   unsigned int last_client_replies_woff;
228
229   /**
230    * Current offset into 'last_request_times' ring buffer.
231    */
232   unsigned int last_request_times_off;
233
234   /**
235    * GNUNET_YES if we did successfully reserve 32k bandwidth,
236    * GNUNET_NO if not.
237    */
238   int did_reserve;
239
240 };
241
242
243 /**
244  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
245  */
246 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
247
248 /**
249  * Where do we store trust information?
250  */
251 static char *trustDirectory;
252
253
254 /**
255  * Get the filename under which we would store the GNUNET_HELLO_Message
256  * for the given host and protocol.
257  * @return filename of the form DIRECTORY/HOSTID
258  */
259 static char *
260 get_trust_filename (const struct GNUNET_PeerIdentity *id)
261 {
262   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
263   char *fn;
264
265   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
266   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
267   return fn;
268 }
269
270
271 /**
272  * Find latency information in 'atsi'.
273  *
274  * @param atsi performance data
275  * @return connection latency
276  */
277 static struct GNUNET_TIME_Relative
278 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
279 {
280   if (atsi == NULL)
281     return GNUNET_TIME_UNIT_SECONDS;
282   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
283           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
284     atsi++;
285   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
286     {
287       GNUNET_break (0);
288       /* how can we not have latency data? */
289       return GNUNET_TIME_UNIT_SECONDS;
290     }
291   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
292                                         ntohl (atsi->value));
293 }
294
295
296 /**
297  * Update the performance information kept for the given peer.
298  *
299  * @param cp peer record to update
300  * @param atsi transport performance data
301  */
302 static void
303 update_atsi (struct GSF_ConnectedPeer *cp,
304              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
305 {
306   struct GNUNET_TIME_Relative latency;
307
308   latency = get_latency (atsi);
309   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
310                                  latency);
311   /* LATER: merge atsi into cp's performance data (if we ever care...) */
312 }
313
314
315 /**
316  * Return the performance data record for the given peer
317  * 
318  * @param cp peer to query
319  * @return performance data record for the peer
320  */
321 struct GSF_PeerPerformanceData *
322 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
323 {
324   return &cp->ppd;
325 }
326
327
328 /**
329  * Core is ready to transmit to a peer, get the message.
330  *
331  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
332  * @param size number of bytes core is willing to take
333  * @param buf where to copy the message
334  * @return number of bytes copied to buf
335  */
336 static size_t
337 peer_transmit_ready_cb (void *cls,
338                         size_t size,
339                         void *buf);
340
341
342
343
344 /**
345  * Function called by core upon success or failure of our bandwidth reservation request.
346  *
347  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
348  * @param peer identifies the peer
349  * @param bandwidth_out available amount of outbound bandwidth
350  * @param amount set to the amount that was actually reserved or unreserved;
351  *               either the full requested amount or zero (no partial reservations)
352  * @param res_delay if the reservation could not be satisfied (amount was 0), how
353  *        long should the client wait until re-trying?
354  * @param preference current traffic preference for the given peer
355  */
356 static void
357 core_reserve_callback (void *cls,
358                        const struct GNUNET_PeerIdentity *peer,
359                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
360                        int32_t amount,
361                        struct GNUNET_TIME_Relative res_delay,
362                        uint64_t preference);
363
364
365 /**
366  * If ready (bandwidth reserved), try to schedule transmission via
367  * core for the given handle.
368  *
369  * @param pth transmission handle to schedule
370  */
371 static void
372 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
373 {
374   struct GSF_ConnectedPeer *cp;
375   struct GNUNET_PeerIdentity target;
376   uint64_t ip;
377
378   if ( (NULL != pth->cth) ||
379        (0 != pth->cth_in_progress) )
380     return; /* already done */
381   cp = pth->cp;
382   GNUNET_PEER_resolve (cp->ppd.pid,
383                        &target);
384   if ( (GNUNET_YES == pth->is_query) &&
385        (GNUNET_YES != pth->was_reserved) )
386     {
387       /* query, need reservation */
388       if (GNUNET_YES != cp->did_reserve)
389         return; /* not ready */
390       cp->did_reserve = GNUNET_NO;
391       /* reservation already done! */
392       pth->was_reserved = GNUNET_YES;
393       ip = cp->inc_preference;
394       cp->inc_preference = 0;
395       cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
396                                                     &target,
397                                                     GNUNET_TIME_UNIT_FOREVER_REL,
398                                                     GNUNET_BANDWIDTH_VALUE_MAX,
399                                                     DBLOCK_SIZE,
400                                                     ip,
401                                                     &core_reserve_callback,
402                                                     cp);          
403     }
404   GNUNET_assert (pth->cth == NULL);
405   pth->cth_in_progress++;
406   pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
407                                                 GNUNET_YES,
408                                                 pth->priority,
409                                                 GNUNET_TIME_absolute_get_remaining (pth->timeout),
410                                                 &target,
411                                                 pth->size,
412                                                 &peer_transmit_ready_cb,
413                                                 pth);
414   GNUNET_assert (0 < pth->cth_in_progress--);
415 }
416
417
418 /**
419  * Core is ready to transmit to a peer, get the message.
420  *
421  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
422  * @param size number of bytes core is willing to take
423  * @param buf where to copy the message
424  * @return number of bytes copied to buf
425  */
426 static size_t
427 peer_transmit_ready_cb (void *cls,
428                         size_t size,
429                         void *buf)
430 {
431   struct GSF_PeerTransmitHandle *pth = cls;
432   struct GSF_PeerTransmitHandle *pos;
433   struct GSF_ConnectedPeer *cp;
434   size_t ret;
435
436   GNUNET_assert ( (NULL == buf) ||
437                   (pth->size <= size) );
438   pth->cth = NULL;
439   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
440     {
441       GNUNET_SCHEDULER_cancel (pth->timeout_task);
442       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
443     }
444   cp = pth->cp;
445   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
446                                cp->pth_tail,
447                                pth);
448   if (GNUNET_YES == pth->is_query)
449     {
450       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
451       GNUNET_assert (0 < cp->ppd.pending_queries--);    
452     }
453   else if (GNUNET_NO == pth->is_query)
454     {
455       GNUNET_assert (0 < cp->ppd.pending_replies--);
456     }
457   GNUNET_LOAD_update (cp->ppd.transmission_delay,
458                       GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
459   ret = pth->gmc (pth->gmc_cls, 
460                   size, buf);
461   GNUNET_assert (NULL == pth->cth);
462   for (pos = cp->pth_head; pos != NULL; pos = pos->next)
463     {
464       GNUNET_assert (pos != pth);
465       schedule_transmission (pos);
466     }
467   GNUNET_assert (pth->cth == NULL);
468   GNUNET_assert (pth->cth_in_progress == 0);
469   GNUNET_free (pth);
470   return ret;
471 }
472
473
474 /**
475  * (re)try to reserve bandwidth from the given peer.
476  *
477  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
478  * @param tc scheduler context
479  */
480 static void
481 retry_reservation (void *cls,
482                    const struct GNUNET_SCHEDULER_TaskContext *tc)
483 {
484   struct GSF_ConnectedPeer *cp = cls;
485   uint64_t ip;
486   struct GNUNET_PeerIdentity target;
487
488   GNUNET_PEER_resolve (cp->ppd.pid,
489                        &target);
490   cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
491   ip = cp->inc_preference;
492   cp->inc_preference = 0;
493   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
494                                                 &target,
495                                                 GNUNET_TIME_UNIT_FOREVER_REL,
496                                                 GNUNET_BANDWIDTH_VALUE_MAX,
497                                                 DBLOCK_SIZE,
498                                                 ip,
499                                                 &core_reserve_callback,
500                                                 cp);
501 }
502
503
504 /**
505  * Function called by core upon success or failure of our bandwidth reservation request.
506  *
507  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
508  * @param peer identifies the peer
509  * @param bandwidth_out available amount of outbound bandwidth
510  * @param amount set to the amount that was actually reserved or unreserved;
511  *               either the full requested amount or zero (no partial reservations)
512  * @param res_delay if the reservation could not be satisfied (amount was 0), how
513  *        long should the client wait until re-trying?
514  * @param preference current traffic preference for the given peer
515  */
516 static void
517 core_reserve_callback (void *cls,
518                        const struct GNUNET_PeerIdentity *peer,
519                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
520                        int32_t amount,
521                        struct GNUNET_TIME_Relative res_delay,
522                        uint64_t preference)
523 {
524   struct GSF_ConnectedPeer *cp = cls;
525   struct GSF_PeerTransmitHandle *pth;
526
527   cp->irc = NULL;
528   if (0 == amount)
529     {
530       cp->irc_delay_task = GNUNET_SCHEDULER_add_delayed (res_delay,
531                                                          &retry_reservation,
532                                                          cp);
533       return;
534     }
535   cp->did_reserve = GNUNET_YES;
536   pth = cp->pth_head;
537   if ( (NULL != pth) &&
538        (NULL == pth->cth) )
539     {
540       /* reservation success, try transmission now! */
541       pth->cth_in_progress++;
542       pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
543                                                     GNUNET_YES,
544                                                     pth->priority,
545                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
546                                                     peer,
547                                                     pth->size,
548                                                     &peer_transmit_ready_cb,
549                                                     pth);
550       GNUNET_assert (0 < pth->cth_in_progress--);
551     }
552 }
553
554
555 /**
556  * A peer connected to us.  Setup the connected peer
557  * records.
558  *
559  * @param peer identity of peer that connected
560  * @param atsi performance data for the connection
561  * @return handle to connected peer entry
562  */
563 struct GSF_ConnectedPeer *
564 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
565                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
566 {
567   struct GSF_ConnectedPeer *cp;
568   char *fn;
569   uint32_t trust;
570
571   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
572   cp->ppd.pid = GNUNET_PEER_intern (peer);
573   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
574   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
575                                                 peer,
576                                                 GNUNET_TIME_UNIT_FOREVER_REL,
577                                                 GNUNET_BANDWIDTH_VALUE_MAX,
578                                                 DBLOCK_SIZE,
579                                                 0,
580                                                 &core_reserve_callback,
581                                                 cp);
582   fn = get_trust_filename (peer);
583   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
584       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
585     cp->disk_trust = cp->ppd.trust = ntohl (trust);
586   GNUNET_free (fn);
587   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
588   GNUNET_break (GNUNET_OK ==
589                 GNUNET_CONTAINER_multihashmap_put (cp_map,
590                                                    &peer->hashPubKey,
591                                                    cp,
592                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
593   update_atsi (cp, atsi);
594   GSF_push_start_ (cp);
595   return cp;
596 }
597
598
599 /**
600  * It may be time to re-start migrating content to this
601  * peer.  Check, and if so, restart migration.
602  *
603  * @param cls the 'struct GSF_ConnectedPeer'
604  * @param tc scheduler context
605  */
606 static void
607 revive_migration (void *cls,
608                   const struct GNUNET_SCHEDULER_TaskContext *tc)
609 {
610   struct GSF_ConnectedPeer *cp = cls;
611   struct GNUNET_TIME_Relative bt;
612   
613   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
614   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
615   if (0 != bt.rel_value)
616     {
617       /* still time left... */
618       cp->mig_revive_task 
619         = GNUNET_SCHEDULER_add_delayed (bt,
620                                         &revive_migration,
621                                         cp);
622       return;
623     }
624   GSF_push_start_ (cp);
625 }
626
627
628 /**
629  * Get a handle for a connected peer.
630  *
631  * @param peer peer's identity
632  * @return NULL if the peer is not currently connected
633  */
634 struct GSF_ConnectedPeer *
635 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
636 {
637   return GNUNET_CONTAINER_multihashmap_get (cp_map,
638                                             &peer->hashPubKey);
639 }
640
641
642 /**
643  * Handle P2P "MIGRATION_STOP" message.
644  *
645  * @param cls closure, always NULL
646  * @param other the other peer involved (sender or receiver, NULL
647  *        for loopback messages where we are both sender and receiver)
648  * @param message the actual message
649  * @param atsi performance information
650  * @return GNUNET_OK to keep the connection open,
651  *         GNUNET_SYSERR to close it (signal serious error)
652  */
653 int
654 GSF_handle_p2p_migration_stop_ (void *cls,
655                                 const struct GNUNET_PeerIdentity *other,
656                                 const struct GNUNET_MessageHeader *message,
657                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
658 {
659   struct GSF_ConnectedPeer *cp; 
660   const struct MigrationStopMessage *msm;
661   struct GNUNET_TIME_Relative bt;
662
663   msm = (const struct MigrationStopMessage*) message;
664   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
665                                           &other->hashPubKey);
666   if (cp == NULL)
667     {
668       GNUNET_break (0);
669       return GNUNET_OK;
670     }
671   bt = GNUNET_TIME_relative_ntoh (msm->duration);
672   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
673               _("Migration of content to peer `%s' blocked for %llu ms\n"),
674               GNUNET_i2s (other),
675               (unsigned long long) bt.rel_value);
676   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
677   if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
678     {
679       GSF_push_stop_ (cp);
680       cp->mig_revive_task 
681         = GNUNET_SCHEDULER_add_delayed (bt,
682                                         &revive_migration,
683                                         cp);
684     }
685   update_atsi (cp, atsi);
686   return GNUNET_OK;
687 }
688
689
690 /**
691  * Copy reply and free put message.
692  *
693  * @param cls the 'struct PutMessage'
694  * @param buf_size number of bytes available in buf
695  * @param buf where to copy the message, NULL on error (peer disconnect)
696  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
697  */
698 static size_t 
699 copy_reply (void *cls,
700             size_t buf_size,
701             void *buf)
702 {
703   struct PutMessage *pm = cls;
704   size_t size;
705
706   if (buf != NULL)
707     {
708       GNUNET_assert (buf_size >= ntohs (pm->header.size));
709       size = ntohs (pm->header.size);
710       memcpy (buf, pm, size); 
711       GNUNET_STATISTICS_update (GSF_stats,
712                                 gettext_noop ("# replies transmitted to other peers"),
713                                 1,
714                                 GNUNET_NO); 
715     }
716   else
717     {
718       size = 0;
719       GNUNET_STATISTICS_update (GSF_stats,
720                                 gettext_noop ("# replies dropped"),
721                                 1,
722                                 GNUNET_NO); 
723     }
724   GNUNET_free (pm);
725   return size;
726 }
727
728
729 /**
730  * Free the given request.
731  *
732  * @param cls the request to free
733  * @param tc task context
734  */ 
735 static void
736 peer_request_destroy (void *cls,
737                       const struct GNUNET_SCHEDULER_TaskContext *tc)
738 {
739   struct PeerRequest *peerreq = cls;
740   struct GSF_PendingRequest *pr = peerreq->pr;
741   struct GSF_ConnectedPeer *cp = peerreq->cp;
742   struct GSF_PendingRequestData *prd;
743
744   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
745   prd = GSF_pending_request_get_data_ (pr);
746   GNUNET_STATISTICS_update (GSF_stats,
747                             gettext_noop ("# P2P searches active"),
748                             -1,
749                             GNUNET_NO);
750   GNUNET_break (GNUNET_OK ==
751                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
752                                                       &prd->query,
753                                                       peerreq));
754   GSF_pending_request_cancel_ (pr);
755   GNUNET_free (peerreq);
756 }
757
758
759 /**
760  * Handle a reply to a pending request.  Also called if a request
761  * expires (then with data == NULL).  The handler may be called
762  * many times (depending on the request type), but will not be
763  * called during or after a call to GSF_pending_request_cancel 
764  * and will also not be called anymore after a call signalling
765  * expiration.
766  *
767  * @param cls 'struct PeerRequest' this is an answer for
768  * @param eval evaluation of the result
769  * @param pr handle to the original pending request
770  * @param expiration when does 'data' expire?
771  * @param type type of the block
772  * @param data response data, NULL on request expiration
773  * @param data_len number of bytes in data
774  */
775 static void
776 handle_p2p_reply (void *cls,
777                   enum GNUNET_BLOCK_EvaluationResult eval,
778                   struct GSF_PendingRequest *pr,
779                   struct GNUNET_TIME_Absolute expiration,
780                   enum GNUNET_BLOCK_Type type,
781                   const void *data,
782                   size_t data_len)
783 {
784   struct PeerRequest *peerreq = cls;
785   struct GSF_ConnectedPeer *cp = peerreq->cp;
786   struct GSF_PendingRequestData *prd;
787   struct PutMessage *pm;
788   size_t msize;
789
790   GNUNET_assert (data_len + sizeof (struct PutMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE);
791   GNUNET_assert (peerreq->pr == pr);
792   prd = GSF_pending_request_get_data_ (pr);
793   if (NULL == data)
794     {
795       GNUNET_STATISTICS_update (GSF_stats,
796                                 gettext_noop ("# P2P searches active"),
797                                 -1,
798                                 GNUNET_NO);
799       GNUNET_break (GNUNET_OK ==
800                     GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
801                                                           &prd->query,
802                                                           peerreq));
803       GNUNET_free (peerreq);
804       return;
805     }  
806   GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
807   if ( (prd->type != type) &&
808        (prd->type != GNUNET_BLOCK_TYPE_ANY) )
809     {
810       GNUNET_break (0);
811       return;
812     }
813 #if DEBUG_FS
814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815               "Transmitting result for query `%s' to peer\n",
816               GNUNET_h2s (&prd->query));
817 #endif  
818   GNUNET_STATISTICS_update (GSF_stats,
819                             gettext_noop ("# replies received for other peers"),
820                             1,
821                             GNUNET_NO); 
822   msize = sizeof (struct PutMessage) + data_len;
823   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
824     {
825       GNUNET_break (0);
826       return;
827     }
828   pm = GNUNET_malloc (msize);
829   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
830   pm->header.size = htons (msize);
831   pm->type = htonl (type);
832   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
833   memcpy (&pm[1], data, data_len);
834   (void) GSF_peer_transmit_ (cp, GNUNET_NO,
835                              UINT32_MAX,
836                              REPLY_TIMEOUT,
837                              msize,
838                              &copy_reply,
839                              pm);
840   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
841     return;
842   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
843     peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy,
844                                                    peerreq);
845 }
846
847
848 /**
849  * Increase the host credit by a value.
850  *
851  * @param cp which peer to change the trust value on
852  * @param value is the int value by which the
853  *  host credit is to be increased or decreased
854  * @returns the actual change in trust (positive or negative)
855  */
856 static int
857 change_host_trust (struct GSF_ConnectedPeer *cp, int value)
858 {
859   if (value == 0)
860     return 0;
861   GNUNET_assert (cp != NULL);
862   if (value > 0)
863     {
864       if (cp->ppd.trust + value < cp->ppd.trust)
865         {
866           value = UINT32_MAX - cp->ppd.trust;
867           cp->ppd.trust = UINT32_MAX;
868         }
869       else
870         cp->ppd.trust += value;
871     }
872   else
873     {
874       if (cp->ppd.trust < -value)
875         {
876           value = -cp->ppd.trust;
877           cp->ppd.trust = 0;
878         }
879       else
880         cp->ppd.trust += value;
881     }
882   return value;
883 }
884
885
886 /**
887  * We've received a request with the specified priority.  Bound it
888  * according to how much we trust the given peer.
889  * 
890  * @param prio_in requested priority
891  * @param cp the peer making the request
892  * @return effective priority
893  */
894 static int32_t
895 bound_priority (uint32_t prio_in,
896                 struct GSF_ConnectedPeer *cp)
897 {
898 #define N ((double)128.0)
899   uint32_t ret;
900   double rret;
901   int ld;
902
903   ld = GSF_test_get_load_too_high_ (0);
904   if (ld == GNUNET_SYSERR)
905     {
906       GNUNET_STATISTICS_update (GSF_stats,
907                                 gettext_noop ("# requests done for free (low load)"),
908                                 1,
909                                 GNUNET_NO);
910       return 0; /* excess resources */
911     }
912   if (prio_in > INT32_MAX)
913     prio_in = INT32_MAX;
914   ret = - change_host_trust (cp, - (int) prio_in);
915   if (ret > 0)
916     {
917       if (ret > GSF_current_priorities + N)
918         rret = GSF_current_priorities + N;
919       else
920         rret = ret;
921       GSF_current_priorities 
922         = (GSF_current_priorities * (N-1) + rret)/N;
923     }
924   if ( (ld == GNUNET_YES) && (ret > 0) )
925     {
926       /* try with charging */
927       ld = GSF_test_get_load_too_high_ (ret);
928     }
929   if (ld == GNUNET_YES)
930     {
931       GNUNET_STATISTICS_update (GSF_stats,
932                                 gettext_noop ("# request dropped, priority insufficient"),
933                                 1,
934                                 GNUNET_NO);
935       /* undo charge */
936       change_host_trust (cp, (int) ret);
937       return -1; /* not enough resources */
938     }
939   else
940     {
941       GNUNET_STATISTICS_update (GSF_stats,
942                                 gettext_noop ("# requests done for a price (normal load)"),
943                                 1,
944                                 GNUNET_NO);
945     }
946 #undef N
947   return ret;
948 }
949
950
951 /**
952  * The priority level imposes a bound on the maximum
953  * value for the ttl that can be requested.
954  *
955  * @param ttl_in requested ttl
956  * @param prio given priority
957  * @return ttl_in if ttl_in is below the limit,
958  *         otherwise the ttl-limit for the given priority
959  */
960 static int32_t
961 bound_ttl (int32_t ttl_in, uint32_t prio)
962 {
963   unsigned long long allowed;
964
965   if (ttl_in <= 0)
966     return ttl_in;
967   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
968   if (ttl_in > allowed)      
969     {
970       if (allowed >= (1 << 30))
971         return 1 << 30;
972       return allowed;
973     }
974   return ttl_in;
975 }
976
977
978 /**
979  * Handle P2P "QUERY" message.  Creates the pending request entry
980  * and sets up all of the data structures to that we will
981  * process replies properly.  Does not initiate forwarding or
982  * local database lookups.
983  *
984  * @param other the other peer involved (sender or receiver, NULL
985  *        for loopback messages where we are both sender and receiver)
986  * @param message the actual message
987  * @return pending request handle, NULL on error
988  */
989 struct GSF_PendingRequest *
990 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
991                        const struct GNUNET_MessageHeader *message)
992 {
993   struct PeerRequest *peerreq;
994   struct GSF_PendingRequest *pr;
995   struct GSF_PendingRequestData *prd;
996   struct GSF_ConnectedPeer *cp;
997   struct GSF_ConnectedPeer *cps;
998   const GNUNET_HashCode *namespace;
999   const struct GNUNET_PeerIdentity *target;
1000   enum GSF_PendingRequestOptions options;                            
1001   uint16_t msize;
1002   const struct GetMessage *gm;
1003   unsigned int bits;
1004   const GNUNET_HashCode *opt;
1005   uint32_t bm;
1006   size_t bfsize;
1007   uint32_t ttl_decrement;
1008   int32_t priority;
1009   int32_t ttl;
1010   enum GNUNET_BLOCK_Type type;
1011   GNUNET_PEER_Id spid;
1012
1013   msize = ntohs(message->size);
1014   if (msize < sizeof (struct GetMessage))
1015     {
1016       GNUNET_break_op (0);
1017       return NULL;
1018     }
1019   gm = (const struct GetMessage*) message;
1020   type = ntohl (gm->type);
1021   bm = ntohl (gm->hash_bitmap);
1022   bits = 0;
1023   while (bm > 0)
1024     {
1025       if (1 == (bm & 1))
1026         bits++;
1027       bm >>= 1;
1028     }
1029   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
1030     {
1031       GNUNET_break_op (0);
1032       return NULL;
1033     }  
1034   opt = (const GNUNET_HashCode*) &gm[1];
1035   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
1036   /* bfsize must be power of 2, check! */
1037   if (0 != ( (bfsize - 1) & bfsize))
1038     {
1039       GNUNET_break_op (0);
1040       return NULL;
1041     }
1042   GSF_cover_query_count++;
1043   bm = ntohl (gm->hash_bitmap);
1044   bits = 0;
1045   cps = GNUNET_CONTAINER_multihashmap_get (cp_map,
1046                                            &other->hashPubKey);
1047   if (NULL == cps)
1048     {
1049       /* peer must have just disconnected */
1050       GNUNET_STATISTICS_update (GSF_stats,
1051                                 gettext_noop ("# requests dropped due to initiator not being connected"),
1052                                 1,
1053                                 GNUNET_NO);
1054       return NULL;
1055     }
1056   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1057     cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1058                                             &opt[bits++]);
1059   else
1060     cp = cps;
1061   if (cp == NULL)
1062     {
1063 #if DEBUG_FS
1064       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1065         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1067                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
1068       
1069       else
1070         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
1072                     GNUNET_i2s (other));
1073 #endif
1074       GNUNET_STATISTICS_update (GSF_stats,
1075                                 gettext_noop ("# requests dropped due to missing reverse route"),
1076                                 1,
1077                                 GNUNET_NO);
1078       return NULL;
1079     }
1080   /* note that we can really only check load here since otherwise
1081      peers could find out that we are overloaded by not being
1082      disconnected after sending us a malformed query... */
1083   priority = bound_priority (ntohl (gm->priority), cps);
1084   if (priority < 0)
1085     {
1086 #if DEBUG_FS
1087       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088                   "Dropping query from `%s', this peer is too busy.\n",
1089                   GNUNET_i2s (other));
1090 #endif
1091       return NULL;
1092     }
1093 #if DEBUG_FS 
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1096               GNUNET_h2s (&gm->query),
1097               (unsigned int) type,
1098               GNUNET_i2s (other),
1099               (unsigned int) bm);
1100 #endif
1101   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
1102   if ( (type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
1103        (namespace == NULL) )
1104     {
1105       GNUNET_break_op (0);
1106       return NULL;
1107     }
1108   if ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
1109        (namespace != NULL) )
1110     {
1111       GNUNET_break_op (0);
1112       return NULL;
1113     }
1114   target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
1115   options = 0;
1116   spid = 0;
1117   if ( (GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority)) ||
1118        (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) > 
1119         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)) )
1120     {
1121       /* don't have BW to send to peer, or would likely take longer than we have for it,
1122          so at best indirect the query */
1123       priority = 0;
1124       options |= GSF_PRO_FORWARD_ONLY;
1125       spid = GNUNET_PEER_intern (other);
1126     }
1127   ttl = bound_ttl (ntohl (gm->ttl), priority);
1128   /* decrement ttl (always) */
1129   ttl_decrement = 2 * TTL_DECREMENT +
1130     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1131                               TTL_DECREMENT);
1132   if ( (ttl < 0) &&
1133        (((int32_t)(ttl - ttl_decrement)) > 0) )
1134     {
1135 #if DEBUG_FS
1136       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1138                   GNUNET_i2s (other),
1139                   ttl,
1140                   ttl_decrement);
1141 #endif
1142       GNUNET_STATISTICS_update (GSF_stats,
1143                                 gettext_noop ("# requests dropped due TTL underflow"),
1144                                 1,
1145                                 GNUNET_NO);
1146       /* integer underflow => drop (should be very rare)! */      
1147       return NULL;
1148     } 
1149   ttl -= ttl_decrement;
1150
1151   /* test if the request already exists */
1152   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
1153                                                &gm->query);
1154   if (peerreq != NULL) 
1155     {      
1156       pr = peerreq->pr;
1157       prd = GSF_pending_request_get_data_ (pr);
1158       if ( (prd->type == type) &&
1159            ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
1160              (0 == memcmp (&prd->namespace,
1161                            namespace,
1162                            sizeof (GNUNET_HashCode))) ) )
1163         {
1164           if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
1165             {
1166               /* existing request has higher TTL, drop new one! */
1167               prd->priority += priority;
1168 #if DEBUG_FS
1169               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170                           "Have existing request with higher TTL, dropping new request.\n",
1171                           GNUNET_i2s (other));
1172 #endif
1173               GNUNET_STATISTICS_update (GSF_stats,
1174                                         gettext_noop ("# requests dropped due to higher-TTL request"),
1175                                         1,
1176                                         GNUNET_NO);
1177               return NULL;
1178             }
1179           /* existing request has lower TTL, drop old one! */
1180           priority += prd->priority;
1181           GSF_pending_request_cancel_ (pr);
1182           GNUNET_assert (GNUNET_YES ==
1183                          GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
1184                                                                &gm->query,
1185                                                                peerreq));
1186           if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1187             {
1188               GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1189               peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
1190             }
1191           GNUNET_free (peerreq);
1192         }
1193     }
1194   
1195   peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1196   peerreq->cp = cp; 
1197   pr = GSF_pending_request_create_ (options,
1198                                     type,
1199                                     &gm->query,
1200                                     namespace,
1201                                     target,
1202                                     (bfsize > 0) ? (const char*)&opt[bits] : NULL,
1203                                     bfsize,
1204                                     ntohl (gm->filter_mutator),
1205                                     1 /* anonymity */,
1206                                     (uint32_t) priority,
1207                                     ttl,
1208                                     spid,
1209                                     NULL, 0, /* replies_seen */
1210                                     &handle_p2p_reply,
1211                                     peerreq);
1212   GNUNET_assert (NULL != pr);
1213   peerreq->pr = pr;
1214   GNUNET_break (GNUNET_OK ==
1215                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1216                                                    &gm->query,
1217                                                    peerreq,
1218                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1219   GNUNET_STATISTICS_update (GSF_stats,
1220                             gettext_noop ("# P2P searches received"),
1221                             1,
1222                             GNUNET_NO);
1223   GNUNET_STATISTICS_update (GSF_stats,
1224                             gettext_noop ("# P2P searches active"),
1225                             1,
1226                             GNUNET_NO);
1227   return pr;
1228 }
1229
1230
1231 /**
1232  * Function called if there has been a timeout trying to satisfy
1233  * a transmission request.
1234  *
1235  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
1236  * @param tc scheduler context
1237  */
1238 static void
1239 peer_transmit_timeout (void *cls,
1240                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1241 {
1242   struct GSF_PeerTransmitHandle *pth = cls;
1243   struct GSF_ConnectedPeer *cp;
1244
1245 #if DEBUG_FS
1246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247               "Timeout trying to transmit to other peer\n");
1248 #endif  
1249   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1250   cp = pth->cp;
1251   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1252                                cp->pth_tail,
1253                                pth);
1254   if (GNUNET_YES == pth->is_query)
1255     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1256   else if (GNUNET_NO == pth->is_query)
1257     GNUNET_assert (0 < cp->ppd.pending_replies--);
1258   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1259                       UINT64_MAX);
1260   if (NULL != pth->cth)
1261     {
1262       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1263       pth->cth = NULL;
1264     }
1265   pth->gmc (pth->gmc_cls, 
1266             0, NULL);
1267   GNUNET_assert (0 == pth->cth_in_progress);
1268   GNUNET_free (pth);
1269 }
1270
1271
1272 /**
1273  * Transmit a message to the given peer as soon as possible.
1274  * If the peer disconnects before the transmission can happen,
1275  * the callback is invoked with a 'NULL' buffer.
1276  *
1277  * @param cp target peer
1278  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1279  * @param priority how important is this request?
1280  * @param timeout when does this request timeout (call gmc with error)
1281  * @param size number of bytes we would like to send to the peer
1282  * @param gmc function to call to get the message
1283  * @param gmc_cls closure for gmc
1284  * @return handle to cancel request
1285  */
1286 struct GSF_PeerTransmitHandle *
1287 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1288                     int is_query,
1289                     uint32_t priority,
1290                     struct GNUNET_TIME_Relative timeout,
1291                     size_t size,
1292                     GSF_GetMessageCallback gmc,
1293                     void *gmc_cls)
1294 {
1295   struct GSF_PeerTransmitHandle *pth;
1296   struct GSF_PeerTransmitHandle *pos;
1297   struct GSF_PeerTransmitHandle *prev;
1298
1299   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1300   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1301   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1302   pth->gmc = gmc;
1303   pth->gmc_cls = gmc_cls;
1304   pth->size = size;
1305   pth->is_query = is_query;
1306   pth->priority = priority;
1307   pth->cp = cp;
1308   /* insertion sort (by priority, descending) */
1309   prev = NULL;
1310   pos = cp->pth_head;
1311   while ( (pos != NULL) &&
1312           (pos->priority > priority) )
1313     {
1314       prev = pos;
1315       pos = pos->next;
1316     }
1317   if (prev == NULL)
1318     GNUNET_CONTAINER_DLL_insert (cp->pth_head,
1319                                  cp->pth_tail,
1320                                  pth);
1321   else
1322     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1323                                        cp->pth_tail,
1324                                        prev,
1325                                        pth);
1326   if (GNUNET_YES == is_query)
1327     cp->ppd.pending_queries++;
1328   else if (GNUNET_NO == is_query)
1329     cp->ppd.pending_replies++;
1330   pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1331                                                     &peer_transmit_timeout,
1332                                                     pth);
1333   schedule_transmission (pth);
1334   return pth;
1335 }
1336
1337
1338 /**
1339  * Cancel an earlier request for transmission.
1340  *
1341  * @param pth request to cancel
1342  */
1343 void
1344 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1345 {
1346   struct GSF_ConnectedPeer *cp;
1347
1348   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1349     {
1350       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1351       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1352     }
1353   if (NULL != pth->cth)
1354     {
1355       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1356       pth->cth = NULL;
1357     }
1358   cp = pth->cp;
1359   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1360                                cp->pth_tail,
1361                                pth);
1362   if (GNUNET_YES == pth->is_query)
1363     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1364   else if (GNUNET_NO == pth->is_query)
1365     GNUNET_assert (0 < cp->ppd.pending_replies--);
1366   GNUNET_assert (0 == pth->cth_in_progress);
1367   GNUNET_free (pth);
1368 }
1369
1370
1371 /**
1372  * Report on receiving a reply; update the performance record of the given peer.
1373  *
1374  * @param cp responding peer (will be updated)
1375  * @param request_time time at which the original query was transmitted
1376  * @param request_priority priority of the original request
1377  */
1378 void
1379 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1380                               struct GNUNET_TIME_Absolute request_time,
1381                               uint32_t request_priority)
1382 {
1383   struct GNUNET_TIME_Relative delay;
1384
1385   delay = GNUNET_TIME_absolute_get_duration (request_time);  
1386   cp->ppd.avg_reply_delay.rel_value = (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
1387   cp->ppd.avg_priority = (cp->ppd.avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
1388 }
1389
1390
1391 /**
1392  * Report on receiving a reply in response to an initiating client.
1393  * Remember that this peer is good for this client.
1394  *
1395  * @param cp responding peer (will be updated)
1396  * @param initiator_client local client on responsible for query
1397  */
1398 void
1399 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1400                                    struct GSF_LocalClient *initiator_client)
1401 {
1402   cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1403 }
1404
1405
1406 /**
1407  * Report on receiving a reply in response to an initiating peer.
1408  * Remember that this peer is good for this initiating peer.
1409  *
1410  * @param cp responding peer (will be updated)
1411  * @param initiator_peer other peer responsible for query
1412  */
1413 void
1414 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1415                                  const struct GSF_ConnectedPeer *initiator_peer)
1416 {
1417   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
1418   cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->ppd.pid;
1419   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1420 }
1421
1422
1423 /**
1424  * Method called whenever a given peer has a status change.
1425  *
1426  * @param cls closure
1427  * @param peer peer identity this notification is about
1428  * @param bandwidth_in available amount of inbound bandwidth
1429  * @param bandwidth_out available amount of outbound bandwidth
1430  * @param timeout absolute time when this peer will time out
1431  *        unless we see some further activity from it
1432  * @param atsi status information
1433  */
1434 void
1435 GSF_peer_status_handler_ (void *cls,
1436                           const struct GNUNET_PeerIdentity *peer,
1437                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1438                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1439                           struct GNUNET_TIME_Absolute timeout,
1440                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1441 {
1442   struct GSF_ConnectedPeer *cp;
1443
1444   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1445                                           &peer->hashPubKey);
1446   GNUNET_assert (NULL != cp);
1447   update_atsi (cp, atsi);
1448 }
1449
1450
1451 /**
1452  * Cancel all requests associated with the peer.
1453  *
1454  * @param cls unused
1455  * @param query hash code of the request
1456  * @param value the 'struct GSF_PendingRequest'
1457  * @return GNUNET_YES (continue to iterate)
1458  */
1459 static int
1460 cancel_pending_request (void *cls,
1461                         const GNUNET_HashCode *query,
1462                         void *value)
1463 {
1464   struct PeerRequest *peerreq = value;
1465   struct GSF_PendingRequest *pr = peerreq->pr;
1466
1467   GSF_pending_request_cancel_ (pr);
1468   if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1469     {
1470       GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1471       peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
1472     }
1473   GNUNET_free (peerreq);
1474   return GNUNET_OK;
1475 }
1476
1477
1478 /**
1479  * A peer disconnected from us.  Tear down the connected peer
1480  * record.
1481  *
1482  * @param cls unused
1483  * @param peer identity of peer that connected
1484  */
1485 void
1486 GSF_peer_disconnect_handler_ (void *cls,
1487                               const struct GNUNET_PeerIdentity *peer)
1488 {
1489   struct GSF_ConnectedPeer *cp;
1490   struct GSF_PeerTransmitHandle *pth;
1491
1492   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1493                                           &peer->hashPubKey);
1494   if (NULL == cp)
1495     return; /* must have been disconnect from core with
1496                'peer' == my_id, ignore */
1497   GNUNET_CONTAINER_multihashmap_remove (cp_map,
1498                                         &peer->hashPubKey,
1499                                         cp);
1500   if (NULL != cp->migration_pth)
1501     {
1502       GSF_peer_transmit_cancel_ (cp->migration_pth);
1503       cp->migration_pth = NULL;
1504     }
1505   if (NULL != cp->irc)
1506     {
1507       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1508       cp->irc = NULL;
1509     }
1510   if (GNUNET_SCHEDULER_NO_TASK != cp->irc_delay_task)
1511     {
1512       GNUNET_SCHEDULER_cancel (cp->irc_delay_task);
1513       cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1514     }
1515   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1516                                          &cancel_pending_request,
1517                                          cp);
1518   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1519   cp->request_map = NULL;
1520   GSF_plan_notify_peer_disconnect_ (cp);
1521   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1522   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1523   while (NULL != (pth = cp->pth_head))
1524     {
1525       if (NULL != pth->cth)
1526         {
1527           GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1528           pth->cth = NULL;
1529         }
1530       if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1531         {
1532           GNUNET_SCHEDULER_cancel (pth->timeout_task);
1533           pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1534         }
1535       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1536                                    cp->pth_tail,
1537                                    pth);
1538       GNUNET_assert (0 == pth->cth_in_progress);
1539       GNUNET_free (pth);
1540     }
1541   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1542   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1543     {
1544       GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1545       cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1546     }
1547   GNUNET_free (cp);
1548 }
1549
1550
1551 /**
1552  * Closure for 'call_iterator'.
1553  */
1554 struct IterationContext
1555 {
1556   /**
1557    * Function to call on each entry.
1558    */
1559   GSF_ConnectedPeerIterator it;
1560
1561   /**
1562    * Closure for 'it'.
1563    */
1564   void *it_cls;
1565 };
1566
1567
1568 /**
1569  * Function that calls the callback for each peer.
1570  *
1571  * @param cls the 'struct IterationContext*'
1572  * @param key identity of the peer
1573  * @param value the 'struct GSF_ConnectedPeer*'
1574  * @return GNUNET_YES to continue iteration
1575  */
1576 static int
1577 call_iterator (void *cls,
1578                const GNUNET_HashCode *key,
1579                void *value)
1580 {
1581   struct IterationContext *ic = cls;
1582   struct GSF_ConnectedPeer *cp = value;
1583   
1584   ic->it (ic->it_cls,
1585           (const struct GNUNET_PeerIdentity*) key,
1586           cp,
1587           &cp->ppd);
1588   return GNUNET_YES;
1589 }
1590
1591
1592 /**
1593  * Iterate over all connected peers.
1594  *
1595  * @param it function to call for each peer
1596  * @param it_cls closure for it
1597  */
1598 void
1599 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1600                               void *it_cls)
1601 {
1602   struct IterationContext ic;
1603
1604   ic.it = it;
1605   ic.it_cls = it_cls;
1606   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1607                                          &call_iterator,
1608                                          &ic);
1609 }
1610
1611
1612 /**
1613  * Obtain the identity of a connected peer.
1614  *
1615  * @param cp peer to reserve bandwidth from
1616  * @param id identity to set (written to)
1617  */
1618 void
1619 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1620                                   struct GNUNET_PeerIdentity *id)
1621 {
1622   GNUNET_PEER_resolve (cp->ppd.pid,
1623                        id);
1624 }
1625
1626
1627 /**
1628  * Assemble a migration stop message for transmission.
1629  *
1630  * @param cls the 'struct GSF_ConnectedPeer' to use
1631  * @param size number of bytes we're allowed to write to buf
1632  * @param buf where to copy the message
1633  * @return number of bytes copied to buf
1634  */
1635 static size_t
1636 create_migration_stop_message (void *cls,
1637                                size_t size,
1638                                void *buf)
1639 {
1640   struct GSF_ConnectedPeer *cp = cls;
1641   struct MigrationStopMessage msm;
1642
1643   cp->migration_pth = NULL;
1644   if (NULL == buf)
1645     return 0;
1646   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1647   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1648   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1649   msm.reserved = htonl (0);
1650   msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1651   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1652   return sizeof (struct MigrationStopMessage);
1653 }
1654
1655
1656 /**
1657  * Ask a peer to stop migrating data to us until the given point
1658  * in time.
1659  * 
1660  * @param cp peer to ask
1661  * @param block_time until when to block
1662  */
1663 void
1664 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1665                            struct GNUNET_TIME_Relative block_time)
1666 {
1667   if (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value > block_time.rel_value)
1668     {
1669 #if DEBUG_FS && 0
1670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1671           "Migration already blocked for another %llu ms\n",
1672                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value);
1673 #endif
1674       return; /* already blocked */
1675     }
1676 #if DEBUG_FS && 0
1677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1678               "Asking to stop migration for %llu ms\n",
1679               (unsigned long long) block_time.rel_value);
1680 #endif
1681   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1682   if (cp->migration_pth != NULL)
1683     GSF_peer_transmit_cancel_ (cp->migration_pth);
1684   cp->migration_pth 
1685     = GSF_peer_transmit_ (cp,
1686                           GNUNET_SYSERR,
1687                           UINT32_MAX,
1688                           GNUNET_TIME_UNIT_FOREVER_REL,
1689                           sizeof (struct MigrationStopMessage),
1690                           &create_migration_stop_message,
1691                           cp);
1692 }
1693
1694
1695 /**
1696  * Write host-trust information to a file - flush the buffer entry!
1697  *
1698  * @param cls closure, not used
1699  * @param key host identity
1700  * @param value the 'struct GSF_ConnectedPeer' to flush
1701  * @return GNUNET_OK to continue iteration
1702  */
1703 static int
1704 flush_trust (void *cls,
1705              const GNUNET_HashCode *key,
1706              void *value)
1707 {
1708   struct GSF_ConnectedPeer *cp = value;
1709   char *fn;
1710   uint32_t trust;
1711   struct GNUNET_PeerIdentity pid;
1712
1713   if (cp->ppd.trust == cp->disk_trust)
1714     return GNUNET_OK;                     /* unchanged */
1715   GNUNET_PEER_resolve (cp->ppd.pid,
1716                        &pid);
1717   fn = get_trust_filename (&pid);
1718   if (cp->ppd.trust == 0)
1719     {
1720       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1721         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1722                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1723     }
1724   else
1725     {
1726       trust = htonl (cp->ppd.trust);
1727       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1728                                                     sizeof(uint32_t),
1729                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1730                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1731         cp->disk_trust = cp->ppd.trust;
1732     }
1733   GNUNET_free (fn);
1734   return GNUNET_OK;
1735 }
1736
1737
1738 /**
1739  * Notify core about a preference we have for the given peer
1740  * (to allocate more resources towards it).  The change will
1741  * be communicated the next time we reserve bandwidth with
1742  * core (not instantly).
1743  *
1744  * @param cp peer to reserve bandwidth from
1745  * @param pref preference change
1746  */
1747 void
1748 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1749                                        uint64_t pref)
1750 {
1751   cp->inc_preference += pref;
1752 }
1753
1754
1755 /**
1756  * Call this method periodically to flush trust information to disk.
1757  *
1758  * @param cls closure, not used
1759  * @param tc task context, not used
1760  */
1761 static void
1762 cron_flush_trust (void *cls,
1763                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1764 {
1765
1766   if (NULL == cp_map)
1767     return;
1768   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1769                                          &flush_trust,
1770                                          NULL);
1771   if (NULL == tc)
1772     return;
1773   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1774     return;
1775   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
1776                                 &cron_flush_trust, 
1777                                 NULL);
1778 }
1779
1780
1781 /**
1782  * Initialize peer management subsystem.
1783  */
1784 void
1785 GSF_connected_peer_init_ ()
1786 {
1787   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1788   GNUNET_assert (GNUNET_OK ==
1789                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg,
1790                                                           "fs",
1791                                                           "TRUST",
1792                                                           &trustDirectory));
1793   GNUNET_DISK_directory_create (trustDirectory);
1794   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1795                                       &cron_flush_trust, NULL);
1796 }
1797
1798
1799 /**
1800  * Iterator to free peer entries.
1801  *
1802  * @param cls closure, unused
1803  * @param key current key code
1804  * @param value value in the hash map (peer entry)
1805  * @return GNUNET_YES (we should continue to iterate)
1806  */
1807 static int 
1808 clean_peer (void *cls,
1809             const GNUNET_HashCode * key,
1810             void *value)
1811 {
1812   GSF_peer_disconnect_handler_ (NULL, 
1813                                 (const struct GNUNET_PeerIdentity*) key);
1814   return GNUNET_YES;
1815 }
1816
1817
1818 /**
1819  * Shutdown peer management subsystem.
1820  */
1821 void
1822 GSF_connected_peer_done_ ()
1823 {
1824   cron_flush_trust (NULL, NULL);
1825   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1826                                          &clean_peer,
1827                                          NULL);
1828   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1829   cp_map = NULL;
1830   GNUNET_free (trustDirectory);
1831   trustDirectory = NULL;
1832 }
1833
1834
1835 /**
1836  * Iterator to remove references to LC entry.
1837  *
1838  * @param cls the 'struct GSF_LocalClient*' to look for
1839  * @param key current key code
1840  * @param value value in the hash map (peer entry)
1841  * @return GNUNET_YES (we should continue to iterate)
1842  */
1843 static int 
1844 clean_local_client (void *cls,
1845                     const GNUNET_HashCode * key,
1846                     void *value)
1847 {
1848   const struct GSF_LocalClient *lc = cls;
1849   struct GSF_ConnectedPeer *cp = value;
1850   unsigned int i;
1851
1852   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1853     if (cp->ppd.last_client_replies[i] == lc)
1854       cp->ppd.last_client_replies[i] = NULL;
1855   return GNUNET_YES;
1856 }
1857
1858
1859 /**
1860  * Notification that a local client disconnected.  Clean up all of our
1861  * references to the given handle.
1862  *
1863  * @param lc handle to the local client (henceforth invalid)
1864  */
1865 void
1866 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1867 {
1868   if (NULL == cp_map)
1869     return; /* already cleaned up */
1870   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1871                                          &clean_local_client,
1872                                          (void*) lc);
1873 }
1874
1875
1876 /* end of gnunet-service-fs_cp.c */