-only trigger check config if we actually need it
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file fs/gnunet-service-fs_cp.c
22  * @brief API to handle 'connected peers'
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33 #include "gnunet_peerstore_service.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   struct GNUNET_SCHEDULER_Task *timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for @e gmc.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   struct GNUNET_SCHEDULER_Task *delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request (generic: from peer or local client).
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Which specific peer issued this request?
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   struct GNUNET_SCHEDULER_Task *kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   struct GNUNET_SCHEDULER_Task *mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   struct GNUNET_SCHEDULER_Task *rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to `struct PeerRequest`.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Number of entries in @e delayed_head DLL.
279    */
280   unsigned int delay_queue_size;
281
282   /**
283    * Respect rating for this peer on disk.
284    */
285   uint32_t disk_respect;
286
287   /**
288    * Which offset in @e last_p2p_replies will be updated next?
289    * (we go round-robin).
290    */
291   unsigned int last_p2p_replies_woff;
292
293   /**
294    * Which offset in @e last_client_replies will be updated next?
295    * (we go round-robin).
296    */
297   unsigned int last_client_replies_woff;
298
299   /**
300    * Current offset into @e last_request_times ring buffer.
301    */
302   unsigned int last_request_times_off;
303
304   /**
305    * #GNUNET_YES if we did successfully reserve 32k bandwidth,
306    * #GNUNET_NO if not.
307    */
308   int did_reserve;
309
310   /**
311    * Function called when the creation of this record is complete.
312    */
313   GSF_ConnectedPeerCreationCallback creation_cb;
314
315   /**
316    * Closure for @e creation_cb
317    */
318   void *creation_cb_cls;
319
320   /**
321    * Handle to the PEERSTORE iterate request for peer respect value
322    */
323   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
324
325 };
326
327
328 /**
329  * Map from peer identities to `struct GSF_ConnectPeer` entries.
330  */
331 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
332
333 /**
334  * Handle to peerstore service.
335  */
336 static struct GNUNET_PEERSTORE_Handle *peerstore;
337
338 /**
339  * Task used to flush respect values to disk.
340  */
341 static struct GNUNET_SCHEDULER_Task *fr_task;
342
343
344 /**
345  * Update the latency information kept for the given peer.
346  *
347  * @param id peer record to update
348  * @param latency current latency value
349  */
350 void
351 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
352                           struct GNUNET_TIME_Relative latency)
353 {
354   struct GSF_ConnectedPeer *cp;
355
356   cp = GSF_peer_get_ (id);
357   if (NULL == cp)
358     return; /* we're not yet connected at the core level, ignore */
359   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
360                                  latency);
361 }
362
363
364 /**
365  * Return the performance data record for the given peer
366  *
367  * @param cp peer to query
368  * @return performance data record for the peer
369  */
370 struct GSF_PeerPerformanceData *
371 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
372 {
373   return &cp->ppd;
374 }
375
376
377 /**
378  * Core is ready to transmit to a peer, get the message.
379  *
380  * @param cls the `struct GSF_PeerTransmitHandle` of the message
381  * @param size number of bytes core is willing to take
382  * @param buf where to copy the message
383  * @return number of bytes copied to @a buf
384  */
385 static size_t
386 peer_transmit_ready_cb (void *cls,
387                         size_t size,
388                         void *buf);
389
390
391 /**
392  * Function called by core upon success or failure of our bandwidth reservation request.
393  *
394  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
395  * @param peer identifies the peer
396  * @param amount set to the amount that was actually reserved or unreserved;
397  *               either the full requested amount or zero (no partial reservations)
398  * @param res_delay if the reservation could not be satisfied (amount was 0), how
399  *        long should the client wait until re-trying?
400  */
401 static void
402 ats_reserve_callback (void *cls,
403                       const struct GNUNET_PeerIdentity *peer,
404                       int32_t amount,
405                       struct GNUNET_TIME_Relative res_delay);
406
407
408 /**
409  * If ready (bandwidth reserved), try to schedule transmission via
410  * core for the given handle.
411  *
412  * @param pth transmission handle to schedule
413  */
414 static void
415 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
416 {
417   struct GSF_ConnectedPeer *cp;
418   struct GNUNET_PeerIdentity target;
419
420   cp = pth->cp;
421   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
422     return;                     /* already done */
423   GNUNET_assert (0 != cp->ppd.pid);
424   GNUNET_PEER_resolve (cp->ppd.pid, &target);
425
426   if (0 != cp->inc_preference)
427   {
428     GNUNET_ATS_performance_change_preference (GSF_ats,
429                                               &target,
430                                               GNUNET_ATS_PREFERENCE_BANDWIDTH,
431                                               (double) cp->inc_preference,
432                                               GNUNET_ATS_PREFERENCE_END);
433     cp->inc_preference = 0;
434   }
435
436   if ( (GNUNET_YES == pth->is_query) &&
437        (GNUNET_YES != pth->was_reserved) )
438   {
439     /* query, need reservation */
440     if (GNUNET_YES != cp->did_reserve)
441       return;                   /* not ready */
442     cp->did_reserve = GNUNET_NO;
443     /* reservation already done! */
444     pth->was_reserved = GNUNET_YES;
445     cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
446                                            &target,
447                                            DBLOCK_SIZE,
448                                            &ats_reserve_callback,
449                                            cp);
450     return;
451   }
452   GNUNET_assert (NULL == cp->cth);
453   cp->cth_in_progress++;
454   cp->cth =
455     GNUNET_CORE_notify_transmit_ready (GSF_core,
456                                        GNUNET_YES,
457                                        GNUNET_CORE_PRIO_BACKGROUND,
458                                        GNUNET_TIME_absolute_get_remaining (pth->timeout),
459                                        &target,
460                                        pth->size,
461                                        &peer_transmit_ready_cb, cp);
462   GNUNET_assert (NULL != cp->cth);
463   GNUNET_assert (0 < cp->cth_in_progress--);
464 }
465
466
467 /**
468  * Core is ready to transmit to a peer, get the message.
469  *
470  * @param cls the `struct GSF_PeerTransmitHandle` of the message
471  * @param size number of bytes core is willing to take
472  * @param buf where to copy the message
473  * @return number of bytes copied to @a buf
474  */
475 static size_t
476 peer_transmit_ready_cb (void *cls,
477                         size_t size,
478                         void *buf)
479 {
480   struct GSF_ConnectedPeer *cp = cls;
481   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
482   struct GSF_PeerTransmitHandle *pos;
483   size_t ret;
484
485   cp->cth = NULL;
486   if (NULL == pth)
487     return 0;
488   if (pth->size > size)
489   {
490     schedule_transmission (pth);
491     return 0;
492   }
493   if (NULL != pth->timeout_task)
494   {
495     GNUNET_SCHEDULER_cancel (pth->timeout_task);
496     pth->timeout_task = NULL;
497   }
498   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
499                                cp->pth_tail,
500                                pth);
501   if (GNUNET_YES == pth->is_query)
502   {
503     cp->ppd.last_request_times[(cp->last_request_times_off++) %
504                                MAX_QUEUE_PER_PEER] =
505       GNUNET_TIME_absolute_get ();
506     GNUNET_assert (0 < cp->ppd.pending_queries--);
507   }
508   else if (GNUNET_NO == pth->is_query)
509   {
510     GNUNET_assert (0 < cp->ppd.pending_replies--);
511   }
512   GNUNET_LOAD_update (cp->ppd.transmission_delay,
513                       GNUNET_TIME_absolute_get_duration
514                       (pth->transmission_request_start_time).rel_value_us);
515   ret = pth->gmc (pth->gmc_cls, size, buf);
516   if (NULL != (pos = cp->pth_head))
517   {
518     GNUNET_assert (pos != pth);
519     schedule_transmission (pos);
520   }
521   GNUNET_free (pth);
522   return ret;
523 }
524
525
526 /**
527  * (re)try to reserve bandwidth from the given peer.
528  *
529  * @param cls the `struct GSF_ConnectedPeer` to reserve from
530  */
531 static void
532 retry_reservation (void *cls)
533 {
534   struct GSF_ConnectedPeer *cp = cls;
535   struct GNUNET_PeerIdentity target;
536
537   GNUNET_PEER_resolve (cp->ppd.pid, &target);
538   cp->rc_delay_task = NULL;
539   cp->rc =
540     GNUNET_ATS_reserve_bandwidth (GSF_ats,
541                                   &target,
542                                   DBLOCK_SIZE,
543                                   &ats_reserve_callback, cp);
544 }
545
546
547 /**
548  * Function called by core upon success or failure of our bandwidth reservation request.
549  *
550  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
551  * @param peer identifies the peer
552  * @param amount set to the amount that was actually reserved or unreserved;
553  *               either the full requested amount or zero (no partial reservations)
554  * @param res_delay if the reservation could not be satisfied (amount was 0), how
555  *        long should the client wait until re-trying?
556  */
557 static void
558 ats_reserve_callback (void *cls,
559                       const struct GNUNET_PeerIdentity *peer,
560                       int32_t amount,
561                       struct GNUNET_TIME_Relative res_delay)
562 {
563   struct GSF_ConnectedPeer *cp = cls;
564   struct GSF_PeerTransmitHandle *pth;
565
566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567               "Reserved %d bytes / need to wait %s for reservation\n",
568               (int) amount,
569               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
570   cp->rc = NULL;
571   if (0 == amount)
572   {
573     cp->rc_delay_task =
574         GNUNET_SCHEDULER_add_delayed (res_delay,
575                                       &retry_reservation,
576                                       cp);
577     return;
578   }
579   cp->did_reserve = GNUNET_YES;
580   pth = cp->pth_head;
581   if ( (NULL != pth) &&
582        (NULL == cp->cth) &&
583        (0 == cp->cth_in_progress) )
584   {
585     /* reservation success, try transmission now! */
586     cp->cth_in_progress++;
587     cp->cth =
588         GNUNET_CORE_notify_transmit_ready (GSF_core,
589                                            GNUNET_YES,
590                                            GNUNET_CORE_PRIO_BACKGROUND,
591                                            GNUNET_TIME_absolute_get_remaining (pth->timeout),
592                                            peer,
593                                            pth->size,
594                                            &peer_transmit_ready_cb,
595                                            cp);
596     GNUNET_assert (NULL != cp->cth);
597     GNUNET_assert (0 < cp->cth_in_progress--);
598   }
599 }
600
601
602 /**
603  * Function called by PEERSTORE with peer respect record
604  *
605  * @param cls handle to connected peer entry
606  * @param record peerstore record information
607  * @param emsg error message, or NULL if no errors
608  */
609 static void
610 peer_respect_cb (void *cls,
611                  const struct GNUNET_PEERSTORE_Record *record,
612                  const char *emsg)
613 {
614   struct GSF_ConnectedPeer *cp = cls;
615
616   GNUNET_assert (NULL != cp->respect_iterate_req);
617   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
618     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
619   GSF_push_start_ (cp);
620   if (NULL != cp->creation_cb)
621     cp->creation_cb (cp->creation_cb_cls, cp);
622   if (NULL != record)
623     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
624   cp->respect_iterate_req = NULL;
625 }
626
627
628 /**
629  * A peer connected to us.  Setup the connected peer
630  * records.
631  *
632  * @param peer identity of peer that connected
633  * @param creation_cb callback function when the record is created.
634  * @param creation_cb_cls closure for @creation_cb
635  */
636 void
637 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
638                            GSF_ConnectedPeerCreationCallback creation_cb,
639                            void *creation_cb_cls)
640 {
641   struct GSF_ConnectedPeer *cp;
642
643   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644               "Connected to peer %s\n",
645               GNUNET_i2s (peer));
646   cp = GNUNET_new (struct GSF_ConnectedPeer);
647   cp->ppd.pid = GNUNET_PEER_intern (peer);
648   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
649   cp->rc =
650       GNUNET_ATS_reserve_bandwidth (GSF_ats,
651                                     peer,
652                                     DBLOCK_SIZE,
653                                     &ats_reserve_callback, cp);
654   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
655                                                           GNUNET_YES);
656   GNUNET_break (GNUNET_OK ==
657                 GNUNET_CONTAINER_multipeermap_put (cp_map,
658                GSF_connected_peer_get_identity2_ (cp),
659                                                    cp,
660                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
661   GNUNET_STATISTICS_set (GSF_stats,
662                          gettext_noop ("# peers connected"),
663                          GNUNET_CONTAINER_multipeermap_size (cp_map),
664                          GNUNET_NO);
665   cp->creation_cb = creation_cb;
666   cp->creation_cb_cls = creation_cb_cls;
667   cp->respect_iterate_req =
668       GNUNET_PEERSTORE_iterate (peerstore, "fs",
669                                 peer, "respect",
670                                 GNUNET_TIME_UNIT_FOREVER_REL,
671                                 &peer_respect_cb,
672                                 cp);
673 }
674
675
676 /**
677  * It may be time to re-start migrating content to this
678  * peer.  Check, and if so, restart migration.
679  *
680  * @param cls the `struct GSF_ConnectedPeer`
681  */
682 static void
683 revive_migration (void *cls)
684 {
685   struct GSF_ConnectedPeer *cp = cls;
686   struct GNUNET_TIME_Relative bt;
687
688   cp->mig_revive_task = NULL;
689   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
690   if (0 != bt.rel_value_us)
691   {
692     /* still time left... */
693     cp->mig_revive_task =
694         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
695     return;
696   }
697   GSF_push_start_ (cp);
698 }
699
700
701 /**
702  * Get a handle for a connected peer.
703  *
704  * @param peer peer's identity
705  * @return NULL if the peer is not currently connected
706  */
707 struct GSF_ConnectedPeer *
708 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
709 {
710   if (NULL == cp_map)
711     return NULL;
712   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
713 }
714
715
716 /**
717  * Handle P2P "MIGRATION_STOP" message.
718  *
719  * @param cls closure, always NULL
720  * @param other the other peer involved (sender or receiver, NULL
721  *        for loopback messages where we are both sender and receiver)
722  * @param message the actual message
723  * @return #GNUNET_OK to keep the connection open,
724  *         #GNUNET_SYSERR to close it (signal serious error)
725  */
726 int
727 GSF_handle_p2p_migration_stop_ (void *cls,
728                                 const struct GNUNET_PeerIdentity *other,
729                                 const struct GNUNET_MessageHeader *message)
730 {
731   struct GSF_ConnectedPeer *cp;
732   const struct MigrationStopMessage *msm;
733   struct GNUNET_TIME_Relative bt;
734
735   msm = (const struct MigrationStopMessage *) message;
736   cp = GSF_peer_get_ (other);
737   if (NULL == cp)
738   {
739     GNUNET_break (0);
740     return GNUNET_OK;
741   }
742   GNUNET_STATISTICS_update (GSF_stats,
743                             gettext_noop ("# migration stop messages received"),
744                             1, GNUNET_NO);
745   bt = GNUNET_TIME_relative_ntoh (msm->duration);
746   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
747               _("Migration of content to peer `%s' blocked for %s\n"),
748               GNUNET_i2s (other),
749               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
750   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
751   if ( (NULL == cp->mig_revive_task) &&
752        (NULL == cp->respect_iterate_req) )
753   {
754     GSF_push_stop_ (cp);
755     cp->mig_revive_task =
756         GNUNET_SCHEDULER_add_delayed (bt,
757                                       &revive_migration, cp);
758   }
759   return GNUNET_OK;
760 }
761
762
763 /**
764  * Copy reply and free put message.
765  *
766  * @param cls the `struct PutMessage`
767  * @param buf_size number of bytes available in @a buf
768  * @param buf where to copy the message, NULL on error (peer disconnect)
769  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
770  */
771 static size_t
772 copy_reply (void *cls,
773             size_t buf_size,
774             void *buf)
775 {
776   struct PutMessage *pm = cls;
777   size_t size;
778
779   if (NULL != buf)
780   {
781     GNUNET_assert (buf_size >= ntohs (pm->header.size));
782     size = ntohs (pm->header.size);
783     memcpy (buf, pm, size);
784     GNUNET_STATISTICS_update (GSF_stats,
785                               gettext_noop ("# replies transmitted to other peers"),
786                               1,
787                               GNUNET_NO);
788   }
789   else
790   {
791     size = 0;
792     GNUNET_STATISTICS_update (GSF_stats,
793                               gettext_noop ("# replies dropped"),
794                               1,
795                               GNUNET_NO);
796   }
797   GNUNET_free (pm);
798   return size;
799 }
800
801
802 /**
803  * Free resources associated with the given peer request.
804  *
805  * @param peerreq request to free
806  */
807 static void
808 free_pending_request (struct PeerRequest *peerreq)
809 {
810   struct GSF_ConnectedPeer *cp = peerreq->cp;
811   struct GSF_PendingRequestData *prd;
812
813   prd = GSF_pending_request_get_data_ (peerreq->pr);
814   if (NULL != peerreq->kill_task)
815   {
816     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
817     peerreq->kill_task = NULL;
818   }
819   GNUNET_STATISTICS_update (GSF_stats,
820                             gettext_noop ("# P2P searches active"),
821                             -1,
822                             GNUNET_NO);
823   GNUNET_break (GNUNET_YES ==
824                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
825                                                       &prd->query,
826                                                       peerreq));
827   GNUNET_free (peerreq);
828 }
829
830
831 /**
832  * Cancel all requests associated with the peer.
833  *
834  * @param cls unused
835  * @param query hash code of the request
836  * @param value the `struct GSF_PendingRequest`
837  * @return #GNUNET_YES (continue to iterate)
838  */
839 static int
840 cancel_pending_request (void *cls,
841                         const struct GNUNET_HashCode *query,
842                         void *value)
843 {
844   struct PeerRequest *peerreq = value;
845   struct GSF_PendingRequest *pr = peerreq->pr;
846
847   free_pending_request (peerreq);
848   GSF_pending_request_cancel_ (pr,
849                                GNUNET_NO);
850   return GNUNET_OK;
851 }
852
853
854 /**
855  * Free the given request.
856  *
857  * @param cls the request to free
858  */
859 static void
860 peer_request_destroy (void *cls)
861 {
862   struct PeerRequest *peerreq = cls;
863   struct GSF_PendingRequest *pr = peerreq->pr;
864   struct GSF_PendingRequestData *prd;
865
866   peerreq->kill_task = NULL;
867   prd = GSF_pending_request_get_data_ (pr);
868   cancel_pending_request (NULL,
869                           &prd->query,
870                           peerreq);
871 }
872
873
874 /**
875  * The artificial delay is over, transmit the message now.
876  *
877  * @param cls the `struct GSF_DelayedHandle` with the message
878  */
879 static void
880 transmit_delayed_now (void *cls)
881 {
882   struct GSF_DelayedHandle *dh = cls;
883   struct GSF_ConnectedPeer *cp = dh->cp;
884
885   GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
886                                cp->delayed_tail,
887                                dh);
888   cp->delay_queue_size--;
889   (void) GSF_peer_transmit_ (cp,
890                              GNUNET_NO,
891                              UINT32_MAX,
892                              REPLY_TIMEOUT,
893                              dh->msize,
894                              &copy_reply,
895                              dh->pm);
896   GNUNET_free (dh);
897 }
898
899
900 /**
901  * Get the randomized delay a response should be subjected to.
902  *
903  * @return desired delay
904  */
905 static struct GNUNET_TIME_Relative
906 get_randomized_delay ()
907 {
908   struct GNUNET_TIME_Relative ret;
909
910   ret =
911       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
912                                      GNUNET_CRYPTO_random_u32
913                                      (GNUNET_CRYPTO_QUALITY_WEAK,
914                                       2 * GSF_avg_latency.rel_value_us + 1));
915 #if INSANE_STATISTICS
916   GNUNET_STATISTICS_update (GSF_stats,
917                             gettext_noop
918                             ("# artificial delays introduced (ms)"),
919                             ret.rel_value_us / 1000LL, GNUNET_NO);
920 #endif
921   return ret;
922 }
923
924
925 /**
926  * Handle a reply to a pending request.  Also called if a request
927  * expires (then with data == NULL).  The handler may be called
928  * many times (depending on the request type), but will not be
929  * called during or after a call to GSF_pending_request_cancel
930  * and will also not be called anymore after a call signalling
931  * expiration.
932  *
933  * @param cls `struct PeerRequest` this is an answer for
934  * @param eval evaluation of the result
935  * @param pr handle to the original pending request
936  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
937  * @param expiration when does @a data expire?
938  * @param last_transmission when did we last transmit a request for this block
939  * @param type type of the block
940  * @param data response data, NULL on request expiration
941  * @param data_len number of bytes in @a data
942  */
943 static void
944 handle_p2p_reply (void *cls,
945                   enum GNUNET_BLOCK_EvaluationResult eval,
946                   struct GSF_PendingRequest *pr,
947                   uint32_t reply_anonymity_level,
948                   struct GNUNET_TIME_Absolute expiration,
949                   struct GNUNET_TIME_Absolute last_transmission,
950                   enum GNUNET_BLOCK_Type type,
951                   const void *data,
952                   size_t data_len)
953 {
954   struct PeerRequest *peerreq = cls;
955   struct GSF_ConnectedPeer *cp = peerreq->cp;
956   struct GSF_PendingRequestData *prd;
957   struct PutMessage *pm;
958   size_t msize;
959
960   GNUNET_assert (data_len + sizeof (struct PutMessage) <
961                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
962   GNUNET_assert (peerreq->pr == pr);
963   prd = GSF_pending_request_get_data_ (pr);
964   if (NULL == data)
965   {
966     free_pending_request (peerreq);
967     return;
968   }
969   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
970   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
971   {
972     GNUNET_STATISTICS_update (GSF_stats,
973                               gettext_noop
974                               ("# replies dropped due to type mismatch"),
975                                 1, GNUNET_NO);
976     return;
977   }
978   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979               "Transmitting result for query `%s' to peer\n",
980               GNUNET_h2s (&prd->query));
981   GNUNET_STATISTICS_update (GSF_stats,
982                             gettext_noop ("# replies received for other peers"),
983                             1, GNUNET_NO);
984   msize = sizeof (struct PutMessage) + data_len;
985   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
986   {
987     GNUNET_break (0);
988     return;
989   }
990   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
991   {
992     if (reply_anonymity_level - 1 > GSF_cover_content_count)
993     {
994       GNUNET_STATISTICS_update (GSF_stats,
995                                 gettext_noop
996                                 ("# replies dropped due to insufficient cover traffic"),
997                                 1, GNUNET_NO);
998       return;
999     }
1000     GSF_cover_content_count -= (reply_anonymity_level - 1);
1001   }
1002
1003   pm = GNUNET_malloc (msize);
1004   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1005   pm->header.size = htons (msize);
1006   pm->type = htonl (type);
1007   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
1008   memcpy (&pm[1], data, data_len);
1009   if ( (UINT32_MAX != reply_anonymity_level) &&
1010        (0 != reply_anonymity_level) &&
1011        (GNUNET_YES == GSF_enable_randomized_delays) )
1012   {
1013     struct GSF_DelayedHandle *dh;
1014
1015     dh = GNUNET_new (struct GSF_DelayedHandle);
1016     dh->cp = cp;
1017     dh->pm = pm;
1018     dh->msize = msize;
1019     GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1020                                  cp->delayed_tail,
1021                                  dh);
1022     cp->delay_queue_size++;
1023     dh->delay_task =
1024         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
1025                                       &transmit_delayed_now,
1026                                       dh);
1027   }
1028   else
1029   {
1030     (void) GSF_peer_transmit_ (cp,
1031                                GNUNET_NO,
1032                                UINT32_MAX,
1033                                REPLY_TIMEOUT,
1034                                msize,
1035                                &copy_reply,
1036                                pm);
1037   }
1038   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1039     return;
1040   if (NULL == peerreq->kill_task)
1041   {
1042     GNUNET_STATISTICS_update (GSF_stats,
1043                               gettext_noop
1044                               ("# P2P searches destroyed due to ultimate reply"),
1045                               1,
1046                               GNUNET_NO);
1047     peerreq->kill_task =
1048         GNUNET_SCHEDULER_add_now (&peer_request_destroy,
1049                                   peerreq);
1050   }
1051 }
1052
1053
1054 /**
1055  * Increase the peer's respect by a value.
1056  *
1057  * @param cp which peer to change the respect value on
1058  * @param value is the int value by which the
1059  *  peer's credit is to be increased or decreased
1060  * @returns the actual change in respect (positive or negative)
1061  */
1062 static int
1063 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1064 {
1065   if (0 == value)
1066     return 0;
1067   GNUNET_assert (NULL != cp);
1068   if (value > 0)
1069   {
1070     if (cp->ppd.respect + value < cp->ppd.respect)
1071     {
1072       value = UINT32_MAX - cp->ppd.respect;
1073       cp->ppd.respect = UINT32_MAX;
1074     }
1075     else
1076       cp->ppd.respect += value;
1077   }
1078   else
1079   {
1080     if (cp->ppd.respect < -value)
1081     {
1082       value = -cp->ppd.respect;
1083       cp->ppd.respect = 0;
1084     }
1085     else
1086       cp->ppd.respect += value;
1087   }
1088   return value;
1089 }
1090
1091
1092 /**
1093  * We've received a request with the specified priority.  Bound it
1094  * according to how much we respect the given peer.
1095  *
1096  * @param prio_in requested priority
1097  * @param cp the peer making the request
1098  * @return effective priority
1099  */
1100 static int32_t
1101 bound_priority (uint32_t prio_in,
1102                 struct GSF_ConnectedPeer *cp)
1103 {
1104 #define N ((double)128.0)
1105   uint32_t ret;
1106   double rret;
1107   int ld;
1108
1109   ld = GSF_test_get_load_too_high_ (0);
1110   if (GNUNET_SYSERR == ld)
1111   {
1112 #if INSANE_STATISTICS
1113     GNUNET_STATISTICS_update (GSF_stats,
1114                               gettext_noop
1115                               ("# requests done for free (low load)"), 1,
1116                               GNUNET_NO);
1117 #endif
1118     return 0;                   /* excess resources */
1119   }
1120   if (prio_in > INT32_MAX)
1121     prio_in = INT32_MAX;
1122   ret = -change_peer_respect (cp, -(int) prio_in);
1123   if (ret > 0)
1124   {
1125     if (ret > GSF_current_priorities + N)
1126       rret = GSF_current_priorities + N;
1127     else
1128       rret = ret;
1129     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1130   }
1131   if ((GNUNET_YES == ld) && (ret > 0))
1132   {
1133     /* try with charging */
1134     ld = GSF_test_get_load_too_high_ (ret);
1135   }
1136   if (GNUNET_YES == ld)
1137   {
1138     GNUNET_STATISTICS_update (GSF_stats,
1139                               gettext_noop
1140                               ("# request dropped, priority insufficient"), 1,
1141                               GNUNET_NO);
1142     /* undo charge */
1143     change_peer_respect (cp, (int) ret);
1144     return -1;                  /* not enough resources */
1145   }
1146   else
1147   {
1148     GNUNET_STATISTICS_update (GSF_stats,
1149                               gettext_noop
1150                               ("# requests done for a price (normal load)"), 1,
1151                               GNUNET_NO);
1152   }
1153 #undef N
1154   return ret;
1155 }
1156
1157
1158 /**
1159  * The priority level imposes a bound on the maximum
1160  * value for the ttl that can be requested.
1161  *
1162  * @param ttl_in requested ttl
1163  * @param prio given priority
1164  * @return @a ttl_in if @a ttl_in is below the limit,
1165  *         otherwise the ttl-limit for the given @a prio
1166  */
1167 static int32_t
1168 bound_ttl (int32_t ttl_in,
1169            uint32_t prio)
1170 {
1171   unsigned long long allowed;
1172
1173   if (ttl_in <= 0)
1174     return ttl_in;
1175   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1176   if (ttl_in > allowed)
1177   {
1178     if (allowed >= (1 << 30))
1179       return 1 << 30;
1180     return allowed;
1181   }
1182   return ttl_in;
1183 }
1184
1185
1186 /**
1187  * Closure for #test_exist_cb().
1188  */
1189 struct TestExistClosure
1190 {
1191
1192   /**
1193    * Priority of the incoming request.
1194    */
1195   int32_t priority;
1196
1197   /**
1198    * Relative TTL of the incoming request.
1199    */
1200   int32_t ttl;
1201
1202   /**
1203    * Type of the incoming request.
1204    */
1205   enum GNUNET_BLOCK_Type type;
1206
1207   /**
1208    * Set to #GNUNET_YES if we are done handling the query.
1209    */
1210   int finished;
1211
1212 };
1213
1214
1215 /**
1216  * Test if the query already exists.  If so, merge it, otherwise
1217  * keep `finished` at #GNUNET_NO.
1218  *
1219  * @param cls our `struct TestExistClosure`
1220  * @param hc the key of the query
1221  * @param value the existing `struct PeerRequest`.
1222  * @return #GNUNET_YES to continue to iterate,
1223  *         #GNUNET_NO if we successfully merged
1224  */
1225 static int
1226 test_exist_cb (void *cls,
1227                const struct GNUNET_HashCode *hc,
1228                void *value)
1229 {
1230   struct TestExistClosure *tec = cls;
1231   struct PeerRequest *peerreq = value;
1232   struct GSF_PendingRequest *pr;
1233   struct GSF_PendingRequestData *prd;
1234
1235   pr = peerreq->pr;
1236   prd = GSF_pending_request_get_data_ (pr);
1237   if (prd->type != tec->type)
1238     return GNUNET_YES;
1239   if (prd->ttl.abs_value_us >=
1240       GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1241   {
1242     /* existing request has higher TTL, drop new one! */
1243     prd->priority += tec->priority;
1244     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1245                 "Have existing request with higher TTL, dropping new request.\n");
1246     GNUNET_STATISTICS_update (GSF_stats,
1247                               gettext_noop
1248                               ("# requests dropped due to higher-TTL request"),
1249                               1, GNUNET_NO);
1250     tec->finished = GNUNET_YES;
1251     return GNUNET_NO;
1252   }
1253   /* existing request has lower TTL, drop old one! */
1254   tec->priority += prd->priority;
1255   free_pending_request (peerreq);
1256   GSF_pending_request_cancel_ (pr,
1257                                GNUNET_YES);
1258   return GNUNET_NO;
1259 }
1260
1261
1262 /**
1263  * Handle P2P "QUERY" message.  Creates the pending request entry
1264  * and sets up all of the data structures to that we will
1265  * process replies properly.  Does not initiate forwarding or
1266  * local database lookups.
1267  *
1268  * @param other the other peer involved (sender or receiver, NULL
1269  *        for loopback messages where we are both sender and receiver)
1270  * @param message the actual message
1271  * @return pending request handle, NULL on error
1272  */
1273 struct GSF_PendingRequest *
1274 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1275                        const struct GNUNET_MessageHeader *message)
1276 {
1277   struct PeerRequest *peerreq;
1278   struct GSF_PendingRequest *pr;
1279   struct GSF_ConnectedPeer *cp;
1280   struct GSF_ConnectedPeer *cps;
1281   const struct GNUNET_PeerIdentity *target;
1282   enum GSF_PendingRequestOptions options;
1283   uint16_t msize;
1284   const struct GetMessage *gm;
1285   unsigned int bits;
1286   const struct GNUNET_PeerIdentity *opt;
1287   uint32_t bm;
1288   size_t bfsize;
1289   uint32_t ttl_decrement;
1290   struct TestExistClosure tec;
1291   GNUNET_PEER_Id spid;
1292   const struct GSF_PendingRequestData *prd;
1293
1294   msize = ntohs (message->size);
1295   if (msize < sizeof (struct GetMessage))
1296   {
1297     GNUNET_break_op (0);
1298     return NULL;
1299   }
1300   GNUNET_STATISTICS_update (GSF_stats,
1301                             gettext_noop
1302                             ("# GET requests received (from other peers)"),
1303                             1,
1304                             GNUNET_NO);
1305   gm = (const struct GetMessage *) message;
1306   tec.type = ntohl (gm->type);
1307   bm = ntohl (gm->hash_bitmap);
1308   bits = 0;
1309   while (bm > 0)
1310   {
1311     if (1 == (bm & 1))
1312       bits++;
1313     bm >>= 1;
1314   }
1315   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1316   {
1317     GNUNET_break_op (0);
1318     return NULL;
1319   }
1320   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1321   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1322   /* bfsize must be power of 2, check! */
1323   if (0 != ((bfsize - 1) & bfsize))
1324   {
1325     GNUNET_break_op (0);
1326     return NULL;
1327   }
1328   GSF_cover_query_count++;
1329   bm = ntohl (gm->hash_bitmap);
1330   bits = 0;
1331   cps = GSF_peer_get_ (other);
1332   if (NULL == cps)
1333   {
1334     /* peer must have just disconnected */
1335     GNUNET_STATISTICS_update (GSF_stats,
1336                               gettext_noop
1337                               ("# requests dropped due to initiator not being connected"),
1338                               1, GNUNET_NO);
1339     return NULL;
1340   }
1341   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1342     cp = GSF_peer_get_ (&opt[bits++]);
1343   else
1344     cp = cps;
1345   if (NULL == cp)
1346   {
1347     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1348       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349                   "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1350                   GNUNET_i2s (&opt[bits - 1]));
1351
1352     else
1353       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354                   "Failed to find peer `%s' in connection set. Dropping query.\n",
1355                   GNUNET_i2s (other));
1356     GNUNET_STATISTICS_update (GSF_stats,
1357                               gettext_noop
1358                               ("# requests dropped due to missing reverse route"),
1359                               1,
1360                               GNUNET_NO);
1361     return NULL;
1362   }
1363   if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
1364   {
1365     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366                 "Peer `%s' has too many replies queued already. Dropping query.\n",
1367                 GNUNET_i2s (other));
1368     GNUNET_STATISTICS_update (GSF_stats,
1369                               gettext_noop ("# requests dropped due to full reply queue"),
1370                               1,
1371                               GNUNET_NO);
1372     return NULL;
1373   }
1374   /* note that we can really only check load here since otherwise
1375    * peers could find out that we are overloaded by not being
1376    * disconnected after sending us a malformed query... */
1377   tec.priority = bound_priority (ntohl (gm->priority),
1378                                  cps);
1379   if (tec.priority < 0)
1380   {
1381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1382                 "Dropping query from `%s', this peer is too busy.\n",
1383                 GNUNET_i2s (other));
1384     return NULL;
1385   }
1386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387               "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1388               GNUNET_h2s (&gm->query),
1389               (unsigned int) tec.type,
1390               GNUNET_i2s (other),
1391               (unsigned int) bm);
1392   target =
1393       (0 !=
1394        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1395   options = GSF_PRO_DEFAULTS;
1396   spid = 0;
1397   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + tec.priority))
1398       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1399           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1400           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1401   {
1402     /* don't have BW to send to peer, or would likely take longer than we have for it,
1403      * so at best indirect the query */
1404     tec.priority = 0;
1405     options |= GSF_PRO_FORWARD_ONLY;
1406     spid = GNUNET_PEER_intern (other);
1407     GNUNET_assert (0 != spid);
1408   }
1409   tec.ttl = bound_ttl (ntohl (gm->ttl),
1410                        tec.priority);
1411   /* decrement ttl (always) */
1412   ttl_decrement =
1413       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1414                                                     TTL_DECREMENT);
1415   if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1416   {
1417     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1419                 GNUNET_i2s (other),
1420                 tec.ttl,
1421                 ttl_decrement);
1422     GNUNET_STATISTICS_update (GSF_stats,
1423                               gettext_noop
1424                               ("# requests dropped due TTL underflow"), 1,
1425                               GNUNET_NO);
1426     /* integer underflow => drop (should be very rare)! */
1427     return NULL;
1428   }
1429   tec.ttl -= ttl_decrement;
1430
1431   /* test if the request already exists */
1432   tec.finished = GNUNET_NO;
1433   GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1434                                               &gm->query,
1435                                               &test_exist_cb,
1436                                               &tec);
1437   if (GNUNET_YES == tec.finished)
1438     return NULL; /* merged into existing request, we're done */
1439
1440   peerreq = GNUNET_new (struct PeerRequest);
1441   peerreq->cp = cp;
1442   pr = GSF_pending_request_create_ (options,
1443                                     tec.type,
1444                                     &gm->query,
1445                                     target,
1446                                     (bfsize > 0)
1447                                     ? (const char *) &opt[bits]
1448                                     : NULL,
1449                                     bfsize,
1450                                     ntohl (gm->filter_mutator),
1451                                     1 /* anonymity */,
1452                                     (uint32_t) tec.priority,
1453                                     tec.ttl,
1454                                     spid,
1455                                     GNUNET_PEER_intern (other),
1456                                     NULL, 0,        /* replies_seen */
1457                                     &handle_p2p_reply,
1458                                     peerreq);
1459   GNUNET_assert (NULL != pr);
1460   prd = GSF_pending_request_get_data_ (pr);
1461   peerreq->pr = pr;
1462   GNUNET_break (GNUNET_OK ==
1463                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1464                                                    &prd->query,
1465                                                    peerreq,
1466                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1467   GNUNET_STATISTICS_update (GSF_stats,
1468                             gettext_noop ("# P2P query messages received and processed"),
1469                             1,
1470                             GNUNET_NO);
1471   GNUNET_STATISTICS_update (GSF_stats,
1472                             gettext_noop ("# P2P searches active"),
1473                             1,
1474                             GNUNET_NO);
1475   return pr;
1476 }
1477
1478
1479 /**
1480  * Function called if there has been a timeout trying to satisfy
1481  * a transmission request.
1482  *
1483  * @param cls the `struct GSF_PeerTransmitHandle` of the request
1484  */
1485 static void
1486 peer_transmit_timeout (void *cls)
1487 {
1488   struct GSF_PeerTransmitHandle *pth = cls;
1489   struct GSF_ConnectedPeer *cp;
1490
1491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492               "Timeout trying to transmit to other peer\n");
1493   pth->timeout_task = NULL;
1494   cp = pth->cp;
1495   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1496                                cp->pth_tail,
1497                                pth);
1498   if (GNUNET_YES == pth->is_query)
1499     GNUNET_assert (0 < cp->ppd.pending_queries--);
1500   else if (GNUNET_NO == pth->is_query)
1501     GNUNET_assert (0 < cp->ppd.pending_replies--);
1502   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1503                       UINT64_MAX);
1504   if (NULL != cp->cth)
1505   {
1506     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1507     cp->cth = NULL;
1508   }
1509   pth->gmc (pth->gmc_cls, 0, NULL);
1510   GNUNET_assert (0 == cp->cth_in_progress);
1511   GNUNET_free (pth);
1512 }
1513
1514
1515 /**
1516  * Transmit a message to the given peer as soon as possible.
1517  * If the peer disconnects before the transmission can happen,
1518  * the callback is invoked with a `NULL` @a buffer.
1519  *
1520  * @param cp target peer
1521  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1522  * @param priority how important is this request?
1523  * @param timeout when does this request timeout (call gmc with error)
1524  * @param size number of bytes we would like to send to the peer
1525  * @param gmc function to call to get the message
1526  * @param gmc_cls closure for @a gmc
1527  * @return handle to cancel request
1528  */
1529 struct GSF_PeerTransmitHandle *
1530 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1531                     int is_query,
1532                     uint32_t priority,
1533                     struct GNUNET_TIME_Relative timeout,
1534                     size_t size,
1535                     GSF_GetMessageCallback gmc, void *gmc_cls)
1536 {
1537   struct GSF_PeerTransmitHandle *pth;
1538   struct GSF_PeerTransmitHandle *pos;
1539   struct GSF_PeerTransmitHandle *prev;
1540
1541   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1542   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1543   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1544   pth->gmc = gmc;
1545   pth->gmc_cls = gmc_cls;
1546   pth->size = size;
1547   pth->is_query = is_query;
1548   pth->priority = priority;
1549   pth->cp = cp;
1550   /* insertion sort (by priority, descending) */
1551   prev = NULL;
1552   pos = cp->pth_head;
1553   while ((NULL != pos) && (pos->priority > priority))
1554   {
1555     prev = pos;
1556     pos = pos->next;
1557   }
1558   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1559                                      cp->pth_tail,
1560                                      prev,
1561                                      pth);
1562   if (GNUNET_YES == is_query)
1563     cp->ppd.pending_queries++;
1564   else if (GNUNET_NO == is_query)
1565     cp->ppd.pending_replies++;
1566   pth->timeout_task
1567     = GNUNET_SCHEDULER_add_delayed (timeout,
1568                                     &peer_transmit_timeout,
1569                                     pth);
1570   schedule_transmission (pth);
1571   return pth;
1572 }
1573
1574
1575 /**
1576  * Cancel an earlier request for transmission.
1577  *
1578  * @param pth request to cancel
1579  */
1580 void
1581 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1582 {
1583   struct GSF_ConnectedPeer *cp;
1584
1585   if (NULL != pth->timeout_task)
1586   {
1587     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1588     pth->timeout_task = NULL;
1589   }
1590   cp = pth->cp;
1591   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1592                                cp->pth_tail,
1593                                pth);
1594   if (GNUNET_YES == pth->is_query)
1595     GNUNET_assert (0 < cp->ppd.pending_queries--);
1596   else if (GNUNET_NO == pth->is_query)
1597     GNUNET_assert (0 < cp->ppd.pending_replies--);
1598   GNUNET_free (pth);
1599 }
1600
1601
1602 /**
1603  * Report on receiving a reply; update the performance record of the given peer.
1604  *
1605  * @param cp responding peer (will be updated)
1606  * @param request_time time at which the original query was transmitted
1607  * @param request_priority priority of the original request
1608  */
1609 void
1610 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1611                               struct GNUNET_TIME_Absolute request_time,
1612                               uint32_t request_priority)
1613 {
1614   struct GNUNET_TIME_Relative delay;
1615
1616   delay = GNUNET_TIME_absolute_get_duration (request_time);
1617   cp->ppd.avg_reply_delay.rel_value_us =
1618       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1619        delay.rel_value_us) / RUNAVG_DELAY_N;
1620   cp->ppd.avg_priority =
1621       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1622        request_priority) / RUNAVG_DELAY_N;
1623 }
1624
1625
1626 /**
1627  * Report on receiving a reply in response to an initiating client.
1628  * Remember that this peer is good for this client.
1629  *
1630  * @param cp responding peer (will be updated)
1631  * @param initiator_client local client on responsible for query
1632  */
1633 void
1634 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1635                                    struct GSF_LocalClient *initiator_client)
1636 {
1637   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1638                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1639 }
1640
1641
1642 /**
1643  * Report on receiving a reply in response to an initiating peer.
1644  * Remember that this peer is good for this initiating peer.
1645  *
1646  * @param cp responding peer (will be updated)
1647  * @param initiator_peer other peer responsible for query
1648  */
1649 void
1650 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1651                                  const struct GSF_ConnectedPeer *initiator_peer)
1652 {
1653   unsigned int woff;
1654
1655   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1656   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1657   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1658   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1659   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1660 }
1661
1662
1663 /**
1664  * Write peer-respect information to a file - flush the buffer entry!
1665  *
1666  * @param cls unused
1667  * @param key peer identity
1668  * @param value the `struct GSF_ConnectedPeer` to flush
1669  * @return #GNUNET_OK to continue iteration
1670  */
1671 static int
1672 flush_respect (void *cls,
1673                const struct GNUNET_PeerIdentity *key,
1674                void *value)
1675 {
1676   struct GSF_ConnectedPeer *cp = value;
1677   struct GNUNET_PeerIdentity pid;
1678
1679   if (cp->ppd.respect == cp->disk_respect)
1680     return GNUNET_OK;           /* unchanged */
1681   GNUNET_assert (0 != cp->ppd.pid);
1682   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1683   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1684                           sizeof (cp->ppd.respect),
1685                           GNUNET_TIME_UNIT_FOREVER_ABS,
1686                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1687   return GNUNET_OK;
1688 }
1689
1690
1691 /**
1692  * A peer disconnected from us.  Tear down the connected peer
1693  * record.
1694  *
1695  * @param cls unused
1696  * @param peer identity of peer that connected
1697  */
1698 void
1699 GSF_peer_disconnect_handler_ (void *cls,
1700                               const struct GNUNET_PeerIdentity *peer)
1701 {
1702   struct GSF_ConnectedPeer *cp;
1703   struct GSF_PeerTransmitHandle *pth;
1704   struct GSF_DelayedHandle *dh;
1705
1706   cp = GSF_peer_get_ (peer);
1707   if (NULL == cp)
1708     return;                     /* must have been disconnect from core with
1709                                  * 'peer' == my_id, ignore */
1710   flush_respect (NULL, peer, cp);
1711   GNUNET_assert (GNUNET_YES ==
1712                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1713                                                        peer,
1714                                                        cp));
1715   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1716                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1717                          GNUNET_NO);
1718   if (NULL != cp->respect_iterate_req)
1719   {
1720     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1721     cp->respect_iterate_req = NULL;
1722   }
1723   if (NULL != cp->migration_pth)
1724   {
1725     GSF_peer_transmit_cancel_ (cp->migration_pth);
1726     cp->migration_pth = NULL;
1727   }
1728   if (NULL != cp->rc)
1729   {
1730     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1731     cp->rc = NULL;
1732   }
1733   if (NULL != cp->rc_delay_task)
1734   {
1735     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1736     cp->rc_delay_task = NULL;
1737   }
1738   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1739                                          &cancel_pending_request,
1740                                          cp);
1741   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1742   cp->request_map = NULL;
1743   GSF_plan_notify_peer_disconnect_ (cp);
1744   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1745   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1746                              P2P_SUCCESS_LIST_SIZE);
1747   memset (cp->ppd.last_p2p_replies,
1748           0,
1749           sizeof (cp->ppd.last_p2p_replies));
1750   GSF_push_stop_ (cp);
1751   if (NULL != cp->cth)
1752   {
1753     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1754     cp->cth = NULL;
1755   }
1756   GNUNET_assert (0 == cp->cth_in_progress);
1757   while (NULL != (pth = cp->pth_head))
1758   {
1759     if (pth->timeout_task != NULL)
1760     {
1761       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1762       pth->timeout_task = NULL;
1763     }
1764     GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1765                                  cp->pth_tail,
1766                                  pth);
1767     if (GNUNET_YES == pth->is_query)
1768       GNUNET_assert (0 < cp->ppd.pending_queries--);
1769     else if (GNUNET_NO == pth->is_query)
1770       GNUNET_assert (0 < cp->ppd.pending_replies--);
1771     pth->gmc (pth->gmc_cls, 0, NULL);
1772     GNUNET_free (pth);
1773   }
1774   while (NULL != (dh = cp->delayed_head))
1775   {
1776     GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1777                                  cp->delayed_tail,
1778                                  dh);
1779     cp->delay_queue_size--;
1780     GNUNET_SCHEDULER_cancel (dh->delay_task);
1781     GNUNET_free (dh->pm);
1782     GNUNET_free (dh);
1783   }
1784   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1785   if (NULL != cp->mig_revive_task)
1786   {
1787     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1788     cp->mig_revive_task = NULL;
1789   }
1790   GNUNET_break (0 == cp->ppd.pending_queries);
1791   GNUNET_break (0 == cp->ppd.pending_replies);
1792   GNUNET_free (cp);
1793 }
1794
1795
1796 /**
1797  * Closure for #call_iterator().
1798  */
1799 struct IterationContext
1800 {
1801   /**
1802    * Function to call on each entry.
1803    */
1804   GSF_ConnectedPeerIterator it;
1805
1806   /**
1807    * Closure for @e it.
1808    */
1809   void *it_cls;
1810 };
1811
1812
1813 /**
1814  * Function that calls the callback for each peer.
1815  *
1816  * @param cls the `struct IterationContext *`
1817  * @param key identity of the peer
1818  * @param value the `struct GSF_ConnectedPeer *`
1819  * @return #GNUNET_YES to continue iteration
1820  */
1821 static int
1822 call_iterator (void *cls,
1823                const struct GNUNET_PeerIdentity *key,
1824                void *value)
1825 {
1826   struct IterationContext *ic = cls;
1827   struct GSF_ConnectedPeer *cp = value;
1828
1829   ic->it (ic->it_cls,
1830           key, cp,
1831           &cp->ppd);
1832   return GNUNET_YES;
1833 }
1834
1835
1836 /**
1837  * Iterate over all connected peers.
1838  *
1839  * @param it function to call for each peer
1840  * @param it_cls closure for @a it
1841  */
1842 void
1843 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1844                               void *it_cls)
1845 {
1846   struct IterationContext ic;
1847
1848   ic.it = it;
1849   ic.it_cls = it_cls;
1850   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1851                                          &call_iterator,
1852                                          &ic);
1853 }
1854
1855
1856 /**
1857  * Obtain the identity of a connected peer.
1858  *
1859  * @param cp peer to get identity of
1860  * @param id identity to set (written to)
1861  */
1862 void
1863 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1864                                   struct GNUNET_PeerIdentity *id)
1865 {
1866   GNUNET_assert (0 != cp->ppd.pid);
1867   GNUNET_PEER_resolve (cp->ppd.pid, id);
1868 }
1869
1870
1871 /**
1872  * Obtain the identity of a connected peer.
1873  *
1874  * @param cp peer to get identity of
1875  * @return reference to peer identity, valid until peer disconnects (!)
1876  */
1877 const struct GNUNET_PeerIdentity *
1878 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1879 {
1880   GNUNET_assert (0 != cp->ppd.pid);
1881   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1882 }
1883
1884
1885 /**
1886  * Assemble a migration stop message for transmission.
1887  *
1888  * @param cls the `struct GSF_ConnectedPeer` to use
1889  * @param size number of bytes we're allowed to write to @a buf
1890  * @param buf where to copy the message
1891  * @return number of bytes copied to @a buf
1892  */
1893 static size_t
1894 create_migration_stop_message (void *cls,
1895                                size_t size,
1896                                void *buf)
1897 {
1898   struct GSF_ConnectedPeer *cp = cls;
1899   struct MigrationStopMessage msm;
1900
1901   cp->migration_pth = NULL;
1902   if (NULL == buf)
1903     return 0;
1904   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1905   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1906   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1907   msm.reserved = htonl (0);
1908   msm.duration =
1909       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1910                                  (cp->last_migration_block));
1911   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1912   GNUNET_STATISTICS_update (GSF_stats,
1913                             gettext_noop ("# migration stop messages sent"),
1914                             1, GNUNET_NO);
1915   return sizeof (struct MigrationStopMessage);
1916 }
1917
1918
1919 /**
1920  * Ask a peer to stop migrating data to us until the given point
1921  * in time.
1922  *
1923  * @param cp peer to ask
1924  * @param block_time until when to block
1925  */
1926 void
1927 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1928                            struct GNUNET_TIME_Absolute block_time)
1929 {
1930   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1931   {
1932     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1933                 "Migration already blocked for another %s\n",
1934                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1935                                                         (cp->last_migration_block), GNUNET_YES));
1936     return;                     /* already blocked */
1937   }
1938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1939               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1940                                                       GNUNET_YES));
1941   cp->last_migration_block = block_time;
1942   if (NULL != cp->migration_pth)
1943     GSF_peer_transmit_cancel_ (cp->migration_pth);
1944   cp->migration_pth =
1945       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1946                           GNUNET_TIME_UNIT_FOREVER_REL,
1947                           sizeof (struct MigrationStopMessage),
1948                           &create_migration_stop_message, cp);
1949 }
1950
1951
1952 /**
1953  * Notify core about a preference we have for the given peer
1954  * (to allocate more resources towards it).  The change will
1955  * be communicated the next time we reserve bandwidth with
1956  * core (not instantly).
1957  *
1958  * @param cp peer to reserve bandwidth from
1959  * @param pref preference change
1960  */
1961 void
1962 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1963                                        uint64_t pref)
1964 {
1965   cp->inc_preference += pref;
1966 }
1967
1968
1969 /**
1970  * Call this method periodically to flush respect information to disk.
1971  *
1972  * @param cls closure, not used
1973  */
1974 static void
1975 cron_flush_respect (void *cls)
1976 {
1977   fr_task = NULL;
1978   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1979                                          &flush_respect,
1980                                          NULL);
1981   fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1982                                                         GNUNET_SCHEDULER_PRIORITY_HIGH,
1983                                                         &cron_flush_respect, NULL);
1984 }
1985
1986
1987 /**
1988  * Initialize peer management subsystem.
1989  */
1990 void
1991 GSF_connected_peer_init_ ()
1992 {
1993   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1994   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1995   fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1996                                                 &cron_flush_respect, NULL);
1997 }
1998
1999
2000 /**
2001  * Iterator to free peer entries.
2002  *
2003  * @param cls closure, unused
2004  * @param key current key code
2005  * @param value value in the hash map (peer entry)
2006  * @return #GNUNET_YES (we should continue to iterate)
2007  */
2008 static int
2009 clean_peer (void *cls,
2010             const struct GNUNET_PeerIdentity *key,
2011             void *value)
2012 {
2013   GSF_peer_disconnect_handler_ (NULL, key);
2014   return GNUNET_YES;
2015 }
2016
2017
2018 /**
2019  * Shutdown peer management subsystem.
2020  */
2021 void
2022 GSF_connected_peer_done_ ()
2023 {
2024   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2025                                          &flush_respect,
2026                                          NULL);
2027   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2028                                          &clean_peer,
2029                                          NULL);
2030   GNUNET_SCHEDULER_cancel (fr_task);
2031   fr_task = NULL;
2032   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
2033   cp_map = NULL;
2034   GNUNET_PEERSTORE_disconnect (peerstore,
2035                                GNUNET_YES);
2036   
2037 }
2038
2039
2040 /**
2041  * Iterator to remove references to LC entry.
2042  *
2043  * @param cls the `struct GSF_LocalClient *` to look for
2044  * @param key current key code
2045  * @param value value in the hash map (peer entry)
2046  * @return #GNUNET_YES (we should continue to iterate)
2047  */
2048 static int
2049 clean_local_client (void *cls,
2050                     const struct GNUNET_PeerIdentity *key,
2051                     void *value)
2052 {
2053   const struct GSF_LocalClient *lc = cls;
2054   struct GSF_ConnectedPeer *cp = value;
2055   unsigned int i;
2056
2057   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
2058     if (cp->ppd.last_client_replies[i] == lc)
2059       cp->ppd.last_client_replies[i] = NULL;
2060   return GNUNET_YES;
2061 }
2062
2063
2064 /**
2065  * Notification that a local client disconnected.  Clean up all of our
2066  * references to the given handle.
2067  *
2068  * @param lc handle to the local client (henceforth invalid)
2069  */
2070 void
2071 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
2072 {
2073   if (NULL == cp_map)
2074     return;                     /* already cleaned up */
2075   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
2076                                          (void *) lc);
2077 }
2078
2079
2080 /* end of gnunet-service-fs_cp.c */