track performance data
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33
34 /**
35  * How often do we flush trust values to disk?
36  */
37 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
38
39 /**
40  * After how long do we discard a reply?
41  */
42 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43
44
45 /**
46  * Handle to cancel a transmission request.
47  */
48 struct GSF_PeerTransmitHandle
49 {
50
51   /**
52    * Kept in a doubly-linked list.
53    */
54   struct GSF_PeerTransmitHandle *next;
55
56   /**
57    * Kept in a doubly-linked list.
58    */
59   struct GSF_PeerTransmitHandle *prev;
60
61   /**
62    * Handle for an active request for transmission to this
63    * peer, or NULL (if core queue was full).
64    */
65   struct GNUNET_CORE_TransmitHandle *cth;
66
67   /**
68    * Time when this transmission request was issued.
69    */
70   struct GNUNET_TIME_Absolute transmission_request_start_time;
71
72   /**
73    * Timeout for this request.
74    */
75   struct GNUNET_TIME_Absolute timeout;
76
77   /**
78    * Task called on timeout, or 0 for none.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
81
82   /**
83    * Function to call to get the actual message.
84    */
85   GSF_GetMessageCallback gmc;
86
87   /**
88    * Peer this request targets.
89    */
90   struct GSF_ConnectedPeer *cp;
91
92   /**
93    * Closure for 'gmc'.
94    */
95   void *gmc_cls;
96
97   /**
98    * Size of the message to be transmitted.
99    */
100   size_t size;
101
102   /**
103    * Set to 1 if we're currently in the process of calling
104    * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
105    * NULL, we should not call notify_transmit_ready for this
106    * handle right now).
107    */
108   unsigned int cth_in_progress;
109
110   /**
111    * GNUNET_YES if this is a query, GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Information per peer and request.
130  */
131 struct PeerRequest
132 {
133
134   /**
135    * Handle to generic request.
136    */
137   struct GSF_PendingRequest *pr;
138   
139   /**
140    * Handle to specific peer.
141    */
142   struct GSF_ConnectedPeer *cp;
143
144   /**
145    * Task for asynchronous stopping of this request.
146    */
147   GNUNET_SCHEDULER_TaskIdentifier kill_task;
148
149 };
150
151
152 /**
153  * A connected peer.
154  */
155 struct GSF_ConnectedPeer 
156 {
157
158   /**
159    * Performance data for this peer.
160    */
161   struct GSF_PeerPerformanceData ppd;
162
163   /**
164    * Time until when we blocked this peer from migrating
165    * data to us.
166    */
167   struct GNUNET_TIME_Absolute last_migration_block;
168
169   /**
170    * Task scheduled to revive migration to this peer.
171    */
172   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
173
174   /**
175    * Messages (replies, queries, content migration) we would like to
176    * send to this peer in the near future.  Sorted by priority, head.
177    */
178   struct GSF_PeerTransmitHandle *pth_head;
179
180   /**
181    * Messages (replies, queries, content migration) we would like to
182    * send to this peer in the near future.  Sorted by priority, tail.
183    */
184   struct GSF_PeerTransmitHandle *pth_tail;
185
186   /**
187    * Migration stop message in our queue, or NULL if we have none pending.
188    */
189   struct GSF_PeerTransmitHandle *migration_pth;
190
191   /**
192    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
193    */
194   struct GNUNET_CORE_InformationRequestContext *irc;
195
196   /**
197    * Task scheduled if we need to retry bandwidth reservation later.
198    */
199   GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
200
201   /**
202    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
203    */
204   struct GNUNET_CONTAINER_MultiHashMap *request_map;
205
206   /**
207    * Increase in traffic preference still to be submitted
208    * to the core service for this peer.
209    */
210   uint64_t inc_preference;
211
212   /**
213    * Trust rating for this peer on disk.
214    */
215   uint32_t disk_trust;
216
217   /**
218    * Which offset in "last_p2p_replies" will be updated next?
219    * (we go round-robin).
220    */
221   unsigned int last_p2p_replies_woff;
222
223   /**
224    * Which offset in "last_client_replies" will be updated next?
225    * (we go round-robin).
226    */
227   unsigned int last_client_replies_woff;
228
229   /**
230    * Current offset into 'last_request_times' ring buffer.
231    */
232   unsigned int last_request_times_off;
233
234   /**
235    * GNUNET_YES if we did successfully reserve 32k bandwidth,
236    * GNUNET_NO if not.
237    */
238   int did_reserve;
239
240 };
241
242
243 /**
244  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
245  */
246 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
247
248 /**
249  * Where do we store trust information?
250  */
251 static char *trustDirectory;
252
253
254 /**
255  * Get the filename under which we would store the GNUNET_HELLO_Message
256  * for the given host and protocol.
257  * @return filename of the form DIRECTORY/HOSTID
258  */
259 static char *
260 get_trust_filename (const struct GNUNET_PeerIdentity *id)
261 {
262   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
263   char *fn;
264
265   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
266   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
267   return fn;
268 }
269
270
271 /**
272  * Find latency information in 'atsi'.
273  *
274  * @param atsi performance data
275  * @return connection latency
276  */
277 static struct GNUNET_TIME_Relative
278 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
279 {
280   if (atsi == NULL)
281     return GNUNET_TIME_UNIT_SECONDS;
282   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
283           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
284     atsi++;
285   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
286     {
287       GNUNET_break (0);
288       /* how can we not have latency data? */
289       return GNUNET_TIME_UNIT_SECONDS;
290     }
291   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
292                                         ntohl (atsi->value));
293 }
294
295
296 /**
297  * Update the performance information kept for the given peer.
298  *
299  * @param cp peer record to update
300  * @param atsi transport performance data
301  */
302 static void
303 update_atsi (struct GSF_ConnectedPeer *cp,
304              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
305 {
306   struct GNUNET_TIME_Relative latency;
307
308   latency = get_latency (atsi);
309   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
310                                  latency);
311   /* LATER: merge atsi into cp's performance data (if we ever care...) */
312 }
313
314
315 /**
316  * Return the performance data record for the given peer
317  * 
318  * @param cp peer to query
319  * @return performance data record for the peer
320  */
321 struct GSF_PeerPerformanceData *
322 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
323 {
324   return &cp->ppd;
325 }
326
327
328 /**
329  * Core is ready to transmit to a peer, get the message.
330  *
331  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
332  * @param size number of bytes core is willing to take
333  * @param buf where to copy the message
334  * @return number of bytes copied to buf
335  */
336 static size_t
337 peer_transmit_ready_cb (void *cls,
338                         size_t size,
339                         void *buf);
340
341
342
343
344 /**
345  * Function called by core upon success or failure of our bandwidth reservation request.
346  *
347  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
348  * @param peer identifies the peer
349  * @param bandwidth_out available amount of outbound bandwidth
350  * @param amount set to the amount that was actually reserved or unreserved;
351  *               either the full requested amount or zero (no partial reservations)
352  * @param res_delay if the reservation could not be satisfied (amount was 0), how
353  *        long should the client wait until re-trying?
354  * @param preference current traffic preference for the given peer
355  */
356 static void
357 core_reserve_callback (void *cls,
358                        const struct GNUNET_PeerIdentity *peer,
359                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
360                        int32_t amount,
361                        struct GNUNET_TIME_Relative res_delay,
362                        uint64_t preference);
363
364
365 /**
366  * If ready (bandwidth reserved), try to schedule transmission via
367  * core for the given handle.
368  *
369  * @param pth transmission handle to schedule
370  */
371 static void
372 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
373 {
374   struct GSF_ConnectedPeer *cp;
375   struct GNUNET_PeerIdentity target;
376   uint64_t ip;
377
378   if ( (NULL != pth->cth) ||
379        (0 != pth->cth_in_progress) )
380     return; /* already done */
381   cp = pth->cp;
382   GNUNET_PEER_resolve (cp->ppd.pid,
383                        &target);
384   if ( (GNUNET_YES == pth->is_query) &&
385        (GNUNET_YES != pth->was_reserved) )
386     {
387       /* query, need reservation */
388       if (GNUNET_YES != cp->did_reserve)
389         return; /* not ready */
390       cp->did_reserve = GNUNET_NO;
391       /* reservation already done! */
392       pth->was_reserved = GNUNET_YES;
393       ip = cp->inc_preference;
394       cp->inc_preference = 0;
395       cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
396                                                     &target,
397                                                     GNUNET_TIME_UNIT_FOREVER_REL,
398                                                     GNUNET_BANDWIDTH_VALUE_MAX,
399                                                     DBLOCK_SIZE,
400                                                     ip,
401                                                     &core_reserve_callback,
402                                                     cp);          
403     }
404   GNUNET_assert (pth->cth == NULL);
405   pth->cth_in_progress++;
406   pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
407                                                 GNUNET_YES,
408                                                 pth->priority,
409                                                 GNUNET_TIME_absolute_get_remaining (pth->timeout),
410                                                 &target,
411                                                 pth->size,
412                                                 &peer_transmit_ready_cb,
413                                                 pth);
414   GNUNET_assert (0 < pth->cth_in_progress--);
415 }
416
417
418 /**
419  * Core is ready to transmit to a peer, get the message.
420  *
421  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
422  * @param size number of bytes core is willing to take
423  * @param buf where to copy the message
424  * @return number of bytes copied to buf
425  */
426 static size_t
427 peer_transmit_ready_cb (void *cls,
428                         size_t size,
429                         void *buf)
430 {
431   struct GSF_PeerTransmitHandle *pth = cls;
432   struct GSF_PeerTransmitHandle *pos;
433   struct GSF_ConnectedPeer *cp;
434   size_t ret;
435
436   GNUNET_assert ( (NULL == buf) ||
437                   (pth->size <= size) );
438   pth->cth = NULL;
439   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
440     {
441       GNUNET_SCHEDULER_cancel (pth->timeout_task);
442       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
443     }
444   cp = pth->cp;
445   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
446                                cp->pth_tail,
447                                pth);
448   if (GNUNET_YES == pth->is_query)
449     {
450       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
451       GNUNET_assert (0 < cp->ppd.pending_queries--);    
452     }
453   else if (GNUNET_NO == pth->is_query)
454     {
455       GNUNET_assert (0 < cp->ppd.pending_replies--);
456     }
457   GNUNET_LOAD_update (cp->ppd.transmission_delay,
458                       GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
459   ret = pth->gmc (pth->gmc_cls, 
460                   size, buf);
461   GNUNET_assert (NULL == pth->cth);
462   for (pos = cp->pth_head; pos != NULL; pos = pos->next)
463     {
464       GNUNET_assert (pos != pth);
465       schedule_transmission (pos);
466     }
467   GNUNET_assert (pth->cth == NULL);
468   GNUNET_assert (pth->cth_in_progress == 0);
469   GNUNET_free (pth);
470   return ret;
471 }
472
473
474 /**
475  * (re)try to reserve bandwidth from the given peer.
476  *
477  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
478  * @param tc scheduler context
479  */
480 static void
481 retry_reservation (void *cls,
482                    const struct GNUNET_SCHEDULER_TaskContext *tc)
483 {
484   struct GSF_ConnectedPeer *cp = cls;
485   uint64_t ip;
486   struct GNUNET_PeerIdentity target;
487
488   GNUNET_PEER_resolve (cp->ppd.pid,
489                        &target);
490   cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
491   ip = cp->inc_preference;
492   cp->inc_preference = 0;
493   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
494                                                 &target,
495                                                 GNUNET_TIME_UNIT_FOREVER_REL,
496                                                 GNUNET_BANDWIDTH_VALUE_MAX,
497                                                 DBLOCK_SIZE,
498                                                 ip,
499                                                 &core_reserve_callback,
500                                                 cp);
501 }
502
503
504 /**
505  * Function called by core upon success or failure of our bandwidth reservation request.
506  *
507  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
508  * @param peer identifies the peer
509  * @param bandwidth_out available amount of outbound bandwidth
510  * @param amount set to the amount that was actually reserved or unreserved;
511  *               either the full requested amount or zero (no partial reservations)
512  * @param res_delay if the reservation could not be satisfied (amount was 0), how
513  *        long should the client wait until re-trying?
514  * @param preference current traffic preference for the given peer
515  */
516 static void
517 core_reserve_callback (void *cls,
518                        const struct GNUNET_PeerIdentity *peer,
519                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
520                        int32_t amount,
521                        struct GNUNET_TIME_Relative res_delay,
522                        uint64_t preference)
523 {
524   struct GSF_ConnectedPeer *cp = cls;
525   struct GSF_PeerTransmitHandle *pth;
526
527   cp->irc = NULL;
528   if (0 == amount)
529     {
530       cp->irc_delay_task = GNUNET_SCHEDULER_add_delayed (res_delay,
531                                                          &retry_reservation,
532                                                          cp);
533       return;
534     }
535   cp->did_reserve = GNUNET_YES;
536   pth = cp->pth_head;
537   if ( (NULL != pth) &&
538        (NULL == pth->cth) )
539     {
540       /* reservation success, try transmission now! */
541       pth->cth_in_progress++;
542       pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
543                                                     GNUNET_YES,
544                                                     pth->priority,
545                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
546                                                     peer,
547                                                     pth->size,
548                                                     &peer_transmit_ready_cb,
549                                                     pth);
550       GNUNET_assert (0 < pth->cth_in_progress--);
551     }
552 }
553
554
555 /**
556  * A peer connected to us.  Setup the connected peer
557  * records.
558  *
559  * @param peer identity of peer that connected
560  * @param atsi performance data for the connection
561  * @return handle to connected peer entry
562  */
563 struct GSF_ConnectedPeer *
564 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
565                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
566 {
567   struct GSF_ConnectedPeer *cp;
568   char *fn;
569   uint32_t trust;
570
571   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
572   cp->ppd.pid = GNUNET_PEER_intern (peer);
573   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
574   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
575                                                 peer,
576                                                 GNUNET_TIME_UNIT_FOREVER_REL,
577                                                 GNUNET_BANDWIDTH_VALUE_MAX,
578                                                 DBLOCK_SIZE,
579                                                 0,
580                                                 &core_reserve_callback,
581                                                 cp);
582   fn = get_trust_filename (peer);
583   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
584       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
585     cp->disk_trust = cp->ppd.trust = ntohl (trust);
586   GNUNET_free (fn);
587   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
588   GNUNET_break (GNUNET_OK ==
589                 GNUNET_CONTAINER_multihashmap_put (cp_map,
590                                                    &peer->hashPubKey,
591                                                    cp,
592                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
593   update_atsi (cp, atsi);
594   GSF_push_start_ (cp);
595   return cp;
596 }
597
598
599 /**
600  * It may be time to re-start migrating content to this
601  * peer.  Check, and if so, restart migration.
602  *
603  * @param cls the 'struct GSF_ConnectedPeer'
604  * @param tc scheduler context
605  */
606 static void
607 revive_migration (void *cls,
608                   const struct GNUNET_SCHEDULER_TaskContext *tc)
609 {
610   struct GSF_ConnectedPeer *cp = cls;
611   struct GNUNET_TIME_Relative bt;
612   
613   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
614   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
615   if (0 != bt.rel_value)
616     {
617       /* still time left... */
618       cp->mig_revive_task 
619         = GNUNET_SCHEDULER_add_delayed (bt,
620                                         &revive_migration,
621                                         cp);
622       return;
623     }
624   GSF_push_start_ (cp);
625 }
626
627
628 /**
629  * Get a handle for a connected peer.
630  *
631  * @param peer peer's identity
632  * @return NULL if the peer is not currently connected
633  */
634 struct GSF_ConnectedPeer *
635 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
636 {
637   return GNUNET_CONTAINER_multihashmap_get (cp_map,
638                                             &peer->hashPubKey);
639 }
640
641
642 /**
643  * Handle P2P "MIGRATION_STOP" message.
644  *
645  * @param cls closure, always NULL
646  * @param other the other peer involved (sender or receiver, NULL
647  *        for loopback messages where we are both sender and receiver)
648  * @param message the actual message
649  * @param atsi performance information
650  * @return GNUNET_OK to keep the connection open,
651  *         GNUNET_SYSERR to close it (signal serious error)
652  */
653 int
654 GSF_handle_p2p_migration_stop_ (void *cls,
655                                 const struct GNUNET_PeerIdentity *other,
656                                 const struct GNUNET_MessageHeader *message,
657                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
658 {
659   struct GSF_ConnectedPeer *cp; 
660   const struct MigrationStopMessage *msm;
661   struct GNUNET_TIME_Relative bt;
662
663   msm = (const struct MigrationStopMessage*) message;
664   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
665                                           &other->hashPubKey);
666   if (cp == NULL)
667     {
668       GNUNET_break (0);
669       return GNUNET_OK;
670     }
671   bt = GNUNET_TIME_relative_ntoh (msm->duration);
672   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
673               _("Migration of content to peer `%s' blocked for %llu ms\n"),
674               GNUNET_i2s (other),
675               (unsigned long long) bt.rel_value);
676   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
677   if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
678     {
679       GSF_push_stop_ (cp);
680       cp->mig_revive_task 
681         = GNUNET_SCHEDULER_add_delayed (bt,
682                                         &revive_migration,
683                                         cp);
684     }
685   update_atsi (cp, atsi);
686   return GNUNET_OK;
687 }
688
689
690 /**
691  * Copy reply and free put message.
692  *
693  * @param cls the 'struct PutMessage'
694  * @param buf_size number of bytes available in buf
695  * @param buf where to copy the message, NULL on error (peer disconnect)
696  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
697  */
698 static size_t 
699 copy_reply (void *cls,
700             size_t buf_size,
701             void *buf)
702 {
703   struct PutMessage *pm = cls;
704   size_t size;
705
706   if (buf != NULL)
707     {
708       GNUNET_assert (buf_size >= ntohs (pm->header.size));
709       size = ntohs (pm->header.size);
710       memcpy (buf, pm, size); 
711       GNUNET_STATISTICS_update (GSF_stats,
712                                 gettext_noop ("# replies transmitted to other peers"),
713                                 1,
714                                 GNUNET_NO); 
715     }
716   else
717     {
718       size = 0;
719       GNUNET_STATISTICS_update (GSF_stats,
720                                 gettext_noop ("# replies dropped"),
721                                 1,
722                                 GNUNET_NO); 
723     }
724   GNUNET_free (pm);
725   return size;
726 }
727
728
729 /**
730  * Free the given request.
731  *
732  * @param cls the request to free
733  * @param tc task context
734  */ 
735 static void
736 peer_request_destroy (void *cls,
737                       const struct GNUNET_SCHEDULER_TaskContext *tc)
738 {
739   struct PeerRequest *peerreq = cls;
740   struct GSF_PendingRequest *pr = peerreq->pr;
741   struct GSF_ConnectedPeer *cp = peerreq->cp;
742   struct GSF_PendingRequestData *prd;
743
744   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
745   prd = GSF_pending_request_get_data_ (pr);
746   GNUNET_STATISTICS_update (GSF_stats,
747                             gettext_noop ("# P2P searches active"),
748                             -1,
749                             GNUNET_NO);
750   GNUNET_break (GNUNET_OK ==
751                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
752                                                       &prd->query,
753                                                       peerreq));
754   GSF_pending_request_cancel_ (pr);
755   GNUNET_free (peerreq);
756 }
757
758
759 /**
760  * Handle a reply to a pending request.  Also called if a request
761  * expires (then with data == NULL).  The handler may be called
762  * many times (depending on the request type), but will not be
763  * called during or after a call to GSF_pending_request_cancel 
764  * and will also not be called anymore after a call signalling
765  * expiration.
766  *
767  * @param cls 'struct PeerRequest' this is an answer for
768  * @param eval evaluation of the result
769  * @param pr handle to the original pending request
770  * @param expiration when does 'data' expire?
771  * @param type type of the block
772  * @param data response data, NULL on request expiration
773  * @param data_len number of bytes in data
774  */
775 static void
776 handle_p2p_reply (void *cls,
777                   enum GNUNET_BLOCK_EvaluationResult eval,
778                   struct GSF_PendingRequest *pr,
779                   struct GNUNET_TIME_Absolute expiration,
780                   enum GNUNET_BLOCK_Type type,
781                   const void *data,
782                   size_t data_len)
783 {
784   struct PeerRequest *peerreq = cls;
785   struct GSF_ConnectedPeer *cp = peerreq->cp;
786   struct GSF_PendingRequestData *prd;
787   struct PutMessage *pm;
788   size_t msize;
789
790   GNUNET_assert (data_len + sizeof (struct PutMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE);
791   GNUNET_assert (peerreq->pr == pr);
792   prd = GSF_pending_request_get_data_ (pr);
793   if (NULL == data)
794     {
795       GNUNET_STATISTICS_update (GSF_stats,
796                                 gettext_noop ("# P2P searches active"),
797                                 -1,
798                                 GNUNET_NO);
799       GNUNET_break (GNUNET_OK ==
800                     GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
801                                                           &prd->query,
802                                                           peerreq));
803       GNUNET_free (peerreq);
804       return;
805     }  
806   GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
807   if ( (prd->type != type) &&
808        (prd->type != GNUNET_BLOCK_TYPE_ANY) )
809     {
810       GNUNET_break (0);
811       return;
812     }
813 #if DEBUG_FS
814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815               "Transmitting result for query `%s' to peer\n",
816               GNUNET_h2s (&prd->query));
817 #endif  
818   GNUNET_STATISTICS_update (GSF_stats,
819                             gettext_noop ("# replies received for other peers"),
820                             1,
821                             GNUNET_NO); 
822   msize = sizeof (struct PutMessage) + data_len;
823   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
824     {
825       GNUNET_break (0);
826       return;
827     }
828   pm = GNUNET_malloc (msize);
829   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
830   pm->header.size = htons (msize);
831   pm->type = htonl (type);
832   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
833   memcpy (&pm[1], data, data_len);
834   (void) GSF_peer_transmit_ (cp, GNUNET_NO,
835                              UINT32_MAX,
836                              REPLY_TIMEOUT,
837                              msize,
838                              &copy_reply,
839                              pm);
840   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
841     return;
842   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
843     peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy,
844                                                    peerreq);
845 }
846
847
848 /**
849  * Increase the host credit by a value.
850  *
851  * @param cp which peer to change the trust value on
852  * @param value is the int value by which the
853  *  host credit is to be increased or decreased
854  * @returns the actual change in trust (positive or negative)
855  */
856 static int
857 change_host_trust (struct GSF_ConnectedPeer *cp, int value)
858 {
859   if (value == 0)
860     return 0;
861   GNUNET_assert (cp != NULL);
862   if (value > 0)
863     {
864       if (cp->ppd.trust + value < cp->ppd.trust)
865         {
866           value = UINT32_MAX - cp->ppd.trust;
867           cp->ppd.trust = UINT32_MAX;
868         }
869       else
870         cp->ppd.trust += value;
871     }
872   else
873     {
874       if (cp->ppd.trust < -value)
875         {
876           value = -cp->ppd.trust;
877           cp->ppd.trust = 0;
878         }
879       else
880         cp->ppd.trust += value;
881     }
882   return value;
883 }
884
885
886 /**
887  * We've received a request with the specified priority.  Bound it
888  * according to how much we trust the given peer.
889  * 
890  * @param prio_in requested priority
891  * @param cp the peer making the request
892  * @return effective priority
893  */
894 static int32_t
895 bound_priority (uint32_t prio_in,
896                 struct GSF_ConnectedPeer *cp)
897 {
898 #define N ((double)128.0)
899   uint32_t ret;
900   double rret;
901   int ld;
902
903   ld = GSF_test_get_load_too_high_ (0);
904   if (ld == GNUNET_SYSERR)
905     {
906       GNUNET_STATISTICS_update (GSF_stats,
907                                 gettext_noop ("# requests done for free (low load)"),
908                                 1,
909                                 GNUNET_NO);
910       return 0; /* excess resources */
911     }
912   if (prio_in > INT32_MAX)
913     prio_in = INT32_MAX;
914   ret = - change_host_trust (cp, - (int) prio_in);
915   if (ret > 0)
916     {
917       if (ret > GSF_current_priorities + N)
918         rret = GSF_current_priorities + N;
919       else
920         rret = ret;
921       GSF_current_priorities 
922         = (GSF_current_priorities * (N-1) + rret)/N;
923     }
924   if ( (ld == GNUNET_YES) && (ret > 0) )
925     {
926       /* try with charging */
927       ld = GSF_test_get_load_too_high_ (ret);
928     }
929   if (ld == GNUNET_YES)
930     {
931       GNUNET_STATISTICS_update (GSF_stats,
932                                 gettext_noop ("# request dropped, priority insufficient"),
933                                 1,
934                                 GNUNET_NO);
935       /* undo charge */
936       change_host_trust (cp, (int) ret);
937       return -1; /* not enough resources */
938     }
939   else
940     {
941       GNUNET_STATISTICS_update (GSF_stats,
942                                 gettext_noop ("# requests done for a price (normal load)"),
943                                 1,
944                                 GNUNET_NO);
945     }
946 #undef N
947   return ret;
948 }
949
950
951 /**
952  * The priority level imposes a bound on the maximum
953  * value for the ttl that can be requested.
954  *
955  * @param ttl_in requested ttl
956  * @param prio given priority
957  * @return ttl_in if ttl_in is below the limit,
958  *         otherwise the ttl-limit for the given priority
959  */
960 static int32_t
961 bound_ttl (int32_t ttl_in, uint32_t prio)
962 {
963   unsigned long long allowed;
964
965   if (ttl_in <= 0)
966     return ttl_in;
967   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
968   if (ttl_in > allowed)      
969     {
970       if (allowed >= (1 << 30))
971         return 1 << 30;
972       return allowed;
973     }
974   return ttl_in;
975 }
976
977
978 /**
979  * Handle P2P "QUERY" message.  Creates the pending request entry
980  * and sets up all of the data structures to that we will
981  * process replies properly.  Does not initiate forwarding or
982  * local database lookups.
983  *
984  * @param other the other peer involved (sender or receiver, NULL
985  *        for loopback messages where we are both sender and receiver)
986  * @param message the actual message
987  * @return pending request handle, NULL on error
988  */
989 struct GSF_PendingRequest *
990 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
991                        const struct GNUNET_MessageHeader *message)
992 {
993   struct PeerRequest *peerreq;
994   struct GSF_PendingRequest *pr;
995   struct GSF_PendingRequestData *prd;
996   struct GSF_ConnectedPeer *cp;
997   struct GSF_ConnectedPeer *cps;
998   const GNUNET_HashCode *namespace;
999   const struct GNUNET_PeerIdentity *target;
1000   enum GSF_PendingRequestOptions options;                            
1001   uint16_t msize;
1002   const struct GetMessage *gm;
1003   unsigned int bits;
1004   const GNUNET_HashCode *opt;
1005   uint32_t bm;
1006   size_t bfsize;
1007   uint32_t ttl_decrement;
1008   int32_t priority;
1009   int32_t ttl;
1010   enum GNUNET_BLOCK_Type type;
1011   GNUNET_PEER_Id spid;
1012
1013   msize = ntohs(message->size);
1014   if (msize < sizeof (struct GetMessage))
1015     {
1016       GNUNET_break_op (0);
1017       return NULL;
1018     }
1019   gm = (const struct GetMessage*) message;
1020   type = ntohl (gm->type);
1021   bm = ntohl (gm->hash_bitmap);
1022   bits = 0;
1023   while (bm > 0)
1024     {
1025       if (1 == (bm & 1))
1026         bits++;
1027       bm >>= 1;
1028     }
1029   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
1030     {
1031       GNUNET_break_op (0);
1032       return NULL;
1033     }  
1034   opt = (const GNUNET_HashCode*) &gm[1];
1035   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
1036   /* bfsize must be power of 2, check! */
1037   if (0 != ( (bfsize - 1) & bfsize))
1038     {
1039       GNUNET_break_op (0);
1040       return NULL;
1041     }
1042   GSF_cover_query_count++;
1043   bm = ntohl (gm->hash_bitmap);
1044   bits = 0;
1045   cps = GNUNET_CONTAINER_multihashmap_get (cp_map,
1046                                            &other->hashPubKey);
1047   if (NULL == cps)
1048     {
1049       /* peer must have just disconnected */
1050       GNUNET_STATISTICS_update (GSF_stats,
1051                                 gettext_noop ("# requests dropped due to initiator not being connected"),
1052                                 1,
1053                                 GNUNET_NO);
1054       return NULL;
1055     }
1056   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1057     cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1058                                             &opt[bits++]);
1059   else
1060     cp = cps;
1061   if (cp == NULL)
1062     {
1063 #if DEBUG_FS
1064       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1065         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1067                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
1068       
1069       else
1070         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
1072                     GNUNET_i2s (other));
1073 #endif
1074       GNUNET_STATISTICS_update (GSF_stats,
1075                                 gettext_noop ("# requests dropped due to missing reverse route"),
1076                                 1,
1077                                 GNUNET_NO);
1078       return NULL;
1079     }
1080   /* note that we can really only check load here since otherwise
1081      peers could find out that we are overloaded by not being
1082      disconnected after sending us a malformed query... */
1083   priority = bound_priority (ntohl (gm->priority), cps);
1084   if (priority < 0)
1085     {
1086 #if DEBUG_FS
1087       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088                   "Dropping query from `%s', this peer is too busy.\n",
1089                   GNUNET_i2s (other));
1090 #endif
1091       return NULL;
1092     }
1093 #if DEBUG_FS 
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1096               GNUNET_h2s (&gm->query),
1097               (unsigned int) type,
1098               GNUNET_i2s (other),
1099               (unsigned int) bm);
1100 #endif
1101   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
1102   if ( (type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
1103        (namespace == NULL) )
1104     {
1105       GNUNET_break_op (0);
1106       return NULL;
1107     }
1108   if ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
1109        (namespace != NULL) )
1110     {
1111       GNUNET_break_op (0);
1112       return NULL;
1113     }
1114   target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
1115   options = 0;
1116   spid = 0;
1117   if ( (GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority)) ||
1118        (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) > 
1119         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)) )
1120     {
1121       /* don't have BW to send to peer, or would likely take longer than we have for it,
1122          so at best indirect the query */
1123       priority = 0;
1124       options |= GSF_PRO_FORWARD_ONLY;
1125       spid = GNUNET_PEER_intern (other);
1126     }
1127   ttl = bound_ttl (ntohl (gm->ttl), priority);
1128   /* decrement ttl (always) */
1129   ttl_decrement = 2 * TTL_DECREMENT +
1130     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1131                               TTL_DECREMENT);
1132   if ( (ttl < 0) &&
1133        (((int32_t)(ttl - ttl_decrement)) > 0) )
1134     {
1135 #if DEBUG_FS
1136       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1138                   GNUNET_i2s (other),
1139                   ttl,
1140                   ttl_decrement);
1141 #endif
1142       GNUNET_STATISTICS_update (GSF_stats,
1143                                 gettext_noop ("# requests dropped due TTL underflow"),
1144                                 1,
1145                                 GNUNET_NO);
1146       /* integer underflow => drop (should be very rare)! */      
1147       return NULL;
1148     } 
1149   ttl -= ttl_decrement;
1150
1151   /* test if the request already exists */
1152   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
1153                                                &gm->query);
1154   if (peerreq != NULL) 
1155     {      
1156       pr = peerreq->pr;
1157       prd = GSF_pending_request_get_data_ (pr);
1158       if ( (prd->type == type) &&
1159            ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
1160              (0 == memcmp (&prd->namespace,
1161                            namespace,
1162                            sizeof (GNUNET_HashCode))) ) )
1163         {
1164           if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
1165             {
1166               /* existing request has higher TTL, drop new one! */
1167               prd->priority += priority;
1168 #if DEBUG_FS
1169               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170                           "Have existing request with higher TTL, dropping new request.\n",
1171                           GNUNET_i2s (other));
1172 #endif
1173               GNUNET_STATISTICS_update (GSF_stats,
1174                                         gettext_noop ("# requests dropped due to higher-TTL request"),
1175                                         1,
1176                                         GNUNET_NO);
1177               return NULL;
1178             }
1179           /* existing request has lower TTL, drop old one! */
1180           GNUNET_STATISTICS_update (GSF_stats,
1181                                     gettext_noop ("# P2P searches active"),
1182                                     -1,
1183                                     GNUNET_NO);
1184           priority += prd->priority;
1185           GSF_pending_request_cancel_ (pr);
1186           GNUNET_assert (GNUNET_YES ==
1187                          GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
1188                                                                &gm->query,
1189                                                                peerreq));
1190           if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1191             {
1192               GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1193               peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
1194             }
1195           GNUNET_free (peerreq);
1196         }
1197     }
1198   
1199   peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1200   peerreq->cp = cp; 
1201   pr = GSF_pending_request_create_ (options,
1202                                     type,
1203                                     &gm->query,
1204                                     namespace,
1205                                     target,
1206                                     (bfsize > 0) ? (const char*)&opt[bits] : NULL,
1207                                     bfsize,
1208                                     ntohl (gm->filter_mutator),
1209                                     1 /* anonymity */,
1210                                     (uint32_t) priority,
1211                                     ttl,
1212                                     spid,
1213                                     NULL, 0, /* replies_seen */
1214                                     &handle_p2p_reply,
1215                                     peerreq);
1216   GNUNET_assert (NULL != pr);
1217   peerreq->pr = pr;
1218   GNUNET_break (GNUNET_OK ==
1219                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1220                                                    &gm->query,
1221                                                    peerreq,
1222                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1223   GNUNET_STATISTICS_update (GSF_stats,
1224                             gettext_noop ("# P2P query messages received and processed"),
1225                             1,
1226                             GNUNET_NO);
1227   GNUNET_STATISTICS_update (GSF_stats,
1228                             gettext_noop ("# P2P searches active"),
1229                             1,
1230                             GNUNET_NO);
1231   return pr;
1232 }
1233
1234
1235 /**
1236  * Function called if there has been a timeout trying to satisfy
1237  * a transmission request.
1238  *
1239  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
1240  * @param tc scheduler context
1241  */
1242 static void
1243 peer_transmit_timeout (void *cls,
1244                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1245 {
1246   struct GSF_PeerTransmitHandle *pth = cls;
1247   struct GSF_ConnectedPeer *cp;
1248
1249 #if DEBUG_FS
1250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1251               "Timeout trying to transmit to other peer\n");
1252 #endif  
1253   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1254   cp = pth->cp;
1255   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1256                                cp->pth_tail,
1257                                pth);
1258   if (GNUNET_YES == pth->is_query)
1259     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1260   else if (GNUNET_NO == pth->is_query)
1261     GNUNET_assert (0 < cp->ppd.pending_replies--);
1262   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1263                       UINT64_MAX);
1264   if (NULL != pth->cth)
1265     {
1266       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1267       pth->cth = NULL;
1268     }
1269   pth->gmc (pth->gmc_cls, 
1270             0, NULL);
1271   GNUNET_assert (0 == pth->cth_in_progress);
1272   GNUNET_free (pth);
1273 }
1274
1275
1276 /**
1277  * Transmit a message to the given peer as soon as possible.
1278  * If the peer disconnects before the transmission can happen,
1279  * the callback is invoked with a 'NULL' buffer.
1280  *
1281  * @param cp target peer
1282  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1283  * @param priority how important is this request?
1284  * @param timeout when does this request timeout (call gmc with error)
1285  * @param size number of bytes we would like to send to the peer
1286  * @param gmc function to call to get the message
1287  * @param gmc_cls closure for gmc
1288  * @return handle to cancel request
1289  */
1290 struct GSF_PeerTransmitHandle *
1291 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1292                     int is_query,
1293                     uint32_t priority,
1294                     struct GNUNET_TIME_Relative timeout,
1295                     size_t size,
1296                     GSF_GetMessageCallback gmc,
1297                     void *gmc_cls)
1298 {
1299   struct GSF_PeerTransmitHandle *pth;
1300   struct GSF_PeerTransmitHandle *pos;
1301   struct GSF_PeerTransmitHandle *prev;
1302
1303   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1304   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1305   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1306   pth->gmc = gmc;
1307   pth->gmc_cls = gmc_cls;
1308   pth->size = size;
1309   pth->is_query = is_query;
1310   pth->priority = priority;
1311   pth->cp = cp;
1312   /* insertion sort (by priority, descending) */
1313   prev = NULL;
1314   pos = cp->pth_head;
1315   while ( (pos != NULL) &&
1316           (pos->priority > priority) )
1317     {
1318       prev = pos;
1319       pos = pos->next;
1320     }
1321   if (prev == NULL)
1322     GNUNET_CONTAINER_DLL_insert (cp->pth_head,
1323                                  cp->pth_tail,
1324                                  pth);
1325   else
1326     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1327                                        cp->pth_tail,
1328                                        prev,
1329                                        pth);
1330   if (GNUNET_YES == is_query)
1331     cp->ppd.pending_queries++;
1332   else if (GNUNET_NO == is_query)
1333     cp->ppd.pending_replies++;
1334   pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1335                                                     &peer_transmit_timeout,
1336                                                     pth);
1337   schedule_transmission (pth);
1338   return pth;
1339 }
1340
1341
1342 /**
1343  * Cancel an earlier request for transmission.
1344  *
1345  * @param pth request to cancel
1346  */
1347 void
1348 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1349 {
1350   struct GSF_ConnectedPeer *cp;
1351
1352   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1353     {
1354       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1355       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1356     }
1357   if (NULL != pth->cth)
1358     {
1359       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1360       pth->cth = NULL;
1361     }
1362   cp = pth->cp;
1363   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1364                                cp->pth_tail,
1365                                pth);
1366   if (GNUNET_YES == pth->is_query)
1367     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1368   else if (GNUNET_NO == pth->is_query)
1369     GNUNET_assert (0 < cp->ppd.pending_replies--);
1370   GNUNET_assert (0 == pth->cth_in_progress);
1371   GNUNET_free (pth);
1372 }
1373
1374
1375 /**
1376  * Report on receiving a reply; update the performance record of the given peer.
1377  *
1378  * @param cp responding peer (will be updated)
1379  * @param request_time time at which the original query was transmitted
1380  * @param request_priority priority of the original request
1381  */
1382 void
1383 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1384                               struct GNUNET_TIME_Absolute request_time,
1385                               uint32_t request_priority)
1386 {
1387   struct GNUNET_TIME_Relative delay;
1388
1389   delay = GNUNET_TIME_absolute_get_duration (request_time);  
1390   cp->ppd.avg_reply_delay.rel_value = (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
1391   cp->ppd.avg_priority = (cp->ppd.avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
1392 }
1393
1394
1395 /**
1396  * Report on receiving a reply in response to an initiating client.
1397  * Remember that this peer is good for this client.
1398  *
1399  * @param cp responding peer (will be updated)
1400  * @param initiator_client local client on responsible for query
1401  */
1402 void
1403 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1404                                    struct GSF_LocalClient *initiator_client)
1405 {
1406   cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1407 }
1408
1409
1410 /**
1411  * Report on receiving a reply in response to an initiating peer.
1412  * Remember that this peer is good for this initiating peer.
1413  *
1414  * @param cp responding peer (will be updated)
1415  * @param initiator_peer other peer responsible for query
1416  */
1417 void
1418 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1419                                  const struct GSF_ConnectedPeer *initiator_peer)
1420 {
1421   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
1422   cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->ppd.pid;
1423   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1424 }
1425
1426
1427 /**
1428  * Method called whenever a given peer has a status change.
1429  *
1430  * @param cls closure
1431  * @param peer peer identity this notification is about
1432  * @param bandwidth_in available amount of inbound bandwidth
1433  * @param bandwidth_out available amount of outbound bandwidth
1434  * @param timeout absolute time when this peer will time out
1435  *        unless we see some further activity from it
1436  * @param atsi status information
1437  */
1438 void
1439 GSF_peer_status_handler_ (void *cls,
1440                           const struct GNUNET_PeerIdentity *peer,
1441                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1442                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1443                           struct GNUNET_TIME_Absolute timeout,
1444                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1445 {
1446   struct GSF_ConnectedPeer *cp;
1447
1448   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1449                                           &peer->hashPubKey);
1450   GNUNET_assert (NULL != cp);
1451   update_atsi (cp, atsi);
1452 }
1453
1454
1455 /**
1456  * Cancel all requests associated with the peer.
1457  *
1458  * @param cls unused
1459  * @param query hash code of the request
1460  * @param value the 'struct GSF_PendingRequest'
1461  * @return GNUNET_YES (continue to iterate)
1462  */
1463 static int
1464 cancel_pending_request (void *cls,
1465                         const GNUNET_HashCode *query,
1466                         void *value)
1467 {
1468   struct PeerRequest *peerreq = value;
1469   struct GSF_PendingRequest *pr = peerreq->pr;
1470
1471   GSF_pending_request_cancel_ (pr);
1472   if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1473     {
1474       GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1475       peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
1476     }
1477   GNUNET_free (peerreq);
1478   return GNUNET_OK;
1479 }
1480
1481
1482 /**
1483  * A peer disconnected from us.  Tear down the connected peer
1484  * record.
1485  *
1486  * @param cls unused
1487  * @param peer identity of peer that connected
1488  */
1489 void
1490 GSF_peer_disconnect_handler_ (void *cls,
1491                               const struct GNUNET_PeerIdentity *peer)
1492 {
1493   struct GSF_ConnectedPeer *cp;
1494   struct GSF_PeerTransmitHandle *pth;
1495
1496   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1497                                           &peer->hashPubKey);
1498   if (NULL == cp)
1499     return; /* must have been disconnect from core with
1500                'peer' == my_id, ignore */
1501   GNUNET_CONTAINER_multihashmap_remove (cp_map,
1502                                         &peer->hashPubKey,
1503                                         cp);
1504   if (NULL != cp->migration_pth)
1505     {
1506       GSF_peer_transmit_cancel_ (cp->migration_pth);
1507       cp->migration_pth = NULL;
1508     }
1509   if (NULL != cp->irc)
1510     {
1511       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1512       cp->irc = NULL;
1513     }
1514   if (GNUNET_SCHEDULER_NO_TASK != cp->irc_delay_task)
1515     {
1516       GNUNET_SCHEDULER_cancel (cp->irc_delay_task);
1517       cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1518     }
1519   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1520                                          &cancel_pending_request,
1521                                          cp);
1522   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1523   cp->request_map = NULL;
1524   GSF_plan_notify_peer_disconnect_ (cp);
1525   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1526   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1527   while (NULL != (pth = cp->pth_head))
1528     {
1529       if (NULL != pth->cth)
1530         {
1531           GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1532           pth->cth = NULL;
1533         }
1534       if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1535         {
1536           GNUNET_SCHEDULER_cancel (pth->timeout_task);
1537           pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1538         }
1539       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1540                                    cp->pth_tail,
1541                                    pth);
1542       GNUNET_assert (0 == pth->cth_in_progress);
1543       GNUNET_free (pth);
1544     }
1545   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1546   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1547     {
1548       GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1549       cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1550     }
1551   GNUNET_free (cp);
1552 }
1553
1554
1555 /**
1556  * Closure for 'call_iterator'.
1557  */
1558 struct IterationContext
1559 {
1560   /**
1561    * Function to call on each entry.
1562    */
1563   GSF_ConnectedPeerIterator it;
1564
1565   /**
1566    * Closure for 'it'.
1567    */
1568   void *it_cls;
1569 };
1570
1571
1572 /**
1573  * Function that calls the callback for each peer.
1574  *
1575  * @param cls the 'struct IterationContext*'
1576  * @param key identity of the peer
1577  * @param value the 'struct GSF_ConnectedPeer*'
1578  * @return GNUNET_YES to continue iteration
1579  */
1580 static int
1581 call_iterator (void *cls,
1582                const GNUNET_HashCode *key,
1583                void *value)
1584 {
1585   struct IterationContext *ic = cls;
1586   struct GSF_ConnectedPeer *cp = value;
1587   
1588   ic->it (ic->it_cls,
1589           (const struct GNUNET_PeerIdentity*) key,
1590           cp,
1591           &cp->ppd);
1592   return GNUNET_YES;
1593 }
1594
1595
1596 /**
1597  * Iterate over all connected peers.
1598  *
1599  * @param it function to call for each peer
1600  * @param it_cls closure for it
1601  */
1602 void
1603 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1604                               void *it_cls)
1605 {
1606   struct IterationContext ic;
1607
1608   ic.it = it;
1609   ic.it_cls = it_cls;
1610   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1611                                          &call_iterator,
1612                                          &ic);
1613 }
1614
1615
1616 /**
1617  * Obtain the identity of a connected peer.
1618  *
1619  * @param cp peer to reserve bandwidth from
1620  * @param id identity to set (written to)
1621  */
1622 void
1623 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1624                                   struct GNUNET_PeerIdentity *id)
1625 {
1626   GNUNET_PEER_resolve (cp->ppd.pid,
1627                        id);
1628 }
1629
1630
1631 /**
1632  * Assemble a migration stop message for transmission.
1633  *
1634  * @param cls the 'struct GSF_ConnectedPeer' to use
1635  * @param size number of bytes we're allowed to write to buf
1636  * @param buf where to copy the message
1637  * @return number of bytes copied to buf
1638  */
1639 static size_t
1640 create_migration_stop_message (void *cls,
1641                                size_t size,
1642                                void *buf)
1643 {
1644   struct GSF_ConnectedPeer *cp = cls;
1645   struct MigrationStopMessage msm;
1646
1647   cp->migration_pth = NULL;
1648   if (NULL == buf)
1649     return 0;
1650   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1651   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1652   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1653   msm.reserved = htonl (0);
1654   msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1655   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1656   return sizeof (struct MigrationStopMessage);
1657 }
1658
1659
1660 /**
1661  * Ask a peer to stop migrating data to us until the given point
1662  * in time.
1663  * 
1664  * @param cp peer to ask
1665  * @param block_time until when to block
1666  */
1667 void
1668 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1669                            struct GNUNET_TIME_Relative block_time)
1670 {
1671   if (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value > block_time.rel_value)
1672     {
1673 #if DEBUG_FS && 0
1674       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1675           "Migration already blocked for another %llu ms\n",
1676                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value);
1677 #endif
1678       return; /* already blocked */
1679     }
1680 #if DEBUG_FS && 0
1681   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682               "Asking to stop migration for %llu ms\n",
1683               (unsigned long long) block_time.rel_value);
1684 #endif
1685   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1686   if (cp->migration_pth != NULL)
1687     GSF_peer_transmit_cancel_ (cp->migration_pth);
1688   cp->migration_pth 
1689     = GSF_peer_transmit_ (cp,
1690                           GNUNET_SYSERR,
1691                           UINT32_MAX,
1692                           GNUNET_TIME_UNIT_FOREVER_REL,
1693                           sizeof (struct MigrationStopMessage),
1694                           &create_migration_stop_message,
1695                           cp);
1696 }
1697
1698
1699 /**
1700  * Write host-trust information to a file - flush the buffer entry!
1701  *
1702  * @param cls closure, not used
1703  * @param key host identity
1704  * @param value the 'struct GSF_ConnectedPeer' to flush
1705  * @return GNUNET_OK to continue iteration
1706  */
1707 static int
1708 flush_trust (void *cls,
1709              const GNUNET_HashCode *key,
1710              void *value)
1711 {
1712   struct GSF_ConnectedPeer *cp = value;
1713   char *fn;
1714   uint32_t trust;
1715   struct GNUNET_PeerIdentity pid;
1716
1717   if (cp->ppd.trust == cp->disk_trust)
1718     return GNUNET_OK;                     /* unchanged */
1719   GNUNET_PEER_resolve (cp->ppd.pid,
1720                        &pid);
1721   fn = get_trust_filename (&pid);
1722   if (cp->ppd.trust == 0)
1723     {
1724       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1725         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1726                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1727     }
1728   else
1729     {
1730       trust = htonl (cp->ppd.trust);
1731       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1732                                                     sizeof(uint32_t),
1733                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1734                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1735         cp->disk_trust = cp->ppd.trust;
1736     }
1737   GNUNET_free (fn);
1738   return GNUNET_OK;
1739 }
1740
1741
1742 /**
1743  * Notify core about a preference we have for the given peer
1744  * (to allocate more resources towards it).  The change will
1745  * be communicated the next time we reserve bandwidth with
1746  * core (not instantly).
1747  *
1748  * @param cp peer to reserve bandwidth from
1749  * @param pref preference change
1750  */
1751 void
1752 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1753                                        uint64_t pref)
1754 {
1755   cp->inc_preference += pref;
1756 }
1757
1758
1759 /**
1760  * Call this method periodically to flush trust information to disk.
1761  *
1762  * @param cls closure, not used
1763  * @param tc task context, not used
1764  */
1765 static void
1766 cron_flush_trust (void *cls,
1767                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1768 {
1769
1770   if (NULL == cp_map)
1771     return;
1772   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1773                                          &flush_trust,
1774                                          NULL);
1775   if (NULL == tc)
1776     return;
1777   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1778     return;
1779   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
1780                                 &cron_flush_trust, 
1781                                 NULL);
1782 }
1783
1784
1785 /**
1786  * Initialize peer management subsystem.
1787  */
1788 void
1789 GSF_connected_peer_init_ ()
1790 {
1791   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1792   GNUNET_assert (GNUNET_OK ==
1793                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg,
1794                                                           "fs",
1795                                                           "TRUST",
1796                                                           &trustDirectory));
1797   GNUNET_DISK_directory_create (trustDirectory);
1798   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1799                                       &cron_flush_trust, NULL);
1800 }
1801
1802
1803 /**
1804  * Iterator to free peer entries.
1805  *
1806  * @param cls closure, unused
1807  * @param key current key code
1808  * @param value value in the hash map (peer entry)
1809  * @return GNUNET_YES (we should continue to iterate)
1810  */
1811 static int 
1812 clean_peer (void *cls,
1813             const GNUNET_HashCode * key,
1814             void *value)
1815 {
1816   GSF_peer_disconnect_handler_ (NULL, 
1817                                 (const struct GNUNET_PeerIdentity*) key);
1818   return GNUNET_YES;
1819 }
1820
1821
1822 /**
1823  * Shutdown peer management subsystem.
1824  */
1825 void
1826 GSF_connected_peer_done_ ()
1827 {
1828   cron_flush_trust (NULL, NULL);
1829   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1830                                          &clean_peer,
1831                                          NULL);
1832   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1833   cp_map = NULL;
1834   GNUNET_free (trustDirectory);
1835   trustDirectory = NULL;
1836 }
1837
1838
1839 /**
1840  * Iterator to remove references to LC entry.
1841  *
1842  * @param cls the 'struct GSF_LocalClient*' to look for
1843  * @param key current key code
1844  * @param value value in the hash map (peer entry)
1845  * @return GNUNET_YES (we should continue to iterate)
1846  */
1847 static int 
1848 clean_local_client (void *cls,
1849                     const GNUNET_HashCode * key,
1850                     void *value)
1851 {
1852   const struct GSF_LocalClient *lc = cls;
1853   struct GSF_ConnectedPeer *cp = value;
1854   unsigned int i;
1855
1856   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1857     if (cp->ppd.last_client_replies[i] == lc)
1858       cp->ppd.last_client_replies[i] = NULL;
1859   return GNUNET_YES;
1860 }
1861
1862
1863 /**
1864  * Notification that a local client disconnected.  Clean up all of our
1865  * references to the given handle.
1866  *
1867  * @param lc handle to the local client (henceforth invalid)
1868  */
1869 void
1870 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1871 {
1872   if (NULL == cp_map)
1873     return; /* already cleaned up */
1874   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1875                                          &clean_local_client,
1876                                          (void*) lc);
1877 }
1878
1879
1880 /* end of gnunet-service-fs_cp.c */