e021606a9060aab512a2ee0333a908248d09e650
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet_ats_service.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush trust values to disk?
45  */
46 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53
54 /**
55  * Handle to cancel a transmission request.
56  */
57 struct GSF_PeerTransmitHandle
58 {
59
60   /**
61    * Kept in a doubly-linked list.
62    */
63   struct GSF_PeerTransmitHandle *next;
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *prev;
69
70   /**
71    * Time when this transmission request was issued.
72    */
73   struct GNUNET_TIME_Absolute transmission_request_start_time;
74
75   /**
76    * Timeout for this request.
77    */
78   struct GNUNET_TIME_Absolute timeout;
79
80   /**
81    * Task called on timeout, or 0 for none.
82    */
83   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
84
85   /**
86    * Function to call to get the actual message.
87    */
88   GSF_GetMessageCallback gmc;
89
90   /**
91    * Peer this request targets.
92    */
93   struct GSF_ConnectedPeer *cp;
94
95   /**
96    * Closure for 'gmc'.
97    */
98   void *gmc_cls;
99
100   /**
101    * Size of the message to be transmitted.
102    */
103   size_t size;
104
105   /**
106    * GNUNET_YES if this is a query, GNUNET_NO for content.
107    */
108   int is_query;
109
110   /**
111    * Did we get a reservation already?
112    */
113   int was_reserved;
114
115   /**
116    * Priority of this request.
117    */
118   uint32_t priority;
119
120 };
121
122
123 /**
124  * Handle for an entry in our delay list.
125  */
126 struct GSF_DelayedHandle
127 {
128
129   /**
130    * Kept in a doubly-linked list.
131    */
132   struct GSF_DelayedHandle *next;
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *prev;
138
139   /**
140    * Peer this transmission belongs to.
141    */
142   struct GSF_ConnectedPeer *cp;
143
144   /**
145    * The PUT that was delayed.
146    */
147   struct PutMessage *pm;
148
149   /**
150    * Task for the delay.
151    */
152   GNUNET_SCHEDULER_TaskIdentifier delay_task;
153
154   /**
155    * Size of the message.
156    */
157   size_t msize;
158
159 };
160
161
162 /**
163  * Information per peer and request.
164  */
165 struct PeerRequest
166 {
167
168   /**
169    * Handle to generic request.
170    */
171   struct GSF_PendingRequest *pr;
172
173   /**
174    * Handle to specific peer.
175    */
176   struct GSF_ConnectedPeer *cp;
177
178   /**
179    * Task for asynchronous stopping of this request.
180    */
181   GNUNET_SCHEDULER_TaskIdentifier kill_task;
182
183 };
184
185
186 /**
187  * A connected peer.
188  */
189 struct GSF_ConnectedPeer
190 {
191
192   /**
193    * Performance data for this peer.
194    */
195   struct GSF_PeerPerformanceData ppd;
196
197   /**
198    * Time until when we blocked this peer from migrating
199    * data to us.
200    */
201   struct GNUNET_TIME_Absolute last_migration_block;
202
203   /**
204    * Task scheduled to revive migration to this peer.
205    */
206   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
207
208   /**
209    * Messages (replies, queries, content migration) we would like to
210    * send to this peer in the near future.  Sorted by priority, head.
211    */
212   struct GSF_PeerTransmitHandle *pth_head;
213
214   /**
215    * Messages (replies, queries, content migration) we would like to
216    * send to this peer in the near future.  Sorted by priority, tail.
217    */
218   struct GSF_PeerTransmitHandle *pth_tail;
219
220   /**
221    * Messages (replies, queries, content migration) we would like to
222    * send to this peer in the near future.  Sorted by priority, head.
223    */
224   struct GSF_DelayedHandle *delayed_head;
225
226   /**
227    * Messages (replies, queries, content migration) we would like to
228    * send to this peer in the near future.  Sorted by priority, tail.
229    */
230   struct GSF_DelayedHandle *delayed_tail;
231
232   /**
233    * Migration stop message in our queue, or NULL if we have none pending.
234    */
235   struct GSF_PeerTransmitHandle *migration_pth;
236
237   /**
238    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
239    */
240   struct GNUNET_ATS_ReservationContext *rc;
241
242   /**
243    * Task scheduled if we need to retry bandwidth reservation later.
244    */
245   GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
246
247   /**
248    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
249    */
250   struct GNUNET_CONTAINER_MultiHashMap *request_map;
251
252   /**
253    * Handle for an active request for transmission to this
254    * peer, or NULL (if core queue was full).
255    */
256   struct GNUNET_CORE_TransmitHandle *cth;
257
258   /**
259    * Increase in traffic preference still to be submitted
260    * to the core service for this peer.
261    */
262   uint64_t inc_preference;
263
264   /**
265    * Set to 1 if we're currently in the process of calling
266    * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
267    * NULL, we should not call notify_transmit_ready for this
268    * handle right now).
269    */
270   unsigned int cth_in_progress;
271
272   /**
273    * Trust rating for this peer on disk.
274    */
275   uint32_t disk_trust;
276
277   /**
278    * Which offset in "last_p2p_replies" will be updated next?
279    * (we go round-robin).
280    */
281   unsigned int last_p2p_replies_woff;
282
283   /**
284    * Which offset in "last_client_replies" will be updated next?
285    * (we go round-robin).
286    */
287   unsigned int last_client_replies_woff;
288
289   /**
290    * Current offset into 'last_request_times' ring buffer.
291    */
292   unsigned int last_request_times_off;
293
294   /**
295    * GNUNET_YES if we did successfully reserve 32k bandwidth,
296    * GNUNET_NO if not.
297    */
298   int did_reserve;
299
300 };
301
302
303 /**
304  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
305  */
306 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
307
308 /**
309  * Where do we store trust information?
310  */
311 static char *trustDirectory;
312
313 /**
314  * Handle to ATS service.
315  */
316 static struct GNUNET_ATS_PerformanceHandle *ats;
317
318 /**
319  * Get the filename under which we would store the GNUNET_HELLO_Message
320  * for the given host and protocol.
321  * @return filename of the form DIRECTORY/HOSTID
322  */
323 static char *
324 get_trust_filename (const struct GNUNET_PeerIdentity *id)
325 {
326   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
327   char *fn;
328
329   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
330   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
331   return fn;
332 }
333
334
335 /**
336  * Find latency information in 'atsi'.
337  *
338  * @param atsi performance data
339  * @param atsi_count number of records in 'atsi'
340  * @return connection latency
341  */
342 static struct GNUNET_TIME_Relative
343 get_latency (const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
344 {
345   unsigned int i;
346
347   for (i = 0; i < atsi_count; i++)
348     if (ntohl (atsi->type) == GNUNET_ATS_QUALITY_NET_DELAY)
349       return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
350                                             ntohl (atsi->value));
351   return GNUNET_TIME_UNIT_SECONDS;
352 }
353
354
355 /**
356  * Update the performance information kept for the given peer.
357  *
358  * @param cp peer record to update
359  * @param atsi transport performance data
360  * @param atsi_count number of records in 'atsi'
361  */
362 static void
363 update_atsi (struct GSF_ConnectedPeer *cp,
364              const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
365 {
366   struct GNUNET_TIME_Relative latency;
367
368   latency = get_latency (atsi, atsi_count);
369   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
370   /* LATER: merge atsi into cp's performance data (if we ever care...) */
371 }
372
373
374 /**
375  * Return the performance data record for the given peer
376  *
377  * @param cp peer to query
378  * @return performance data record for the peer
379  */
380 struct GSF_PeerPerformanceData *
381 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
382 {
383   return &cp->ppd;
384 }
385
386
387 /**
388  * Core is ready to transmit to a peer, get the message.
389  *
390  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
391  * @param size number of bytes core is willing to take
392  * @param buf where to copy the message
393  * @return number of bytes copied to buf
394  */
395 static size_t
396 peer_transmit_ready_cb (void *cls, size_t size, void *buf);
397
398
399 /**
400  * Function called by core upon success or failure of our bandwidth reservation request.
401  *
402  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
403  * @param peer identifies the peer
404  * @param amount set to the amount that was actually reserved or unreserved;
405  *               either the full requested amount or zero (no partial reservations)
406  * @param res_delay if the reservation could not be satisfied (amount was 0), how
407  *        long should the client wait until re-trying?
408  */
409 static void
410 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
411                       int32_t amount, struct GNUNET_TIME_Relative res_delay);
412
413
414 /**
415  * If ready (bandwidth reserved), try to schedule transmission via
416  * core for the given handle.
417  *
418  * @param pth transmission handle to schedule
419  */
420 static void
421 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
422 {
423   struct GSF_ConnectedPeer *cp;
424   struct GNUNET_PeerIdentity target;
425
426   cp = pth->cp;
427   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
428     return;                     /* already done */
429   GNUNET_assert (0 != cp->ppd.pid);
430   GNUNET_PEER_resolve (cp->ppd.pid, &target);
431
432   if (0 != cp->inc_preference)
433   {
434     GNUNET_ATS_change_preference (ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
435                                   (double) cp->inc_preference,
436                                   GNUNET_ATS_PREFERENCE_END);
437     cp->inc_preference = 0;
438   }
439
440   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
441   {
442     /* query, need reservation */
443     if (GNUNET_YES != cp->did_reserve)
444       return;                   /* not ready */
445     cp->did_reserve = GNUNET_NO;
446     /* reservation already done! */
447     pth->was_reserved = GNUNET_YES;
448     cp->rc =
449         GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
450                                       &ats_reserve_callback, cp);
451     return;
452   }
453   GNUNET_assert (cp->cth == NULL);
454   cp->cth_in_progress++;
455   cp->cth =
456     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
457                                        GNUNET_TIME_absolute_get_remaining
458                                        (pth->timeout), &target, pth->size,
459                                        &peer_transmit_ready_cb, cp);
460   GNUNET_assert (NULL != cp->cth);
461   GNUNET_assert (0 < cp->cth_in_progress--);
462 }
463
464
465 /**
466  * Core is ready to transmit to a peer, get the message.
467  *
468  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
469  * @param size number of bytes core is willing to take
470  * @param buf where to copy the message
471  * @return number of bytes copied to buf
472  */
473 static size_t
474 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
475 {
476   struct GSF_ConnectedPeer *cp = cls;
477   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
478   struct GSF_PeerTransmitHandle *pos;
479   size_t ret;
480
481   cp->cth = NULL;
482   if (NULL == pth)
483     return 0;
484   if (pth->size > size)
485   {
486     schedule_transmission (pth);
487     return 0;
488   }
489   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
490   {
491     GNUNET_SCHEDULER_cancel (pth->timeout_task);
492     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
493   }
494   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
495   if (GNUNET_YES == pth->is_query)
496   {
497     cp->ppd.last_request_times[(cp->last_request_times_off++) %
498                                MAX_QUEUE_PER_PEER] =
499         GNUNET_TIME_absolute_get ();
500     GNUNET_assert (0 < cp->ppd.pending_queries--);
501   }
502   else if (GNUNET_NO == pth->is_query)
503   {
504     GNUNET_assert (0 < cp->ppd.pending_replies--);
505   }
506   GNUNET_LOAD_update (cp->ppd.transmission_delay,
507                       GNUNET_TIME_absolute_get_duration
508                       (pth->transmission_request_start_time).rel_value);
509   ret = pth->gmc (pth->gmc_cls, size, buf);
510   if (NULL != (pos = cp->pth_head))
511   {
512     GNUNET_assert (pos != pth);
513     schedule_transmission (pos);
514   }
515   GNUNET_free (pth);
516   return ret;
517 }
518
519
520 /**
521  * (re)try to reserve bandwidth from the given peer.
522  *
523  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
524  * @param tc scheduler context
525  */
526 static void
527 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
528 {
529   struct GSF_ConnectedPeer *cp = cls;
530   struct GNUNET_PeerIdentity target;
531
532   GNUNET_PEER_resolve (cp->ppd.pid, &target);
533   cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
534   cp->rc =
535       GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
536                                     &ats_reserve_callback, cp);
537 }
538
539
540 /**
541  * Function called by core upon success or failure of our bandwidth reservation request.
542  *
543  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
544  * @param peer identifies the peer
545  * @param amount set to the amount that was actually reserved or unreserved;
546  *               either the full requested amount or zero (no partial reservations)
547  * @param res_delay if the reservation could not be satisfied (amount was 0), how
548  *        long should the client wait until re-trying?
549  */
550 static void
551 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
552                       int32_t amount, struct GNUNET_TIME_Relative res_delay)
553 {
554   struct GSF_ConnectedPeer *cp = cls;
555   struct GSF_PeerTransmitHandle *pth;
556
557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558               "Reserved %d bytes / need to wait %llu ms for reservation\n",
559               (int) amount, (unsigned long long) res_delay.rel_value);
560   cp->rc = NULL;
561   if (0 == amount)
562   {
563     cp->rc_delay_task =
564         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
565     return;
566   }
567   cp->did_reserve = GNUNET_YES;
568   pth = cp->pth_head;
569   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
570   {
571     /* reservation success, try transmission now! */
572     cp->cth_in_progress++;
573     cp->cth =
574         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
575                                            GNUNET_TIME_absolute_get_remaining
576                                            (pth->timeout), peer, pth->size,
577                                            &peer_transmit_ready_cb, cp);
578     GNUNET_assert (NULL != cp->cth);
579     GNUNET_assert (0 < cp->cth_in_progress--);
580   }
581 }
582
583
584 /**
585  * A peer connected to us.  Setup the connected peer
586  * records.
587  *
588  * @param peer identity of peer that connected
589  * @param atsi performance data for the connection
590  * @param atsi_count number of records in 'atsi'
591  * @return handle to connected peer entry
592  */
593 struct GSF_ConnectedPeer *
594 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
595                            const struct GNUNET_ATS_Information *atsi,
596                            unsigned int atsi_count)
597 {
598   struct GSF_ConnectedPeer *cp;
599   char *fn;
600   uint32_t trust;
601
602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
603               GNUNET_i2s (peer));
604   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
605   cp->ppd.pid = GNUNET_PEER_intern (peer);
606   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
607   cp->rc =
608       GNUNET_ATS_reserve_bandwidth (ats, peer, DBLOCK_SIZE,
609                                     &ats_reserve_callback, cp);
610   fn = get_trust_filename (peer);
611   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
612       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
613     cp->disk_trust = cp->ppd.trust = ntohl (trust);
614   GNUNET_free (fn);
615   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
616   GNUNET_break (GNUNET_OK ==
617                 GNUNET_CONTAINER_multihashmap_put (cp_map, &peer->hashPubKey,
618                                                    cp,
619                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
620   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
621                          GNUNET_CONTAINER_multihashmap_size (cp_map),
622                          GNUNET_NO);
623   update_atsi (cp, atsi, atsi_count);
624   GSF_push_start_ (cp);
625   return cp;
626 }
627
628
629 /**
630  * It may be time to re-start migrating content to this
631  * peer.  Check, and if so, restart migration.
632  *
633  * @param cls the 'struct GSF_ConnectedPeer'
634  * @param tc scheduler context
635  */
636 static void
637 revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
638 {
639   struct GSF_ConnectedPeer *cp = cls;
640   struct GNUNET_TIME_Relative bt;
641
642   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
643   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
644   if (0 != bt.rel_value)
645   {
646     /* still time left... */
647     cp->mig_revive_task =
648         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
649     return;
650   }
651   GSF_push_start_ (cp);
652 }
653
654
655 /**
656  * Get a handle for a connected peer.
657  *
658  * @param peer peer's identity
659  * @return NULL if the peer is not currently connected
660  */
661 struct GSF_ConnectedPeer *
662 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
663 {
664   if (NULL == cp_map)
665     return NULL;
666   return GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey);
667 }
668
669
670 /**
671  * Handle P2P "MIGRATION_STOP" message.
672  *
673  * @param cls closure, always NULL
674  * @param other the other peer involved (sender or receiver, NULL
675  *        for loopback messages where we are both sender and receiver)
676  * @param message the actual message
677  * @param atsi performance information
678  * @param atsi_count number of records in 'atsi'
679  * @return GNUNET_OK to keep the connection open,
680  *         GNUNET_SYSERR to close it (signal serious error)
681  */
682 int
683 GSF_handle_p2p_migration_stop_ (void *cls,
684                                 const struct GNUNET_PeerIdentity *other,
685                                 const struct GNUNET_MessageHeader *message,
686                                 const struct GNUNET_ATS_Information *atsi,
687                                 unsigned int atsi_count)
688 {
689   struct GSF_ConnectedPeer *cp;
690   const struct MigrationStopMessage *msm;
691   struct GNUNET_TIME_Relative bt;
692
693   msm = (const struct MigrationStopMessage *) message;
694   cp = GSF_peer_get_ (other);
695   if (cp == NULL)
696   {
697     GNUNET_break (0);
698     return GNUNET_OK;
699   }
700   GNUNET_STATISTICS_update (GSF_stats,
701                             gettext_noop ("# migration stop messages received"),
702                             1, GNUNET_NO);
703   bt = GNUNET_TIME_relative_ntoh (msm->duration);
704   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
705               _("Migration of content to peer `%s' blocked for %llu ms\n"),
706               GNUNET_i2s (other), (unsigned long long) bt.rel_value);
707   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
708   if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
709   {
710     GSF_push_stop_ (cp);
711     cp->mig_revive_task =
712         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
713   }
714   update_atsi (cp, atsi, atsi_count);
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Copy reply and free put message.
721  *
722  * @param cls the 'struct PutMessage'
723  * @param buf_size number of bytes available in buf
724  * @param buf where to copy the message, NULL on error (peer disconnect)
725  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
726  */
727 static size_t
728 copy_reply (void *cls, size_t buf_size, void *buf)
729 {
730   struct PutMessage *pm = cls;
731   size_t size;
732
733   if (buf != NULL)
734   {
735     GNUNET_assert (buf_size >= ntohs (pm->header.size));
736     size = ntohs (pm->header.size);
737     memcpy (buf, pm, size);
738     GNUNET_STATISTICS_update (GSF_stats,
739                               gettext_noop
740                               ("# replies transmitted to other peers"), 1,
741                               GNUNET_NO);
742   }
743   else
744   {
745     size = 0;
746     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
747                               GNUNET_NO);
748   }
749   GNUNET_free (pm);
750   return size;
751 }
752
753
754 /**
755  * Free resources associated with the given peer request.
756  *
757  * @param peerreq request to free
758  * @param query associated key for the request
759  */
760 static void
761 free_pending_request (struct PeerRequest *peerreq,
762                       const struct GNUNET_HashCode *query)
763 {
764   struct GSF_ConnectedPeer *cp = peerreq->cp;
765
766   if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
767   {
768     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
769     peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
770   }
771   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
772                             -1, GNUNET_NO);
773   GNUNET_break (GNUNET_YES ==
774                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
775                                                       query, peerreq));
776   GNUNET_free (peerreq);
777 }
778
779
780 /**
781  * Cancel all requests associated with the peer.
782  *
783  * @param cls unused
784  * @param query hash code of the request
785  * @param value the 'struct GSF_PendingRequest'
786  * @return GNUNET_YES (continue to iterate)
787  */
788 static int
789 cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
790 {
791   struct PeerRequest *peerreq = value;
792   struct GSF_PendingRequest *pr = peerreq->pr;
793   struct GSF_PendingRequestData *prd;
794
795   prd = GSF_pending_request_get_data_ (pr);
796   GSF_pending_request_cancel_ (pr, GNUNET_NO);
797   free_pending_request (peerreq, &prd->query);
798   return GNUNET_OK;
799 }
800
801
802 /**
803  * Free the given request.
804  *
805  * @param cls the request to free
806  * @param tc task context
807  */
808 static void
809 peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
810 {
811   struct PeerRequest *peerreq = cls;
812   struct GSF_PendingRequest *pr = peerreq->pr;
813   struct GSF_PendingRequestData *prd;
814
815   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
816   prd = GSF_pending_request_get_data_ (pr);
817   cancel_pending_request (NULL, &prd->query, peerreq);
818 }
819
820
821 /**
822  * The artificial delay is over, transmit the message now.
823  *
824  * @param cls the 'struct GSF_DelayedHandle' with the message
825  * @param tc scheduler context
826  */
827 static void
828 transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
829 {
830   struct GSF_DelayedHandle *dh = cls;
831   struct GSF_ConnectedPeer *cp = dh->cp;
832
833   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
834   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
835   {
836     GNUNET_free (dh->pm);
837     GNUNET_free (dh);
838     return;
839   }
840   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
841                              dh->msize, &copy_reply, dh->pm);
842   GNUNET_free (dh);
843 }
844
845
846 /**
847  * Get the randomized delay a response should be subjected to.
848  *
849  * @return desired delay
850  */
851 static struct GNUNET_TIME_Relative
852 get_randomized_delay ()
853 {
854   struct GNUNET_TIME_Relative ret;
855
856   ret =
857       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
858                                      GNUNET_CRYPTO_random_u32
859                                      (GNUNET_CRYPTO_QUALITY_WEAK,
860                                       2 * GSF_avg_latency.rel_value + 1));
861   GNUNET_STATISTICS_update (GSF_stats,
862                             gettext_noop
863                             ("# artificial delays introduced (ms)"),
864                             ret.rel_value, GNUNET_NO);
865
866   return ret;
867 }
868
869
870 /**
871  * Handle a reply to a pending request.  Also called if a request
872  * expires (then with data == NULL).  The handler may be called
873  * many times (depending on the request type), but will not be
874  * called during or after a call to GSF_pending_request_cancel
875  * and will also not be called anymore after a call signalling
876  * expiration.
877  *
878  * @param cls 'struct PeerRequest' this is an answer for
879  * @param eval evaluation of the result
880  * @param pr handle to the original pending request
881  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
882  * @param expiration when does 'data' expire?
883  * @param last_transmission when did we last transmit a request for this block
884  * @param type type of the block
885  * @param data response data, NULL on request expiration
886  * @param data_len number of bytes in data
887  */
888 static void
889 handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
890                   struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
891                   struct GNUNET_TIME_Absolute expiration,
892                   struct GNUNET_TIME_Absolute last_transmission,
893                   enum GNUNET_BLOCK_Type type, const void *data,
894                   size_t data_len)
895 {
896   struct PeerRequest *peerreq = cls;
897   struct GSF_ConnectedPeer *cp = peerreq->cp;
898   struct GSF_PendingRequestData *prd;
899   struct PutMessage *pm;
900   size_t msize;
901
902   GNUNET_assert (data_len + sizeof (struct PutMessage) <
903                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
904   GNUNET_assert (peerreq->pr == pr);
905   prd = GSF_pending_request_get_data_ (pr);
906   if (NULL == data)
907   {
908     free_pending_request (peerreq, &prd->query);
909     return;
910   }
911   GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
912   if ((prd->type != type) && (prd->type != GNUNET_BLOCK_TYPE_ANY))
913   {
914     GNUNET_STATISTICS_update (GSF_stats,
915                               gettext_noop
916                               ("# replies dropped due to type mismatch"),
917                                 1, GNUNET_NO);
918     return;
919   }
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921               "Transmitting result for query `%s' to peer\n",
922               GNUNET_h2s (&prd->query));
923   GNUNET_STATISTICS_update (GSF_stats,
924                             gettext_noop ("# replies received for other peers"),
925                             1, GNUNET_NO);
926   msize = sizeof (struct PutMessage) + data_len;
927   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
928   {
929     GNUNET_break (0);
930     return;
931   }
932   if ((reply_anonymity_level != UINT32_MAX) && (reply_anonymity_level > 1))
933   {
934     if (reply_anonymity_level - 1 > GSF_cover_content_count)
935     {
936       GNUNET_STATISTICS_update (GSF_stats,
937                                 gettext_noop
938                                 ("# replies dropped due to insufficient cover traffic"),
939                                 1, GNUNET_NO);
940       return;
941     }
942     GSF_cover_content_count -= (reply_anonymity_level - 1);
943   }
944
945   pm = GNUNET_malloc (msize);
946   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
947   pm->header.size = htons (msize);
948   pm->type = htonl (type);
949   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
950   memcpy (&pm[1], data, data_len);
951   if ((reply_anonymity_level != UINT32_MAX) && (reply_anonymity_level != 0) &&
952       (GSF_enable_randomized_delays == GNUNET_YES))
953   {
954     struct GSF_DelayedHandle *dh;
955
956     dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle));
957     dh->cp = cp;
958     dh->pm = pm;
959     dh->msize = msize;
960     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
961     dh->delay_task =
962         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
963                                       &transmit_delayed_now, dh);
964   }
965   else
966   {
967     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
968                                &copy_reply, pm);
969   }
970   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
971     return;
972   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
973   {
974     GNUNET_STATISTICS_update (GSF_stats,
975                               gettext_noop
976                               ("# P2P searches destroyed due to ultimate reply"),
977                               1, GNUNET_NO);
978     peerreq->kill_task =
979         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
980   }
981 }
982
983
984 /**
985  * Increase the host credit by a value.
986  *
987  * @param cp which peer to change the trust value on
988  * @param value is the int value by which the
989  *  host credit is to be increased or decreased
990  * @returns the actual change in trust (positive or negative)
991  */
992 static int
993 change_host_trust (struct GSF_ConnectedPeer *cp, int value)
994 {
995   if (value == 0)
996     return 0;
997   GNUNET_assert (cp != NULL);
998   if (value > 0)
999   {
1000     if (cp->ppd.trust + value < cp->ppd.trust)
1001     {
1002       value = UINT32_MAX - cp->ppd.trust;
1003       cp->ppd.trust = UINT32_MAX;
1004     }
1005     else
1006       cp->ppd.trust += value;
1007   }
1008   else
1009   {
1010     if (cp->ppd.trust < -value)
1011     {
1012       value = -cp->ppd.trust;
1013       cp->ppd.trust = 0;
1014     }
1015     else
1016       cp->ppd.trust += value;
1017   }
1018   return value;
1019 }
1020
1021
1022 /**
1023  * We've received a request with the specified priority.  Bound it
1024  * according to how much we trust the given peer.
1025  *
1026  * @param prio_in requested priority
1027  * @param cp the peer making the request
1028  * @return effective priority
1029  */
1030 static int32_t
1031 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1032 {
1033 #define N ((double)128.0)
1034   uint32_t ret;
1035   double rret;
1036   int ld;
1037
1038   ld = GSF_test_get_load_too_high_ (0);
1039   if (ld == GNUNET_SYSERR)
1040   {
1041     GNUNET_STATISTICS_update (GSF_stats,
1042                               gettext_noop
1043                               ("# requests done for free (low load)"), 1,
1044                               GNUNET_NO);
1045     return 0;                   /* excess resources */
1046   }
1047   if (prio_in > INT32_MAX)
1048     prio_in = INT32_MAX;
1049   ret = -change_host_trust (cp, -(int) prio_in);
1050   if (ret > 0)
1051   {
1052     if (ret > GSF_current_priorities + N)
1053       rret = GSF_current_priorities + N;
1054     else
1055       rret = ret;
1056     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1057   }
1058   if ((ld == GNUNET_YES) && (ret > 0))
1059   {
1060     /* try with charging */
1061     ld = GSF_test_get_load_too_high_ (ret);
1062   }
1063   if (ld == GNUNET_YES)
1064   {
1065     GNUNET_STATISTICS_update (GSF_stats,
1066                               gettext_noop
1067                               ("# request dropped, priority insufficient"), 1,
1068                               GNUNET_NO);
1069     /* undo charge */
1070     change_host_trust (cp, (int) ret);
1071     return -1;                  /* not enough resources */
1072   }
1073   else
1074   {
1075     GNUNET_STATISTICS_update (GSF_stats,
1076                               gettext_noop
1077                               ("# requests done for a price (normal load)"), 1,
1078                               GNUNET_NO);
1079   }
1080 #undef N
1081   return ret;
1082 }
1083
1084
1085 /**
1086  * The priority level imposes a bound on the maximum
1087  * value for the ttl that can be requested.
1088  *
1089  * @param ttl_in requested ttl
1090  * @param prio given priority
1091  * @return ttl_in if ttl_in is below the limit,
1092  *         otherwise the ttl-limit for the given priority
1093  */
1094 static int32_t
1095 bound_ttl (int32_t ttl_in, uint32_t prio)
1096 {
1097   unsigned long long allowed;
1098
1099   if (ttl_in <= 0)
1100     return ttl_in;
1101   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1102   if (ttl_in > allowed)
1103   {
1104     if (allowed >= (1 << 30))
1105       return 1 << 30;
1106     return allowed;
1107   }
1108   return ttl_in;
1109 }
1110
1111
1112 /**
1113  * Handle P2P "QUERY" message.  Creates the pending request entry
1114  * and sets up all of the data structures to that we will
1115  * process replies properly.  Does not initiate forwarding or
1116  * local database lookups.
1117  *
1118  * @param other the other peer involved (sender or receiver, NULL
1119  *        for loopback messages where we are both sender and receiver)
1120  * @param message the actual message
1121  * @return pending request handle, NULL on error
1122  */
1123 struct GSF_PendingRequest *
1124 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1125                        const struct GNUNET_MessageHeader *message)
1126 {
1127   struct PeerRequest *peerreq;
1128   struct GSF_PendingRequest *pr;
1129   struct GSF_PendingRequestData *prd;
1130   struct GSF_ConnectedPeer *cp;
1131   struct GSF_ConnectedPeer *cps;
1132   const struct GNUNET_HashCode *namespace;
1133   const struct GNUNET_PeerIdentity *target;
1134   enum GSF_PendingRequestOptions options;
1135   uint16_t msize;
1136   const struct GetMessage *gm;
1137   unsigned int bits;
1138   const struct GNUNET_HashCode *opt;
1139   uint32_t bm;
1140   size_t bfsize;
1141   uint32_t ttl_decrement;
1142   int32_t priority;
1143   int32_t ttl;
1144   enum GNUNET_BLOCK_Type type;
1145   GNUNET_PEER_Id spid;
1146
1147   GNUNET_assert (other != NULL);
1148   msize = ntohs (message->size);
1149   if (msize < sizeof (struct GetMessage))
1150   {
1151     GNUNET_break_op (0);
1152     return NULL;
1153   }
1154   GNUNET_STATISTICS_update (GSF_stats,
1155                             gettext_noop
1156                             ("# GET requests received (from other peers)"), 1,
1157                             GNUNET_NO);
1158   gm = (const struct GetMessage *) message;
1159   type = ntohl (gm->type);
1160   bm = ntohl (gm->hash_bitmap);
1161   bits = 0;
1162   while (bm > 0)
1163   {
1164     if (1 == (bm & 1))
1165       bits++;
1166     bm >>= 1;
1167   }
1168   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_HashCode))
1169   {
1170     GNUNET_break_op (0);
1171     return NULL;
1172   }
1173   opt = (const struct GNUNET_HashCode *) &gm[1];
1174   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_HashCode);
1175   /* bfsize must be power of 2, check! */
1176   if (0 != ((bfsize - 1) & bfsize))
1177   {
1178     GNUNET_break_op (0);
1179     return NULL;
1180   }
1181   GSF_cover_query_count++;
1182   bm = ntohl (gm->hash_bitmap);
1183   bits = 0;
1184   cps = GSF_peer_get_ (other);
1185   if (NULL == cps)
1186   {
1187     /* peer must have just disconnected */
1188     GNUNET_STATISTICS_update (GSF_stats,
1189                               gettext_noop
1190                               ("# requests dropped due to initiator not being connected"),
1191                               1, GNUNET_NO);
1192     return NULL;
1193   }
1194   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1195     cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity *) &opt[bits++]);
1196   else
1197     cp = cps;
1198   if (cp == NULL)
1199   {
1200     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1201       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1203                   GNUNET_i2s ((const struct GNUNET_PeerIdentity *)
1204                               &opt[bits - 1]));
1205
1206     else
1207       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1209                   GNUNET_i2s (other));
1210     GNUNET_STATISTICS_update (GSF_stats,
1211                               gettext_noop
1212                               ("# requests dropped due to missing reverse route"),
1213                               1, GNUNET_NO);
1214     return NULL;
1215   }
1216   /* note that we can really only check load here since otherwise
1217    * peers could find out that we are overloaded by not being
1218    * disconnected after sending us a malformed query... */
1219   priority = bound_priority (ntohl (gm->priority), cps);
1220   if (priority < 0)
1221   {
1222     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223                 "Dropping query from `%s', this peer is too busy.\n",
1224                 GNUNET_i2s (other));
1225     return NULL;
1226   }
1227   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1229               GNUNET_h2s (&gm->query), (unsigned int) type, GNUNET_i2s (other),
1230               (unsigned int) bm);
1231   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
1232   if ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && (namespace == NULL))
1233   {
1234     GNUNET_break_op (0);
1235     return NULL;
1236   }
1237   if ((type != GNUNET_BLOCK_TYPE_FS_SBLOCK) && (namespace != NULL))
1238   {
1239     GNUNET_break_op (0);
1240     return NULL;
1241   }
1242   target =
1243       (0 !=
1244        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity
1245                                                *) &opt[bits++]) : NULL;
1246   options = GSF_PRO_DEFAULTS;
1247   spid = 0;
1248   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1249       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1250           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 +
1251           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1252   {
1253     /* don't have BW to send to peer, or would likely take longer than we have for it,
1254      * so at best indirect the query */
1255     priority = 0;
1256     options |= GSF_PRO_FORWARD_ONLY;
1257     spid = GNUNET_PEER_intern (other);
1258     GNUNET_assert (0 != spid);
1259   }
1260   ttl = bound_ttl (ntohl (gm->ttl), priority);
1261   /* decrement ttl (always) */
1262   ttl_decrement =
1263       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1264                                                     TTL_DECREMENT);
1265   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1266   {
1267     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1269                 GNUNET_i2s (other), ttl, ttl_decrement);
1270     GNUNET_STATISTICS_update (GSF_stats,
1271                               gettext_noop
1272                               ("# requests dropped due TTL underflow"), 1,
1273                               GNUNET_NO);
1274     /* integer underflow => drop (should be very rare)! */
1275     return NULL;
1276   }
1277   ttl -= ttl_decrement;
1278
1279   /* test if the request already exists */
1280   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1281   if (peerreq != NULL)
1282   {
1283     pr = peerreq->pr;
1284     prd = GSF_pending_request_get_data_ (pr);
1285     if ((prd->type == type) &&
1286         ((type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
1287          (0 == memcmp (&prd->namespace, namespace, sizeof (struct GNUNET_HashCode)))))
1288     {
1289       if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get ().abs_value + ttl)
1290       {
1291         /* existing request has higher TTL, drop new one! */
1292         prd->priority += priority;
1293         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294                     "Have existing request with higher TTL, dropping new request.\n",
1295                     GNUNET_i2s (other));
1296         GNUNET_STATISTICS_update (GSF_stats,
1297                                   gettext_noop
1298                                   ("# requests dropped due to higher-TTL request"),
1299                                   1, GNUNET_NO);
1300         return NULL;
1301       }
1302       /* existing request has lower TTL, drop old one! */
1303       priority += prd->priority;
1304       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1305       free_pending_request (peerreq, &gm->query);
1306     }
1307   }
1308
1309   peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1310   peerreq->cp = cp;
1311   pr = GSF_pending_request_create_ (options, type, &gm->query, namespace,
1312                                     target,
1313                                     (bfsize >
1314                                      0) ? (const char *) &opt[bits] : NULL,
1315                                     bfsize, ntohl (gm->filter_mutator),
1316                                     1 /* anonymity */ ,
1317                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1318                                     &handle_p2p_reply, peerreq);
1319   GNUNET_assert (NULL != pr);
1320   peerreq->pr = pr;
1321   GNUNET_break (GNUNET_OK ==
1322                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1323                                                    peerreq,
1324                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1325   GNUNET_STATISTICS_update (GSF_stats,
1326                             gettext_noop
1327                             ("# P2P query messages received and processed"), 1,
1328                             GNUNET_NO);
1329   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1330                             1, GNUNET_NO);
1331   return pr;
1332 }
1333
1334
1335 /**
1336  * Function called if there has been a timeout trying to satisfy
1337  * a transmission request.
1338  *
1339  * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1340  * @param tc scheduler context
1341  */
1342 static void
1343 peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1344 {
1345   struct GSF_PeerTransmitHandle *pth = cls;
1346   struct GSF_ConnectedPeer *cp;
1347
1348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349               "Timeout trying to transmit to other peer\n");
1350   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1351   cp = pth->cp;
1352   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1353   if (GNUNET_YES == pth->is_query)
1354     GNUNET_assert (0 < cp->ppd.pending_queries--);
1355   else if (GNUNET_NO == pth->is_query)
1356     GNUNET_assert (0 < cp->ppd.pending_replies--);
1357   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1358   if (NULL != cp->cth)
1359   {
1360     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1361     cp->cth = NULL;
1362   }
1363   pth->gmc (pth->gmc_cls, 0, NULL);
1364   GNUNET_assert (0 == cp->cth_in_progress);
1365   GNUNET_free (pth);
1366 }
1367
1368
1369 /**
1370  * Transmit a message to the given peer as soon as possible.
1371  * If the peer disconnects before the transmission can happen,
1372  * the callback is invoked with a 'NULL' buffer.
1373  *
1374  * @param cp target peer
1375  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1376  * @param priority how important is this request?
1377  * @param timeout when does this request timeout (call gmc with error)
1378  * @param size number of bytes we would like to send to the peer
1379  * @param gmc function to call to get the message
1380  * @param gmc_cls closure for gmc
1381  * @return handle to cancel request
1382  */
1383 struct GSF_PeerTransmitHandle *
1384 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
1385                     uint32_t priority, struct GNUNET_TIME_Relative timeout,
1386                     size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
1387 {
1388   struct GSF_PeerTransmitHandle *pth;
1389   struct GSF_PeerTransmitHandle *pos;
1390   struct GSF_PeerTransmitHandle *prev;
1391
1392   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1393   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1394   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1395   pth->gmc = gmc;
1396   pth->gmc_cls = gmc_cls;
1397   pth->size = size;
1398   pth->is_query = is_query;
1399   pth->priority = priority;
1400   pth->cp = cp;
1401   /* insertion sort (by priority, descending) */
1402   prev = NULL;
1403   pos = cp->pth_head;
1404   while ((pos != NULL) && (pos->priority > priority))
1405   {
1406     prev = pos;
1407     pos = pos->next;
1408   }
1409   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1410   if (GNUNET_YES == is_query)
1411     cp->ppd.pending_queries++;
1412   else if (GNUNET_NO == is_query)
1413     cp->ppd.pending_replies++;
1414   pth->timeout_task =
1415       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1416   schedule_transmission (pth);
1417   return pth;
1418 }
1419
1420
1421 /**
1422  * Cancel an earlier request for transmission.
1423  *
1424  * @param pth request to cancel
1425  */
1426 void
1427 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1428 {
1429   struct GSF_ConnectedPeer *cp;
1430
1431   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1432   {
1433     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1434     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1435   }
1436   cp = pth->cp;
1437   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1438   if (GNUNET_YES == pth->is_query)
1439     GNUNET_assert (0 < cp->ppd.pending_queries--);
1440   else if (GNUNET_NO == pth->is_query)
1441     GNUNET_assert (0 < cp->ppd.pending_replies--);
1442   GNUNET_free (pth);
1443 }
1444
1445
1446 /**
1447  * Report on receiving a reply; update the performance record of the given peer.
1448  *
1449  * @param cp responding peer (will be updated)
1450  * @param request_time time at which the original query was transmitted
1451  * @param request_priority priority of the original request
1452  */
1453 void
1454 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1455                               struct GNUNET_TIME_Absolute request_time,
1456                               uint32_t request_priority)
1457 {
1458   struct GNUNET_TIME_Relative delay;
1459
1460   delay = GNUNET_TIME_absolute_get_duration (request_time);
1461   cp->ppd.avg_reply_delay.rel_value =
1462       (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N - 1) +
1463        delay.rel_value) / RUNAVG_DELAY_N;
1464   cp->ppd.avg_priority =
1465       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1466        request_priority) / RUNAVG_DELAY_N;
1467 }
1468
1469
1470 /**
1471  * Report on receiving a reply in response to an initiating client.
1472  * Remember that this peer is good for this client.
1473  *
1474  * @param cp responding peer (will be updated)
1475  * @param initiator_client local client on responsible for query
1476  */
1477 void
1478 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1479                                    struct GSF_LocalClient *initiator_client)
1480 {
1481   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1482                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1483 }
1484
1485
1486 /**
1487  * Report on receiving a reply in response to an initiating peer.
1488  * Remember that this peer is good for this initiating peer.
1489  *
1490  * @param cp responding peer (will be updated)
1491  * @param initiator_peer other peer responsible for query
1492  */
1493 void
1494 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1495                                  const struct GSF_ConnectedPeer *initiator_peer)
1496 {
1497   unsigned int woff;
1498
1499   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1500   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1501   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1502   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1503   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1504 }
1505
1506
1507 /**
1508  * A peer disconnected from us.  Tear down the connected peer
1509  * record.
1510  *
1511  * @param cls unused
1512  * @param peer identity of peer that connected
1513  */
1514 void
1515 GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
1516 {
1517   struct GSF_ConnectedPeer *cp;
1518   struct GSF_PeerTransmitHandle *pth;
1519   struct GSF_DelayedHandle *dh;
1520
1521   cp = GSF_peer_get_ (peer);
1522   if (NULL == cp)
1523     return;                     /* must have been disconnect from core with
1524                                  * 'peer' == my_id, ignore */
1525   GNUNET_assert (GNUNET_YES ==
1526                  GNUNET_CONTAINER_multihashmap_remove (cp_map,
1527                                                        &peer->hashPubKey, cp));
1528   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1529                          GNUNET_CONTAINER_multihashmap_size (cp_map),
1530                          GNUNET_NO);
1531   if (NULL != cp->migration_pth)
1532   {
1533     GSF_peer_transmit_cancel_ (cp->migration_pth);
1534     cp->migration_pth = NULL;
1535   }
1536   if (NULL != cp->rc)
1537   {
1538     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1539     cp->rc = NULL;
1540   }
1541   if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1542   {
1543     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1544     cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1545   }
1546   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1547                                          &cancel_pending_request, cp);
1548   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1549   cp->request_map = NULL;
1550   GSF_plan_notify_peer_disconnect_ (cp);
1551   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1552   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1553   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1554   GSF_push_stop_ (cp);
1555   if (NULL != cp->cth)
1556   {
1557     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1558     cp->cth = NULL;
1559   }
1560   GNUNET_assert (0 == cp->cth_in_progress);
1561   while (NULL != (pth = cp->pth_head))
1562   {
1563     if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1564     {
1565       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1566       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1567     }
1568     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1569     pth->gmc (pth->gmc_cls, 0, NULL);
1570     GNUNET_free (pth);
1571   }
1572   while (NULL != (dh = cp->delayed_head))
1573   {
1574     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1575     GNUNET_SCHEDULER_cancel (dh->delay_task);
1576     GNUNET_free (dh->pm);
1577     GNUNET_free (dh);
1578   }
1579   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1580   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1581   {
1582     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1583     cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1584   }
1585   GNUNET_free (cp);
1586 }
1587
1588
1589 /**
1590  * Closure for 'call_iterator'.
1591  */
1592 struct IterationContext
1593 {
1594   /**
1595    * Function to call on each entry.
1596    */
1597   GSF_ConnectedPeerIterator it;
1598
1599   /**
1600    * Closure for 'it'.
1601    */
1602   void *it_cls;
1603 };
1604
1605
1606 /**
1607  * Function that calls the callback for each peer.
1608  *
1609  * @param cls the 'struct IterationContext*'
1610  * @param key identity of the peer
1611  * @param value the 'struct GSF_ConnectedPeer*'
1612  * @return GNUNET_YES to continue iteration
1613  */
1614 static int
1615 call_iterator (void *cls, const struct GNUNET_HashCode * key, void *value)
1616 {
1617   struct IterationContext *ic = cls;
1618   struct GSF_ConnectedPeer *cp = value;
1619
1620   ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
1621   return GNUNET_YES;
1622 }
1623
1624
1625 /**
1626  * Iterate over all connected peers.
1627  *
1628  * @param it function to call for each peer
1629  * @param it_cls closure for it
1630  */
1631 void
1632 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
1633 {
1634   struct IterationContext ic;
1635
1636   ic.it = it;
1637   ic.it_cls = it_cls;
1638   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &call_iterator, &ic);
1639 }
1640
1641
1642 /**
1643  * Obtain the identity of a connected peer.
1644  *
1645  * @param cp peer to reserve bandwidth from
1646  * @param id identity to set (written to)
1647  */
1648 void
1649 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1650                                   struct GNUNET_PeerIdentity *id)
1651 {
1652   GNUNET_assert (0 != cp->ppd.pid);
1653   GNUNET_PEER_resolve (cp->ppd.pid, id);
1654 }
1655
1656
1657 /**
1658  * Assemble a migration stop message for transmission.
1659  *
1660  * @param cls the 'struct GSF_ConnectedPeer' to use
1661  * @param size number of bytes we're allowed to write to buf
1662  * @param buf where to copy the message
1663  * @return number of bytes copied to buf
1664  */
1665 static size_t
1666 create_migration_stop_message (void *cls, size_t size, void *buf)
1667 {
1668   struct GSF_ConnectedPeer *cp = cls;
1669   struct MigrationStopMessage msm;
1670
1671   cp->migration_pth = NULL;
1672   if (NULL == buf)
1673     return 0;
1674   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1675   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1676   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1677   msm.reserved = htonl (0);
1678   msm.duration =
1679       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1680                                  (cp->last_migration_block));
1681   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1682   GNUNET_STATISTICS_update (GSF_stats,
1683                             gettext_noop ("# migration stop messages sent"),
1684                             1, GNUNET_NO);
1685   return sizeof (struct MigrationStopMessage);
1686 }
1687
1688
1689 /**
1690  * Ask a peer to stop migrating data to us until the given point
1691  * in time.
1692  *
1693  * @param cp peer to ask
1694  * @param block_time until when to block
1695  */
1696 void
1697 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1698                            struct GNUNET_TIME_Absolute block_time)
1699 {
1700   if (cp->last_migration_block.abs_value > block_time.abs_value)
1701   {
1702     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1703                 "Migration already blocked for another %llu ms\n",
1704                 (unsigned long long)
1705                 GNUNET_TIME_absolute_get_remaining
1706                 (cp->last_migration_block).rel_value);
1707     return;                     /* already blocked */
1708   }
1709   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %llu ms\n",
1710               (unsigned long long) GNUNET_TIME_absolute_get_remaining (block_time).rel_value);
1711   cp->last_migration_block = block_time;
1712   if (cp->migration_pth != NULL)
1713     GSF_peer_transmit_cancel_ (cp->migration_pth);
1714   cp->migration_pth =
1715       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1716                           GNUNET_TIME_UNIT_FOREVER_REL,
1717                           sizeof (struct MigrationStopMessage),
1718                           &create_migration_stop_message, cp);
1719 }
1720
1721
1722 /**
1723  * Write host-trust information to a file - flush the buffer entry!
1724  *
1725  * @param cls closure, not used
1726  * @param key host identity
1727  * @param value the 'struct GSF_ConnectedPeer' to flush
1728  * @return GNUNET_OK to continue iteration
1729  */
1730 static int
1731 flush_trust (void *cls, const struct GNUNET_HashCode * key, void *value)
1732 {
1733   struct GSF_ConnectedPeer *cp = value;
1734   char *fn;
1735   uint32_t trust;
1736   struct GNUNET_PeerIdentity pid;
1737
1738   if (cp->ppd.trust == cp->disk_trust)
1739     return GNUNET_OK;           /* unchanged */
1740   GNUNET_assert (0 != cp->ppd.pid);
1741   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1742   fn = get_trust_filename (&pid);
1743   if (cp->ppd.trust == 0)
1744   {
1745     if ((0 != UNLINK (fn)) && (errno != ENOENT))
1746       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1747                                 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1748   }
1749   else
1750   {
1751     trust = htonl (cp->ppd.trust);
1752     if (sizeof (uint32_t) ==
1753         GNUNET_DISK_fn_write (fn, &trust, sizeof (uint32_t),
1754                               GNUNET_DISK_PERM_USER_READ |
1755                               GNUNET_DISK_PERM_USER_WRITE |
1756                               GNUNET_DISK_PERM_GROUP_READ |
1757                               GNUNET_DISK_PERM_OTHER_READ))
1758       cp->disk_trust = cp->ppd.trust;
1759   }
1760   GNUNET_free (fn);
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Notify core about a preference we have for the given peer
1767  * (to allocate more resources towards it).  The change will
1768  * be communicated the next time we reserve bandwidth with
1769  * core (not instantly).
1770  *
1771  * @param cp peer to reserve bandwidth from
1772  * @param pref preference change
1773  */
1774 void
1775 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1776                                        uint64_t pref)
1777 {
1778   cp->inc_preference += pref;
1779 }
1780
1781
1782 /**
1783  * Call this method periodically to flush trust information to disk.
1784  *
1785  * @param cls closure, not used
1786  * @param tc task context, not used
1787  */
1788 static void
1789 cron_flush_trust (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1790 {
1791
1792   if (NULL == cp_map)
1793     return;
1794   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &flush_trust, NULL);
1795   if (NULL == tc)
1796     return;
1797   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1798     return;
1799   GNUNET_SCHEDULER_add_delayed_with_priority (TRUST_FLUSH_FREQ,
1800                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1801                                               &cron_flush_trust, NULL);
1802 }
1803
1804
1805 /**
1806  * Initialize peer management subsystem.
1807  */
1808 void
1809 GSF_connected_peer_init_ ()
1810 {
1811   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1812   ats = GNUNET_ATS_performance_init (GSF_cfg, NULL, NULL);
1813   GNUNET_assert (GNUNET_OK ==
1814                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg, "fs",
1815                                                           "TRUST",
1816                                                           &trustDirectory));
1817   GNUNET_DISK_directory_create (trustDirectory);
1818   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1819                                       &cron_flush_trust, NULL);
1820 }
1821
1822
1823 /**
1824  * Iterator to free peer entries.
1825  *
1826  * @param cls closure, unused
1827  * @param key current key code
1828  * @param value value in the hash map (peer entry)
1829  * @return GNUNET_YES (we should continue to iterate)
1830  */
1831 static int
1832 clean_peer (void *cls, const struct GNUNET_HashCode * key, void *value)
1833 {
1834   GSF_peer_disconnect_handler_ (NULL, (const struct GNUNET_PeerIdentity *) key);
1835   return GNUNET_YES;
1836 }
1837
1838
1839 /**
1840  * Shutdown peer management subsystem.
1841  */
1842 void
1843 GSF_connected_peer_done_ ()
1844 {
1845   cron_flush_trust (NULL, NULL);
1846   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_peer, NULL);
1847   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1848   cp_map = NULL;
1849   GNUNET_free (trustDirectory);
1850   trustDirectory = NULL;
1851   GNUNET_ATS_performance_done (ats);
1852   ats = NULL;
1853 }
1854
1855
1856 /**
1857  * Iterator to remove references to LC entry.
1858  *
1859  * @param cls the 'struct GSF_LocalClient*' to look for
1860  * @param key current key code
1861  * @param value value in the hash map (peer entry)
1862  * @return GNUNET_YES (we should continue to iterate)
1863  */
1864 static int
1865 clean_local_client (void *cls, const struct GNUNET_HashCode * key, void *value)
1866 {
1867   const struct GSF_LocalClient *lc = cls;
1868   struct GSF_ConnectedPeer *cp = value;
1869   unsigned int i;
1870
1871   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1872     if (cp->ppd.last_client_replies[i] == lc)
1873       cp->ppd.last_client_replies[i] = NULL;
1874   return GNUNET_YES;
1875 }
1876
1877
1878 /**
1879  * Notification that a local client disconnected.  Clean up all of our
1880  * references to the given handle.
1881  *
1882  * @param lc handle to the local client (henceforth invalid)
1883  */
1884 void
1885 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1886 {
1887   if (NULL == cp_map)
1888     return;                     /* already cleaned up */
1889   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_local_client,
1890                                          (void *) lc);
1891 }
1892
1893
1894 /* end of gnunet-service-fs_cp.c */