-doxygen and indentation
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file fs/gnunet-service-fs_cp.c
22  * @brief API to handle 'connected peers'
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33 #include "gnunet_peerstore_service.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   struct GNUNET_SCHEDULER_Task *timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for @e gmc.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   struct GNUNET_SCHEDULER_Task *delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request.
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Handle to specific peer.
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   struct GNUNET_SCHEDULER_Task *kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   struct GNUNET_SCHEDULER_Task *mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   struct GNUNET_SCHEDULER_Task *rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Number of entries in @e delayed_head DLL.
279    */
280   unsigned int delay_queue_size;
281
282   /**
283    * Respect rating for this peer on disk.
284    */
285   uint32_t disk_respect;
286
287   /**
288    * Which offset in @e last_p2p_replies will be updated next?
289    * (we go round-robin).
290    */
291   unsigned int last_p2p_replies_woff;
292
293   /**
294    * Which offset in @e last_client_replies will be updated next?
295    * (we go round-robin).
296    */
297   unsigned int last_client_replies_woff;
298
299   /**
300    * Current offset into @e last_request_times ring buffer.
301    */
302   unsigned int last_request_times_off;
303
304   /**
305    * #GNUNET_YES if we did successfully reserve 32k bandwidth,
306    * #GNUNET_NO if not.
307    */
308   int did_reserve;
309
310   /**
311    * Function called when the creation of this record is complete.
312    */
313   GSF_ConnectedPeerCreationCallback creation_cb;
314
315   /**
316    * Closure for @e creation_cb
317    */
318   void *creation_cb_cls;
319
320   /**
321    * Handle to the PEERSTORE iterate request for peer respect value
322    */
323   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
324
325 };
326
327
328 /**
329  * Map from peer identities to `struct GSF_ConnectPeer` entries.
330  */
331 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
332
333 /**
334  * Handle to peerstore service.
335  */
336 static struct GNUNET_PEERSTORE_Handle *peerstore;
337
338
339 /**
340  * Update the latency information kept for the given peer.
341  *
342  * @param id peer record to update
343  * @param latency current latency value
344  */
345 void
346 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
347                           struct GNUNET_TIME_Relative latency)
348 {
349   struct GSF_ConnectedPeer *cp;
350
351   cp = GSF_peer_get_ (id);
352   if (NULL == cp)
353     return; /* we're not yet connected at the core level, ignore */
354   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
355                                  latency);
356 }
357
358
359 /**
360  * Return the performance data record for the given peer
361  *
362  * @param cp peer to query
363  * @return performance data record for the peer
364  */
365 struct GSF_PeerPerformanceData *
366 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
367 {
368   return &cp->ppd;
369 }
370
371
372 /**
373  * Core is ready to transmit to a peer, get the message.
374  *
375  * @param cls the `struct GSF_PeerTransmitHandle` of the message
376  * @param size number of bytes core is willing to take
377  * @param buf where to copy the message
378  * @return number of bytes copied to @a buf
379  */
380 static size_t
381 peer_transmit_ready_cb (void *cls,
382                         size_t size,
383                         void *buf);
384
385
386 /**
387  * Function called by core upon success or failure of our bandwidth reservation request.
388  *
389  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
390  * @param peer identifies the peer
391  * @param amount set to the amount that was actually reserved or unreserved;
392  *               either the full requested amount or zero (no partial reservations)
393  * @param res_delay if the reservation could not be satisfied (amount was 0), how
394  *        long should the client wait until re-trying?
395  */
396 static void
397 ats_reserve_callback (void *cls,
398                       const struct GNUNET_PeerIdentity *peer,
399                       int32_t amount,
400                       struct GNUNET_TIME_Relative res_delay);
401
402
403 /**
404  * If ready (bandwidth reserved), try to schedule transmission via
405  * core for the given handle.
406  *
407  * @param pth transmission handle to schedule
408  */
409 static void
410 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
411 {
412   struct GSF_ConnectedPeer *cp;
413   struct GNUNET_PeerIdentity target;
414
415   cp = pth->cp;
416   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
417     return;                     /* already done */
418   GNUNET_assert (0 != cp->ppd.pid);
419   GNUNET_PEER_resolve (cp->ppd.pid, &target);
420
421   if (0 != cp->inc_preference)
422   {
423     GNUNET_ATS_performance_change_preference (GSF_ats,
424                                               &target,
425                                               GNUNET_ATS_PREFERENCE_BANDWIDTH,
426                                               (double) cp->inc_preference,
427                                               GNUNET_ATS_PREFERENCE_END);
428     cp->inc_preference = 0;
429   }
430
431   if ( (GNUNET_YES == pth->is_query) &&
432        (GNUNET_YES != pth->was_reserved) )
433   {
434     /* query, need reservation */
435     if (GNUNET_YES != cp->did_reserve)
436       return;                   /* not ready */
437     cp->did_reserve = GNUNET_NO;
438     /* reservation already done! */
439     pth->was_reserved = GNUNET_YES;
440     cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
441                                            &target,
442                                            DBLOCK_SIZE,
443                                            &ats_reserve_callback,
444                                            cp);
445     return;
446   }
447   GNUNET_assert (NULL == cp->cth);
448   cp->cth_in_progress++;
449   cp->cth =
450     GNUNET_CORE_notify_transmit_ready (GSF_core,
451                                        GNUNET_YES,
452                                        GNUNET_CORE_PRIO_BACKGROUND,
453                                        GNUNET_TIME_absolute_get_remaining (pth->timeout),
454                                        &target,
455                                        pth->size,
456                                        &peer_transmit_ready_cb, cp);
457   GNUNET_assert (NULL != cp->cth);
458   GNUNET_assert (0 < cp->cth_in_progress--);
459 }
460
461
462 /**
463  * Core is ready to transmit to a peer, get the message.
464  *
465  * @param cls the `struct GSF_PeerTransmitHandle` of the message
466  * @param size number of bytes core is willing to take
467  * @param buf where to copy the message
468  * @return number of bytes copied to @a buf
469  */
470 static size_t
471 peer_transmit_ready_cb (void *cls,
472                         size_t size,
473                         void *buf)
474 {
475   struct GSF_ConnectedPeer *cp = cls;
476   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
477   struct GSF_PeerTransmitHandle *pos;
478   size_t ret;
479
480   cp->cth = NULL;
481   if (NULL == pth)
482     return 0;
483   if (pth->size > size)
484   {
485     schedule_transmission (pth);
486     return 0;
487   }
488   if (NULL != pth->timeout_task)
489   {
490     GNUNET_SCHEDULER_cancel (pth->timeout_task);
491     pth->timeout_task = NULL;
492   }
493   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
494                                cp->pth_tail,
495                                pth);
496   if (GNUNET_YES == pth->is_query)
497   {
498     cp->ppd.last_request_times[(cp->last_request_times_off++) %
499                                MAX_QUEUE_PER_PEER] =
500       GNUNET_TIME_absolute_get ();
501     GNUNET_assert (0 < cp->ppd.pending_queries--);
502   }
503   else if (GNUNET_NO == pth->is_query)
504   {
505     GNUNET_assert (0 < cp->ppd.pending_replies--);
506   }
507   GNUNET_LOAD_update (cp->ppd.transmission_delay,
508                       GNUNET_TIME_absolute_get_duration
509                       (pth->transmission_request_start_time).rel_value_us);
510   ret = pth->gmc (pth->gmc_cls, size, buf);
511   if (NULL != (pos = cp->pth_head))
512   {
513     GNUNET_assert (pos != pth);
514     schedule_transmission (pos);
515   }
516   GNUNET_free (pth);
517   return ret;
518 }
519
520
521 /**
522  * (re)try to reserve bandwidth from the given peer.
523  *
524  * @param cls the `struct GSF_ConnectedPeer` to reserve from
525  * @param tc scheduler context
526  */
527 static void
528 retry_reservation (void *cls,
529                    const struct GNUNET_SCHEDULER_TaskContext *tc)
530 {
531   struct GSF_ConnectedPeer *cp = cls;
532   struct GNUNET_PeerIdentity target;
533
534   GNUNET_PEER_resolve (cp->ppd.pid, &target);
535   cp->rc_delay_task = NULL;
536   cp->rc =
537     GNUNET_ATS_reserve_bandwidth (GSF_ats,
538                                   &target,
539                                   DBLOCK_SIZE,
540                                   &ats_reserve_callback, cp);
541 }
542
543
544 /**
545  * Function called by core upon success or failure of our bandwidth reservation request.
546  *
547  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
548  * @param peer identifies the peer
549  * @param amount set to the amount that was actually reserved or unreserved;
550  *               either the full requested amount or zero (no partial reservations)
551  * @param res_delay if the reservation could not be satisfied (amount was 0), how
552  *        long should the client wait until re-trying?
553  */
554 static void
555 ats_reserve_callback (void *cls,
556                       const struct GNUNET_PeerIdentity *peer,
557                       int32_t amount,
558                       struct GNUNET_TIME_Relative res_delay)
559 {
560   struct GSF_ConnectedPeer *cp = cls;
561   struct GSF_PeerTransmitHandle *pth;
562
563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564               "Reserved %d bytes / need to wait %s for reservation\n",
565               (int) amount,
566               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
567   cp->rc = NULL;
568   if (0 == amount)
569   {
570     cp->rc_delay_task =
571         GNUNET_SCHEDULER_add_delayed (res_delay,
572                                       &retry_reservation,
573                                       cp);
574     return;
575   }
576   cp->did_reserve = GNUNET_YES;
577   pth = cp->pth_head;
578   if ( (NULL != pth) &&
579        (NULL == cp->cth) &&
580        (0 == cp->cth_in_progress) )
581   {
582     /* reservation success, try transmission now! */
583     cp->cth_in_progress++;
584     cp->cth =
585         GNUNET_CORE_notify_transmit_ready (GSF_core,
586                                            GNUNET_YES,
587                                            GNUNET_CORE_PRIO_BACKGROUND,
588                                            GNUNET_TIME_absolute_get_remaining (pth->timeout),
589                                            peer,
590                                            pth->size,
591                                            &peer_transmit_ready_cb,
592                                            cp);
593     GNUNET_assert (NULL != cp->cth);
594     GNUNET_assert (0 < cp->cth_in_progress--);
595   }
596 }
597
598
599 /**
600  * Function called by PEERSTORE with peer respect record
601  *
602  * @param cls handle to connected peer entry
603  * @param record peerstore record information
604  * @param emsg error message, or NULL if no errors
605  * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
606  */
607 static int
608 peer_respect_cb (void *cls,
609                  const struct GNUNET_PEERSTORE_Record *record,
610                  const char *emsg)
611 {
612   struct GSF_ConnectedPeer *cp = cls;
613
614   cp->respect_iterate_req = NULL;
615   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
616     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
617   GSF_push_start_ (cp);
618   if (NULL != cp->creation_cb)
619     cp->creation_cb (cp->creation_cb_cls, cp);
620   return GNUNET_NO;
621 }
622
623
624 /**
625  * A peer connected to us.  Setup the connected peer
626  * records.
627  *
628  * @param peer identity of peer that connected
629  * @param creation_cb callback function when the record is created.
630  * @param creation_cb_cls closure for @creation_cb
631  */
632 void
633 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
634                            GSF_ConnectedPeerCreationCallback creation_cb,
635                            void *creation_cb_cls)
636 {
637   struct GSF_ConnectedPeer *cp;
638
639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640               "Connected to peer %s\n",
641               GNUNET_i2s (peer));
642   cp = GNUNET_new (struct GSF_ConnectedPeer);
643   cp->ppd.pid = GNUNET_PEER_intern (peer);
644   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
645   cp->rc =
646       GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
647                                     &ats_reserve_callback, cp);
648   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
649   GNUNET_break (GNUNET_OK ==
650                 GNUNET_CONTAINER_multipeermap_put (cp_map,
651                GSF_connected_peer_get_identity2_ (cp),
652                                                    cp,
653                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
654   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
655                          GNUNET_CONTAINER_multipeermap_size (cp_map),
656                          GNUNET_NO);
657   cp->creation_cb = creation_cb;
658   cp->creation_cb_cls = creation_cb_cls;
659   cp->respect_iterate_req =
660       GNUNET_PEERSTORE_iterate (peerstore, "fs", peer, "respect",
661                                 GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb,
662                                 cp);
663 }
664
665
666 /**
667  * It may be time to re-start migrating content to this
668  * peer.  Check, and if so, restart migration.
669  *
670  * @param cls the `struct GSF_ConnectedPeer`
671  * @param tc scheduler context
672  */
673 static void
674 revive_migration (void *cls,
675                   const struct GNUNET_SCHEDULER_TaskContext *tc)
676 {
677   struct GSF_ConnectedPeer *cp = cls;
678   struct GNUNET_TIME_Relative bt;
679
680   cp->mig_revive_task = NULL;
681   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
682   if (0 != bt.rel_value_us)
683   {
684     /* still time left... */
685     cp->mig_revive_task =
686         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
687     return;
688   }
689   GSF_push_start_ (cp);
690 }
691
692
693 /**
694  * Get a handle for a connected peer.
695  *
696  * @param peer peer's identity
697  * @return NULL if the peer is not currently connected
698  */
699 struct GSF_ConnectedPeer *
700 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
701 {
702   if (NULL == cp_map)
703     return NULL;
704   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
705 }
706
707
708 /**
709  * Handle P2P "MIGRATION_STOP" message.
710  *
711  * @param cls closure, always NULL
712  * @param other the other peer involved (sender or receiver, NULL
713  *        for loopback messages where we are both sender and receiver)
714  * @param message the actual message
715  * @return #GNUNET_OK to keep the connection open,
716  *         #GNUNET_SYSERR to close it (signal serious error)
717  */
718 int
719 GSF_handle_p2p_migration_stop_ (void *cls,
720                                 const struct GNUNET_PeerIdentity *other,
721                                 const struct GNUNET_MessageHeader *message)
722 {
723   struct GSF_ConnectedPeer *cp;
724   const struct MigrationStopMessage *msm;
725   struct GNUNET_TIME_Relative bt;
726
727   msm = (const struct MigrationStopMessage *) message;
728   cp = GSF_peer_get_ (other);
729   if (NULL == cp)
730   {
731     GNUNET_break (0);
732     return GNUNET_OK;
733   }
734   GNUNET_STATISTICS_update (GSF_stats,
735                             gettext_noop ("# migration stop messages received"),
736                             1, GNUNET_NO);
737   bt = GNUNET_TIME_relative_ntoh (msm->duration);
738   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
739               _("Migration of content to peer `%s' blocked for %s\n"),
740               GNUNET_i2s (other),
741               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
742   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
743   if (NULL == cp->mig_revive_task)
744   {
745     GSF_push_stop_ (cp);
746     cp->mig_revive_task =
747         GNUNET_SCHEDULER_add_delayed (bt,
748                                       &revive_migration, cp);
749   }
750   return GNUNET_OK;
751 }
752
753
754 /**
755  * Copy reply and free put message.
756  *
757  * @param cls the `struct PutMessage`
758  * @param buf_size number of bytes available in @a buf
759  * @param buf where to copy the message, NULL on error (peer disconnect)
760  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
761  */
762 static size_t
763 copy_reply (void *cls,
764             size_t buf_size,
765             void *buf)
766 {
767   struct PutMessage *pm = cls;
768   size_t size;
769
770   if (NULL != buf)
771   {
772     GNUNET_assert (buf_size >= ntohs (pm->header.size));
773     size = ntohs (pm->header.size);
774     memcpy (buf, pm, size);
775     GNUNET_STATISTICS_update (GSF_stats,
776                               gettext_noop ("# replies transmitted to other peers"),
777                               1,
778                               GNUNET_NO);
779   }
780   else
781   {
782     size = 0;
783     GNUNET_STATISTICS_update (GSF_stats,
784                               gettext_noop ("# replies dropped"),
785                               1,
786                               GNUNET_NO);
787   }
788   GNUNET_free (pm);
789   return size;
790 }
791
792
793 /**
794  * Free resources associated with the given peer request.
795  *
796  * @param peerreq request to free
797  * @param query associated key for the request
798  */
799 static void
800 free_pending_request (struct PeerRequest *peerreq,
801                       const struct GNUNET_HashCode *query)
802 {
803   struct GSF_ConnectedPeer *cp = peerreq->cp;
804
805   if (NULL != peerreq->kill_task)
806   {
807     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
808     peerreq->kill_task = NULL;
809   }
810   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
811                             -1, GNUNET_NO);
812   GNUNET_break (GNUNET_YES ==
813                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
814                                                       query, peerreq));
815   GNUNET_free (peerreq);
816 }
817
818
819 /**
820  * Cancel all requests associated with the peer.
821  *
822  * @param cls unused
823  * @param query hash code of the request
824  * @param value the `struct GSF_PendingRequest`
825  * @return #GNUNET_YES (continue to iterate)
826  */
827 static int
828 cancel_pending_request (void *cls,
829                         const struct GNUNET_HashCode *query,
830                         void *value)
831 {
832   struct PeerRequest *peerreq = value;
833   struct GSF_PendingRequest *pr = peerreq->pr;
834   struct GSF_PendingRequestData *prd;
835
836   prd = GSF_pending_request_get_data_ (pr);
837   GSF_pending_request_cancel_ (pr, GNUNET_NO);
838   free_pending_request (peerreq, &prd->query);
839   return GNUNET_OK;
840 }
841
842
843 /**
844  * Free the given request.
845  *
846  * @param cls the request to free
847  * @param tc task context
848  */
849 static void
850 peer_request_destroy (void *cls,
851                       const struct GNUNET_SCHEDULER_TaskContext *tc)
852 {
853   struct PeerRequest *peerreq = cls;
854   struct GSF_PendingRequest *pr = peerreq->pr;
855   struct GSF_PendingRequestData *prd;
856
857   peerreq->kill_task = NULL;
858   prd = GSF_pending_request_get_data_ (pr);
859   cancel_pending_request (NULL, &prd->query, peerreq);
860 }
861
862
863 /**
864  * The artificial delay is over, transmit the message now.
865  *
866  * @param cls the `struct GSF_DelayedHandle` with the message
867  * @param tc scheduler context
868  */
869 static void
870 transmit_delayed_now (void *cls,
871                       const struct GNUNET_SCHEDULER_TaskContext *tc)
872 {
873   struct GSF_DelayedHandle *dh = cls;
874   struct GSF_ConnectedPeer *cp = dh->cp;
875
876   GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
877                                cp->delayed_tail,
878                                dh);
879   cp->delay_queue_size--;
880   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
881   {
882     GNUNET_free (dh->pm);
883     GNUNET_free (dh);
884     return;
885   }
886   (void) GSF_peer_transmit_ (cp,
887                              GNUNET_NO,
888                              UINT32_MAX,
889                              REPLY_TIMEOUT,
890                              dh->msize,
891                              &copy_reply,
892                              dh->pm);
893   GNUNET_free (dh);
894 }
895
896
897 /**
898  * Get the randomized delay a response should be subjected to.
899  *
900  * @return desired delay
901  */
902 static struct GNUNET_TIME_Relative
903 get_randomized_delay ()
904 {
905   struct GNUNET_TIME_Relative ret;
906
907   ret =
908       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
909                                      GNUNET_CRYPTO_random_u32
910                                      (GNUNET_CRYPTO_QUALITY_WEAK,
911                                       2 * GSF_avg_latency.rel_value_us + 1));
912 #if INSANE_STATISTICS
913   GNUNET_STATISTICS_update (GSF_stats,
914                             gettext_noop
915                             ("# artificial delays introduced (ms)"),
916                             ret.rel_value_us / 1000LL, GNUNET_NO);
917 #endif
918   return ret;
919 }
920
921
922 /**
923  * Handle a reply to a pending request.  Also called if a request
924  * expires (then with data == NULL).  The handler may be called
925  * many times (depending on the request type), but will not be
926  * called during or after a call to GSF_pending_request_cancel
927  * and will also not be called anymore after a call signalling
928  * expiration.
929  *
930  * @param cls `struct PeerRequest` this is an answer for
931  * @param eval evaluation of the result
932  * @param pr handle to the original pending request
933  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
934  * @param expiration when does @a data expire?
935  * @param last_transmission when did we last transmit a request for this block
936  * @param type type of the block
937  * @param data response data, NULL on request expiration
938  * @param data_len number of bytes in @a data
939  */
940 static void
941 handle_p2p_reply (void *cls,
942                   enum GNUNET_BLOCK_EvaluationResult eval,
943                   struct GSF_PendingRequest *pr,
944                   uint32_t reply_anonymity_level,
945                   struct GNUNET_TIME_Absolute expiration,
946                   struct GNUNET_TIME_Absolute last_transmission,
947                   enum GNUNET_BLOCK_Type type,
948                   const void *data,
949                   size_t data_len)
950 {
951   struct PeerRequest *peerreq = cls;
952   struct GSF_ConnectedPeer *cp = peerreq->cp;
953   struct GSF_PendingRequestData *prd;
954   struct PutMessage *pm;
955   size_t msize;
956
957   GNUNET_assert (data_len + sizeof (struct PutMessage) <
958                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
959   GNUNET_assert (peerreq->pr == pr);
960   prd = GSF_pending_request_get_data_ (pr);
961   if (NULL == data)
962   {
963     free_pending_request (peerreq, &prd->query);
964     return;
965   }
966   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
967   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
968   {
969     GNUNET_STATISTICS_update (GSF_stats,
970                               gettext_noop
971                               ("# replies dropped due to type mismatch"),
972                                 1, GNUNET_NO);
973     return;
974   }
975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976               "Transmitting result for query `%s' to peer\n",
977               GNUNET_h2s (&prd->query));
978   GNUNET_STATISTICS_update (GSF_stats,
979                             gettext_noop ("# replies received for other peers"),
980                             1, GNUNET_NO);
981   msize = sizeof (struct PutMessage) + data_len;
982   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
983   {
984     GNUNET_break (0);
985     return;
986   }
987   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
988   {
989     if (reply_anonymity_level - 1 > GSF_cover_content_count)
990     {
991       GNUNET_STATISTICS_update (GSF_stats,
992                                 gettext_noop
993                                 ("# replies dropped due to insufficient cover traffic"),
994                                 1, GNUNET_NO);
995       return;
996     }
997     GSF_cover_content_count -= (reply_anonymity_level - 1);
998   }
999
1000   pm = GNUNET_malloc (msize);
1001   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1002   pm->header.size = htons (msize);
1003   pm->type = htonl (type);
1004   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
1005   memcpy (&pm[1], data, data_len);
1006   if ( (UINT32_MAX != reply_anonymity_level) &&
1007        (0 != reply_anonymity_level) &&
1008        (GNUNET_YES == GSF_enable_randomized_delays) )
1009   {
1010     struct GSF_DelayedHandle *dh;
1011
1012     dh = GNUNET_new (struct GSF_DelayedHandle);
1013     dh->cp = cp;
1014     dh->pm = pm;
1015     dh->msize = msize;
1016     GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1017                                  cp->delayed_tail,
1018                                  dh);
1019     cp->delay_queue_size++;
1020     dh->delay_task =
1021         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
1022                                       &transmit_delayed_now,
1023                                       dh);
1024   }
1025   else
1026   {
1027     (void) GSF_peer_transmit_ (cp,
1028                                GNUNET_NO,
1029                                UINT32_MAX,
1030                                REPLY_TIMEOUT,
1031                                msize,
1032                                &copy_reply,
1033                                pm);
1034   }
1035   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1036     return;
1037   if (NULL == peerreq->kill_task)
1038   {
1039     GNUNET_STATISTICS_update (GSF_stats,
1040                               gettext_noop
1041                               ("# P2P searches destroyed due to ultimate reply"),
1042                               1, GNUNET_NO);
1043     peerreq->kill_task =
1044         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
1045   }
1046 }
1047
1048
1049 /**
1050  * Increase the peer's respect by a value.
1051  *
1052  * @param cp which peer to change the respect value on
1053  * @param value is the int value by which the
1054  *  peer's credit is to be increased or decreased
1055  * @returns the actual change in respect (positive or negative)
1056  */
1057 static int
1058 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1059 {
1060   if (0 == value)
1061     return 0;
1062   GNUNET_assert (NULL != cp);
1063   if (value > 0)
1064   {
1065     if (cp->ppd.respect + value < cp->ppd.respect)
1066     {
1067       value = UINT32_MAX - cp->ppd.respect;
1068       cp->ppd.respect = UINT32_MAX;
1069     }
1070     else
1071       cp->ppd.respect += value;
1072   }
1073   else
1074   {
1075     if (cp->ppd.respect < -value)
1076     {
1077       value = -cp->ppd.respect;
1078       cp->ppd.respect = 0;
1079     }
1080     else
1081       cp->ppd.respect += value;
1082   }
1083   return value;
1084 }
1085
1086
1087 /**
1088  * We've received a request with the specified priority.  Bound it
1089  * according to how much we respect the given peer.
1090  *
1091  * @param prio_in requested priority
1092  * @param cp the peer making the request
1093  * @return effective priority
1094  */
1095 static int32_t
1096 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1097 {
1098 #define N ((double)128.0)
1099   uint32_t ret;
1100   double rret;
1101   int ld;
1102
1103   ld = GSF_test_get_load_too_high_ (0);
1104   if (GNUNET_SYSERR == ld)
1105   {
1106 #if INSANE_STATISTICS
1107     GNUNET_STATISTICS_update (GSF_stats,
1108                               gettext_noop
1109                               ("# requests done for free (low load)"), 1,
1110                               GNUNET_NO);
1111 #endif
1112     return 0;                   /* excess resources */
1113   }
1114   if (prio_in > INT32_MAX)
1115     prio_in = INT32_MAX;
1116   ret = -change_peer_respect (cp, -(int) prio_in);
1117   if (ret > 0)
1118   {
1119     if (ret > GSF_current_priorities + N)
1120       rret = GSF_current_priorities + N;
1121     else
1122       rret = ret;
1123     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1124   }
1125   if ((GNUNET_YES == ld) && (ret > 0))
1126   {
1127     /* try with charging */
1128     ld = GSF_test_get_load_too_high_ (ret);
1129   }
1130   if (GNUNET_YES == ld)
1131   {
1132     GNUNET_STATISTICS_update (GSF_stats,
1133                               gettext_noop
1134                               ("# request dropped, priority insufficient"), 1,
1135                               GNUNET_NO);
1136     /* undo charge */
1137     change_peer_respect (cp, (int) ret);
1138     return -1;                  /* not enough resources */
1139   }
1140   else
1141   {
1142     GNUNET_STATISTICS_update (GSF_stats,
1143                               gettext_noop
1144                               ("# requests done for a price (normal load)"), 1,
1145                               GNUNET_NO);
1146   }
1147 #undef N
1148   return ret;
1149 }
1150
1151
1152 /**
1153  * The priority level imposes a bound on the maximum
1154  * value for the ttl that can be requested.
1155  *
1156  * @param ttl_in requested ttl
1157  * @param prio given priority
1158  * @return ttl_in if ttl_in is below the limit,
1159  *         otherwise the ttl-limit for the given priority
1160  */
1161 static int32_t
1162 bound_ttl (int32_t ttl_in, uint32_t prio)
1163 {
1164   unsigned long long allowed;
1165
1166   if (ttl_in <= 0)
1167     return ttl_in;
1168   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1169   if (ttl_in > allowed)
1170   {
1171     if (allowed >= (1 << 30))
1172       return 1 << 30;
1173     return allowed;
1174   }
1175   return ttl_in;
1176 }
1177
1178
1179 /**
1180  * Handle P2P "QUERY" message.  Creates the pending request entry
1181  * and sets up all of the data structures to that we will
1182  * process replies properly.  Does not initiate forwarding or
1183  * local database lookups.
1184  *
1185  * @param other the other peer involved (sender or receiver, NULL
1186  *        for loopback messages where we are both sender and receiver)
1187  * @param message the actual message
1188  * @return pending request handle, NULL on error
1189  */
1190 struct GSF_PendingRequest *
1191 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1192                        const struct GNUNET_MessageHeader *message)
1193 {
1194   struct PeerRequest *peerreq;
1195   struct GSF_PendingRequest *pr;
1196   struct GSF_PendingRequestData *prd;
1197   struct GSF_ConnectedPeer *cp;
1198   struct GSF_ConnectedPeer *cps;
1199   const struct GNUNET_PeerIdentity *target;
1200   enum GSF_PendingRequestOptions options;
1201   uint16_t msize;
1202   const struct GetMessage *gm;
1203   unsigned int bits;
1204   const struct GNUNET_PeerIdentity *opt;
1205   uint32_t bm;
1206   size_t bfsize;
1207   uint32_t ttl_decrement;
1208   int32_t priority;
1209   int32_t ttl;
1210   enum GNUNET_BLOCK_Type type;
1211   GNUNET_PEER_Id spid;
1212
1213   msize = ntohs (message->size);
1214   if (msize < sizeof (struct GetMessage))
1215   {
1216     GNUNET_break_op (0);
1217     return NULL;
1218   }
1219   GNUNET_STATISTICS_update (GSF_stats,
1220                             gettext_noop
1221                             ("# GET requests received (from other peers)"),
1222                             1,
1223                             GNUNET_NO);
1224   gm = (const struct GetMessage *) message;
1225   type = ntohl (gm->type);
1226   bm = ntohl (gm->hash_bitmap);
1227   bits = 0;
1228   while (bm > 0)
1229   {
1230     if (1 == (bm & 1))
1231       bits++;
1232     bm >>= 1;
1233   }
1234   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1235   {
1236     GNUNET_break_op (0);
1237     return NULL;
1238   }
1239   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1240   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1241   /* bfsize must be power of 2, check! */
1242   if (0 != ((bfsize - 1) & bfsize))
1243   {
1244     GNUNET_break_op (0);
1245     return NULL;
1246   }
1247   GSF_cover_query_count++;
1248   bm = ntohl (gm->hash_bitmap);
1249   bits = 0;
1250   cps = GSF_peer_get_ (other);
1251   if (NULL == cps)
1252   {
1253     /* peer must have just disconnected */
1254     GNUNET_STATISTICS_update (GSF_stats,
1255                               gettext_noop
1256                               ("# requests dropped due to initiator not being connected"),
1257                               1, GNUNET_NO);
1258     return NULL;
1259   }
1260   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1261     cp = GSF_peer_get_ (&opt[bits++]);
1262   else
1263     cp = cps;
1264   if (NULL == cp)
1265   {
1266     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1267       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268                   "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1269                   GNUNET_i2s (&opt[bits - 1]));
1270
1271     else
1272       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273                   "Failed to find peer `%s' in connection set. Dropping query.\n",
1274                   GNUNET_i2s (other));
1275     GNUNET_STATISTICS_update (GSF_stats,
1276                               gettext_noop
1277                               ("# requests dropped due to missing reverse route"),
1278                               1,
1279                               GNUNET_NO);
1280     return NULL;
1281   }
1282   if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
1283   {
1284     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1285                 "Peer `%s' has too many replies queued already. Dropping query.\n",
1286                 GNUNET_i2s (other));
1287     GNUNET_STATISTICS_update (GSF_stats,
1288                               gettext_noop ("# requests dropped due to full reply queue"),
1289                               1,
1290                               GNUNET_NO);
1291     return NULL;
1292   }
1293   /* note that we can really only check load here since otherwise
1294    * peers could find out that we are overloaded by not being
1295    * disconnected after sending us a malformed query... */
1296   priority = bound_priority (ntohl (gm->priority),
1297                              cps);
1298   if (priority < 0)
1299   {
1300     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301                 "Dropping query from `%s', this peer is too busy.\n",
1302                 GNUNET_i2s (other));
1303     return NULL;
1304   }
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1307               GNUNET_h2s (&gm->query),
1308               (unsigned int) type,
1309               GNUNET_i2s (other),
1310               (unsigned int) bm);
1311   target =
1312       (0 !=
1313        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1314   options = GSF_PRO_DEFAULTS;
1315   spid = 0;
1316   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1317       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1318           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1319           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1320   {
1321     /* don't have BW to send to peer, or would likely take longer than we have for it,
1322      * so at best indirect the query */
1323     priority = 0;
1324     options |= GSF_PRO_FORWARD_ONLY;
1325     spid = GNUNET_PEER_intern (other);
1326     GNUNET_assert (0 != spid);
1327   }
1328   ttl = bound_ttl (ntohl (gm->ttl), priority);
1329   /* decrement ttl (always) */
1330   ttl_decrement =
1331       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1332                                                     TTL_DECREMENT);
1333   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1334   {
1335     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1336                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1337                 GNUNET_i2s (other), ttl, ttl_decrement);
1338     GNUNET_STATISTICS_update (GSF_stats,
1339                               gettext_noop
1340                               ("# requests dropped due TTL underflow"), 1,
1341                               GNUNET_NO);
1342     /* integer underflow => drop (should be very rare)! */
1343     return NULL;
1344   }
1345   ttl -= ttl_decrement;
1346
1347   /* test if the request already exists */
1348   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
1349                                                &gm->query);
1350   if (NULL != peerreq)
1351   {
1352     pr = peerreq->pr;
1353     prd = GSF_pending_request_get_data_ (pr);
1354     if (prd->type == type)
1355     {
1356       if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
1357       {
1358         /* existing request has higher TTL, drop new one! */
1359         prd->priority += priority;
1360         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1361                     "Have existing request with higher TTL, dropping new request.\n",
1362                     GNUNET_i2s (other));
1363         GNUNET_STATISTICS_update (GSF_stats,
1364                                   gettext_noop
1365                                   ("# requests dropped due to higher-TTL request"),
1366                                   1, GNUNET_NO);
1367         return NULL;
1368       }
1369       /* existing request has lower TTL, drop old one! */
1370       priority += prd->priority;
1371       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1372       free_pending_request (peerreq, &gm->query);
1373     }
1374   }
1375
1376   peerreq = GNUNET_new (struct PeerRequest);
1377   peerreq->cp = cp;
1378   pr = GSF_pending_request_create_ (options,
1379                                     type,
1380                                     &gm->query,
1381                                     target,
1382                                     (bfsize > 0)
1383                                     ? (const char *) &opt[bits]
1384                                     : NULL,
1385                                     bfsize,
1386                                     ntohl (gm->filter_mutator),
1387                                     1 /* anonymity */,
1388                                     (uint32_t) priority,
1389                                     ttl,
1390                                     spid,
1391                                     GNUNET_PEER_intern (other),
1392                                     NULL, 0,        /* replies_seen */
1393                                     &handle_p2p_reply, peerreq);
1394   GNUNET_assert (NULL != pr);
1395   peerreq->pr = pr;
1396   GNUNET_break (GNUNET_OK ==
1397                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1398                                                    peerreq,
1399                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1400   GNUNET_STATISTICS_update (GSF_stats,
1401                             gettext_noop
1402                             ("# P2P query messages received and processed"), 1,
1403                             GNUNET_NO);
1404   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1405                             1, GNUNET_NO);
1406   return pr;
1407 }
1408
1409
1410 /**
1411  * Function called if there has been a timeout trying to satisfy
1412  * a transmission request.
1413  *
1414  * @param cls the `struct GSF_PeerTransmitHandle` of the request
1415  * @param tc scheduler context
1416  */
1417 static void
1418 peer_transmit_timeout (void *cls,
1419                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1420 {
1421   struct GSF_PeerTransmitHandle *pth = cls;
1422   struct GSF_ConnectedPeer *cp;
1423
1424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425               "Timeout trying to transmit to other peer\n");
1426   pth->timeout_task = NULL;
1427   cp = pth->cp;
1428   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1429                                cp->pth_tail,
1430                                pth);
1431   if (GNUNET_YES == pth->is_query)
1432     GNUNET_assert (0 < cp->ppd.pending_queries--);
1433   else if (GNUNET_NO == pth->is_query)
1434     GNUNET_assert (0 < cp->ppd.pending_replies--);
1435   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1436   if (NULL != cp->cth)
1437   {
1438     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1439     cp->cth = NULL;
1440   }
1441   pth->gmc (pth->gmc_cls, 0, NULL);
1442   GNUNET_assert (0 == cp->cth_in_progress);
1443   GNUNET_free (pth);
1444 }
1445
1446
1447 /**
1448  * Transmit a message to the given peer as soon as possible.
1449  * If the peer disconnects before the transmission can happen,
1450  * the callback is invoked with a `NULL` @a buffer.
1451  *
1452  * @param cp target peer
1453  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1454  * @param priority how important is this request?
1455  * @param timeout when does this request timeout (call gmc with error)
1456  * @param size number of bytes we would like to send to the peer
1457  * @param gmc function to call to get the message
1458  * @param gmc_cls closure for @a gmc
1459  * @return handle to cancel request
1460  */
1461 struct GSF_PeerTransmitHandle *
1462 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1463                     int is_query,
1464                     uint32_t priority,
1465                     struct GNUNET_TIME_Relative timeout,
1466                     size_t size,
1467                     GSF_GetMessageCallback gmc, void *gmc_cls)
1468 {
1469   struct GSF_PeerTransmitHandle *pth;
1470   struct GSF_PeerTransmitHandle *pos;
1471   struct GSF_PeerTransmitHandle *prev;
1472
1473   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1474   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1475   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1476   pth->gmc = gmc;
1477   pth->gmc_cls = gmc_cls;
1478   pth->size = size;
1479   pth->is_query = is_query;
1480   pth->priority = priority;
1481   pth->cp = cp;
1482   /* insertion sort (by priority, descending) */
1483   prev = NULL;
1484   pos = cp->pth_head;
1485   while ((NULL != pos) && (pos->priority > priority))
1486   {
1487     prev = pos;
1488     pos = pos->next;
1489   }
1490   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1491                                      cp->pth_tail,
1492                                      prev,
1493                                      pth);
1494   if (GNUNET_YES == is_query)
1495     cp->ppd.pending_queries++;
1496   else if (GNUNET_NO == is_query)
1497     cp->ppd.pending_replies++;
1498   pth->timeout_task
1499     = GNUNET_SCHEDULER_add_delayed (timeout,
1500                                     &peer_transmit_timeout,
1501                                     pth);
1502   schedule_transmission (pth);
1503   return pth;
1504 }
1505
1506
1507 /**
1508  * Cancel an earlier request for transmission.
1509  *
1510  * @param pth request to cancel
1511  */
1512 void
1513 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1514 {
1515   struct GSF_ConnectedPeer *cp;
1516
1517   if (NULL != pth->timeout_task)
1518   {
1519     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1520     pth->timeout_task = NULL;
1521   }
1522   cp = pth->cp;
1523   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1524                                cp->pth_tail,
1525                                pth);
1526   if (GNUNET_YES == pth->is_query)
1527     GNUNET_assert (0 < cp->ppd.pending_queries--);
1528   else if (GNUNET_NO == pth->is_query)
1529     GNUNET_assert (0 < cp->ppd.pending_replies--);
1530   GNUNET_free (pth);
1531 }
1532
1533
1534 /**
1535  * Report on receiving a reply; update the performance record of the given peer.
1536  *
1537  * @param cp responding peer (will be updated)
1538  * @param request_time time at which the original query was transmitted
1539  * @param request_priority priority of the original request
1540  */
1541 void
1542 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1543                               struct GNUNET_TIME_Absolute request_time,
1544                               uint32_t request_priority)
1545 {
1546   struct GNUNET_TIME_Relative delay;
1547
1548   delay = GNUNET_TIME_absolute_get_duration (request_time);
1549   cp->ppd.avg_reply_delay.rel_value_us =
1550       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1551        delay.rel_value_us) / RUNAVG_DELAY_N;
1552   cp->ppd.avg_priority =
1553       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1554        request_priority) / RUNAVG_DELAY_N;
1555 }
1556
1557
1558 /**
1559  * Report on receiving a reply in response to an initiating client.
1560  * Remember that this peer is good for this client.
1561  *
1562  * @param cp responding peer (will be updated)
1563  * @param initiator_client local client on responsible for query
1564  */
1565 void
1566 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1567                                    struct GSF_LocalClient *initiator_client)
1568 {
1569   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1570                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1571 }
1572
1573
1574 /**
1575  * Report on receiving a reply in response to an initiating peer.
1576  * Remember that this peer is good for this initiating peer.
1577  *
1578  * @param cp responding peer (will be updated)
1579  * @param initiator_peer other peer responsible for query
1580  */
1581 void
1582 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1583                                  const struct GSF_ConnectedPeer *initiator_peer)
1584 {
1585   unsigned int woff;
1586
1587   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1588   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1589   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1590   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1591   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1592 }
1593
1594
1595 /**
1596  * Write peer-respect information to a file - flush the buffer entry!
1597  *
1598  * @param cls unused
1599  * @param key peer identity
1600  * @param value the `struct GSF_ConnectedPeer` to flush
1601  * @return #GNUNET_OK to continue iteration
1602  */
1603 static int
1604 flush_respect (void *cls,
1605                const struct GNUNET_PeerIdentity *key,
1606                void *value)
1607 {
1608   struct GSF_ConnectedPeer *cp = value;
1609   struct GNUNET_PeerIdentity pid;
1610
1611   if (cp->ppd.respect == cp->disk_respect)
1612     return GNUNET_OK;           /* unchanged */
1613   GNUNET_assert (0 != cp->ppd.pid);
1614   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1615   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1616                           sizeof (cp->ppd.respect),
1617                           GNUNET_TIME_UNIT_FOREVER_ABS,
1618                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1619   return GNUNET_OK;
1620 }
1621
1622
1623 /**
1624  * A peer disconnected from us.  Tear down the connected peer
1625  * record.
1626  *
1627  * @param cls unused
1628  * @param peer identity of peer that connected
1629  */
1630 void
1631 GSF_peer_disconnect_handler_ (void *cls,
1632                               const struct GNUNET_PeerIdentity *peer)
1633 {
1634   struct GSF_ConnectedPeer *cp;
1635   struct GSF_PeerTransmitHandle *pth;
1636   struct GSF_DelayedHandle *dh;
1637
1638   cp = GSF_peer_get_ (peer);
1639   if (NULL == cp)
1640     return;                     /* must have been disconnect from core with
1641                                  * 'peer' == my_id, ignore */
1642   flush_respect (NULL, peer, cp);
1643   GNUNET_assert (GNUNET_YES ==
1644                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1645                                                        peer,
1646                                                        cp));
1647   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1648                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1649                          GNUNET_NO);
1650   if (NULL != cp->respect_iterate_req)
1651   {
1652     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1653     cp->respect_iterate_req = NULL;
1654   }
1655   if (NULL != cp->migration_pth)
1656   {
1657     GSF_peer_transmit_cancel_ (cp->migration_pth);
1658     cp->migration_pth = NULL;
1659   }
1660   if (NULL != cp->rc)
1661   {
1662     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1663     cp->rc = NULL;
1664   }
1665   if (NULL != cp->rc_delay_task)
1666   {
1667     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1668     cp->rc_delay_task = NULL;
1669   }
1670   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1671                                          &cancel_pending_request, cp);
1672   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1673   cp->request_map = NULL;
1674   GSF_plan_notify_peer_disconnect_ (cp);
1675   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1676   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1677   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1678   GSF_push_stop_ (cp);
1679   if (NULL != cp->cth)
1680   {
1681     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1682     cp->cth = NULL;
1683   }
1684   GNUNET_assert (0 == cp->cth_in_progress);
1685   while (NULL != (pth = cp->pth_head))
1686   {
1687     if (pth->timeout_task != NULL)
1688     {
1689       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1690       pth->timeout_task = NULL;
1691     }
1692     GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1693                                  cp->pth_tail,
1694                                  pth);
1695     if (GNUNET_YES == pth->is_query)
1696       GNUNET_assert (0 < cp->ppd.pending_queries--);
1697     else if (GNUNET_NO == pth->is_query)
1698       GNUNET_assert (0 < cp->ppd.pending_replies--);
1699     pth->gmc (pth->gmc_cls, 0, NULL);
1700     GNUNET_free (pth);
1701   }
1702   while (NULL != (dh = cp->delayed_head))
1703   {
1704     GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1705                                  cp->delayed_tail,
1706                                  dh);
1707     cp->delay_queue_size--;
1708     GNUNET_SCHEDULER_cancel (dh->delay_task);
1709     GNUNET_free (dh->pm);
1710     GNUNET_free (dh);
1711   }
1712   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1713   if (NULL != cp->mig_revive_task)
1714   {
1715     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1716     cp->mig_revive_task = NULL;
1717   }
1718   GNUNET_break (0 == cp->ppd.pending_queries);
1719   GNUNET_break (0 == cp->ppd.pending_replies);
1720   GNUNET_free (cp);
1721 }
1722
1723
1724 /**
1725  * Closure for #call_iterator().
1726  */
1727 struct IterationContext
1728 {
1729   /**
1730    * Function to call on each entry.
1731    */
1732   GSF_ConnectedPeerIterator it;
1733
1734   /**
1735    * Closure for @e it.
1736    */
1737   void *it_cls;
1738 };
1739
1740
1741 /**
1742  * Function that calls the callback for each peer.
1743  *
1744  * @param cls the `struct IterationContext *`
1745  * @param key identity of the peer
1746  * @param value the `struct GSF_ConnectedPeer *`
1747  * @return #GNUNET_YES to continue iteration
1748  */
1749 static int
1750 call_iterator (void *cls,
1751                const struct GNUNET_PeerIdentity *key,
1752                void *value)
1753 {
1754   struct IterationContext *ic = cls;
1755   struct GSF_ConnectedPeer *cp = value;
1756
1757   ic->it (ic->it_cls,
1758           key, cp,
1759           &cp->ppd);
1760   return GNUNET_YES;
1761 }
1762
1763
1764 /**
1765  * Iterate over all connected peers.
1766  *
1767  * @param it function to call for each peer
1768  * @param it_cls closure for @a it
1769  */
1770 void
1771 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1772                               void *it_cls)
1773 {
1774   struct IterationContext ic;
1775
1776   ic.it = it;
1777   ic.it_cls = it_cls;
1778   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1779                                          &call_iterator,
1780                                          &ic);
1781 }
1782
1783
1784 /**
1785  * Obtain the identity of a connected peer.
1786  *
1787  * @param cp peer to get identity of
1788  * @param id identity to set (written to)
1789  */
1790 void
1791 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1792                                   struct GNUNET_PeerIdentity *id)
1793 {
1794   GNUNET_assert (0 != cp->ppd.pid);
1795   GNUNET_PEER_resolve (cp->ppd.pid, id);
1796 }
1797
1798
1799 /**
1800  * Obtain the identity of a connected peer.
1801  *
1802  * @param cp peer to get identity of
1803  * @return reference to peer identity, valid until peer disconnects (!)
1804  */
1805 const struct GNUNET_PeerIdentity *
1806 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1807 {
1808   GNUNET_assert (0 != cp->ppd.pid);
1809   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1810 }
1811
1812
1813 /**
1814  * Assemble a migration stop message for transmission.
1815  *
1816  * @param cls the `struct GSF_ConnectedPeer` to use
1817  * @param size number of bytes we're allowed to write to @a buf
1818  * @param buf where to copy the message
1819  * @return number of bytes copied to @a buf
1820  */
1821 static size_t
1822 create_migration_stop_message (void *cls,
1823                                size_t size,
1824                                void *buf)
1825 {
1826   struct GSF_ConnectedPeer *cp = cls;
1827   struct MigrationStopMessage msm;
1828
1829   cp->migration_pth = NULL;
1830   if (NULL == buf)
1831     return 0;
1832   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1833   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1834   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1835   msm.reserved = htonl (0);
1836   msm.duration =
1837       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1838                                  (cp->last_migration_block));
1839   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1840   GNUNET_STATISTICS_update (GSF_stats,
1841                             gettext_noop ("# migration stop messages sent"),
1842                             1, GNUNET_NO);
1843   return sizeof (struct MigrationStopMessage);
1844 }
1845
1846
1847 /**
1848  * Ask a peer to stop migrating data to us until the given point
1849  * in time.
1850  *
1851  * @param cp peer to ask
1852  * @param block_time until when to block
1853  */
1854 void
1855 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1856                            struct GNUNET_TIME_Absolute block_time)
1857 {
1858   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1859   {
1860     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1861                 "Migration already blocked for another %s\n",
1862                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1863                                                         (cp->last_migration_block), GNUNET_YES));
1864     return;                     /* already blocked */
1865   }
1866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1867               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1868                                                       GNUNET_YES));
1869   cp->last_migration_block = block_time;
1870   if (NULL != cp->migration_pth)
1871     GSF_peer_transmit_cancel_ (cp->migration_pth);
1872   cp->migration_pth =
1873       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1874                           GNUNET_TIME_UNIT_FOREVER_REL,
1875                           sizeof (struct MigrationStopMessage),
1876                           &create_migration_stop_message, cp);
1877 }
1878
1879
1880 /**
1881  * Notify core about a preference we have for the given peer
1882  * (to allocate more resources towards it).  The change will
1883  * be communicated the next time we reserve bandwidth with
1884  * core (not instantly).
1885  *
1886  * @param cp peer to reserve bandwidth from
1887  * @param pref preference change
1888  */
1889 void
1890 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1891                                        uint64_t pref)
1892 {
1893   cp->inc_preference += pref;
1894 }
1895
1896
1897 /**
1898  * Call this method periodically to flush respect information to disk.
1899  *
1900  * @param cls closure, not used
1901  * @param tc task context, not used
1902  */
1903 static void
1904 cron_flush_respect (void *cls,
1905                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1906 {
1907   if (NULL == cp_map)
1908     return;
1909   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1910                                          &flush_respect, NULL);
1911   if (NULL == tc)
1912     return;
1913   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1914     return;
1915   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1916                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1917                                               &cron_flush_respect, NULL);
1918 }
1919
1920
1921 /**
1922  * Initialize peer management subsystem.
1923  */
1924 void
1925 GSF_connected_peer_init_ ()
1926 {
1927   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1928   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1929   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1930                                       &cron_flush_respect, NULL);
1931 }
1932
1933
1934 /**
1935  * Iterator to free peer entries.
1936  *
1937  * @param cls closure, unused
1938  * @param key current key code
1939  * @param value value in the hash map (peer entry)
1940  * @return #GNUNET_YES (we should continue to iterate)
1941  */
1942 static int
1943 clean_peer (void *cls,
1944             const struct GNUNET_PeerIdentity *key,
1945             void *value)
1946 {
1947   GSF_peer_disconnect_handler_ (NULL, key);
1948   return GNUNET_YES;
1949 }
1950
1951
1952 /**
1953  * Shutdown peer management subsystem.
1954  */
1955 void
1956 GSF_connected_peer_done_ ()
1957 {
1958   cron_flush_respect (NULL, NULL);
1959   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1960                                          &clean_peer, NULL);
1961   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1962   cp_map = NULL;
1963   GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
1964 }
1965
1966
1967 /**
1968  * Iterator to remove references to LC entry.
1969  *
1970  * @param cls the `struct GSF_LocalClient *` to look for
1971  * @param key current key code
1972  * @param value value in the hash map (peer entry)
1973  * @return #GNUNET_YES (we should continue to iterate)
1974  */
1975 static int
1976 clean_local_client (void *cls,
1977                     const struct GNUNET_PeerIdentity *key,
1978                     void *value)
1979 {
1980   const struct GSF_LocalClient *lc = cls;
1981   struct GSF_ConnectedPeer *cp = value;
1982   unsigned int i;
1983
1984   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1985     if (cp->ppd.last_client_replies[i] == lc)
1986       cp->ppd.last_client_replies[i] = NULL;
1987   return GNUNET_YES;
1988 }
1989
1990
1991 /**
1992  * Notification that a local client disconnected.  Clean up all of our
1993  * references to the given handle.
1994  *
1995  * @param lc handle to the local client (henceforth invalid)
1996  */
1997 void
1998 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1999 {
2000   if (NULL == cp_map)
2001     return;                     /* already cleaned up */
2002   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
2003                                          (void *) lc);
2004 }
2005
2006
2007 /* end of gnunet-service-fs_cp.c */