-preparations for replacement of try_connect call
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file fs/gnunet-service-fs_cp.c
22  * @brief API to handle 'connected peers'
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33 #include "gnunet_peerstore_service.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   struct GNUNET_SCHEDULER_Task *timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for @e gmc.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   struct GNUNET_SCHEDULER_Task *delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request (generic: from peer or local client).
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Which specific peer issued this request?
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   struct GNUNET_SCHEDULER_Task *kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   struct GNUNET_SCHEDULER_Task *mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   struct GNUNET_SCHEDULER_Task *rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to `struct PeerRequest`.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Number of entries in @e delayed_head DLL.
279    */
280   unsigned int delay_queue_size;
281
282   /**
283    * Respect rating for this peer on disk.
284    */
285   uint32_t disk_respect;
286
287   /**
288    * Which offset in @e last_p2p_replies will be updated next?
289    * (we go round-robin).
290    */
291   unsigned int last_p2p_replies_woff;
292
293   /**
294    * Which offset in @e last_client_replies will be updated next?
295    * (we go round-robin).
296    */
297   unsigned int last_client_replies_woff;
298
299   /**
300    * Current offset into @e last_request_times ring buffer.
301    */
302   unsigned int last_request_times_off;
303
304   /**
305    * #GNUNET_YES if we did successfully reserve 32k bandwidth,
306    * #GNUNET_NO if not.
307    */
308   int did_reserve;
309
310   /**
311    * Function called when the creation of this record is complete.
312    */
313   GSF_ConnectedPeerCreationCallback creation_cb;
314
315   /**
316    * Closure for @e creation_cb
317    */
318   void *creation_cb_cls;
319
320   /**
321    * Handle to the PEERSTORE iterate request for peer respect value
322    */
323   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
324
325 };
326
327
328 /**
329  * Map from peer identities to `struct GSF_ConnectPeer` entries.
330  */
331 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
332
333 /**
334  * Handle to peerstore service.
335  */
336 static struct GNUNET_PEERSTORE_Handle *peerstore;
337
338
339 /**
340  * Update the latency information kept for the given peer.
341  *
342  * @param id peer record to update
343  * @param latency current latency value
344  */
345 void
346 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
347                           struct GNUNET_TIME_Relative latency)
348 {
349   struct GSF_ConnectedPeer *cp;
350
351   cp = GSF_peer_get_ (id);
352   if (NULL == cp)
353     return; /* we're not yet connected at the core level, ignore */
354   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
355                                  latency);
356 }
357
358
359 /**
360  * Return the performance data record for the given peer
361  *
362  * @param cp peer to query
363  * @return performance data record for the peer
364  */
365 struct GSF_PeerPerformanceData *
366 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
367 {
368   return &cp->ppd;
369 }
370
371
372 /**
373  * Core is ready to transmit to a peer, get the message.
374  *
375  * @param cls the `struct GSF_PeerTransmitHandle` of the message
376  * @param size number of bytes core is willing to take
377  * @param buf where to copy the message
378  * @return number of bytes copied to @a buf
379  */
380 static size_t
381 peer_transmit_ready_cb (void *cls,
382                         size_t size,
383                         void *buf);
384
385
386 /**
387  * Function called by core upon success or failure of our bandwidth reservation request.
388  *
389  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
390  * @param peer identifies the peer
391  * @param amount set to the amount that was actually reserved or unreserved;
392  *               either the full requested amount or zero (no partial reservations)
393  * @param res_delay if the reservation could not be satisfied (amount was 0), how
394  *        long should the client wait until re-trying?
395  */
396 static void
397 ats_reserve_callback (void *cls,
398                       const struct GNUNET_PeerIdentity *peer,
399                       int32_t amount,
400                       struct GNUNET_TIME_Relative res_delay);
401
402
403 /**
404  * If ready (bandwidth reserved), try to schedule transmission via
405  * core for the given handle.
406  *
407  * @param pth transmission handle to schedule
408  */
409 static void
410 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
411 {
412   struct GSF_ConnectedPeer *cp;
413   struct GNUNET_PeerIdentity target;
414
415   cp = pth->cp;
416   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
417     return;                     /* already done */
418   GNUNET_assert (0 != cp->ppd.pid);
419   GNUNET_PEER_resolve (cp->ppd.pid, &target);
420
421   if (0 != cp->inc_preference)
422   {
423     GNUNET_ATS_performance_change_preference (GSF_ats,
424                                               &target,
425                                               GNUNET_ATS_PREFERENCE_BANDWIDTH,
426                                               (double) cp->inc_preference,
427                                               GNUNET_ATS_PREFERENCE_END);
428     cp->inc_preference = 0;
429   }
430
431   if ( (GNUNET_YES == pth->is_query) &&
432        (GNUNET_YES != pth->was_reserved) )
433   {
434     /* query, need reservation */
435     if (GNUNET_YES != cp->did_reserve)
436       return;                   /* not ready */
437     cp->did_reserve = GNUNET_NO;
438     /* reservation already done! */
439     pth->was_reserved = GNUNET_YES;
440     cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
441                                            &target,
442                                            DBLOCK_SIZE,
443                                            &ats_reserve_callback,
444                                            cp);
445     return;
446   }
447   GNUNET_assert (NULL == cp->cth);
448   cp->cth_in_progress++;
449   cp->cth =
450     GNUNET_CORE_notify_transmit_ready (GSF_core,
451                                        GNUNET_YES,
452                                        GNUNET_CORE_PRIO_BACKGROUND,
453                                        GNUNET_TIME_absolute_get_remaining (pth->timeout),
454                                        &target,
455                                        pth->size,
456                                        &peer_transmit_ready_cb, cp);
457   GNUNET_assert (NULL != cp->cth);
458   GNUNET_assert (0 < cp->cth_in_progress--);
459 }
460
461
462 /**
463  * Core is ready to transmit to a peer, get the message.
464  *
465  * @param cls the `struct GSF_PeerTransmitHandle` of the message
466  * @param size number of bytes core is willing to take
467  * @param buf where to copy the message
468  * @return number of bytes copied to @a buf
469  */
470 static size_t
471 peer_transmit_ready_cb (void *cls,
472                         size_t size,
473                         void *buf)
474 {
475   struct GSF_ConnectedPeer *cp = cls;
476   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
477   struct GSF_PeerTransmitHandle *pos;
478   size_t ret;
479
480   cp->cth = NULL;
481   if (NULL == pth)
482     return 0;
483   if (pth->size > size)
484   {
485     schedule_transmission (pth);
486     return 0;
487   }
488   if (NULL != pth->timeout_task)
489   {
490     GNUNET_SCHEDULER_cancel (pth->timeout_task);
491     pth->timeout_task = NULL;
492   }
493   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
494                                cp->pth_tail,
495                                pth);
496   if (GNUNET_YES == pth->is_query)
497   {
498     cp->ppd.last_request_times[(cp->last_request_times_off++) %
499                                MAX_QUEUE_PER_PEER] =
500       GNUNET_TIME_absolute_get ();
501     GNUNET_assert (0 < cp->ppd.pending_queries--);
502   }
503   else if (GNUNET_NO == pth->is_query)
504   {
505     GNUNET_assert (0 < cp->ppd.pending_replies--);
506   }
507   GNUNET_LOAD_update (cp->ppd.transmission_delay,
508                       GNUNET_TIME_absolute_get_duration
509                       (pth->transmission_request_start_time).rel_value_us);
510   ret = pth->gmc (pth->gmc_cls, size, buf);
511   if (NULL != (pos = cp->pth_head))
512   {
513     GNUNET_assert (pos != pth);
514     schedule_transmission (pos);
515   }
516   GNUNET_free (pth);
517   return ret;
518 }
519
520
521 /**
522  * (re)try to reserve bandwidth from the given peer.
523  *
524  * @param cls the `struct GSF_ConnectedPeer` to reserve from
525  * @param tc scheduler context
526  */
527 static void
528 retry_reservation (void *cls,
529                    const struct GNUNET_SCHEDULER_TaskContext *tc)
530 {
531   struct GSF_ConnectedPeer *cp = cls;
532   struct GNUNET_PeerIdentity target;
533
534   GNUNET_PEER_resolve (cp->ppd.pid, &target);
535   cp->rc_delay_task = NULL;
536   cp->rc =
537     GNUNET_ATS_reserve_bandwidth (GSF_ats,
538                                   &target,
539                                   DBLOCK_SIZE,
540                                   &ats_reserve_callback, cp);
541 }
542
543
544 /**
545  * Function called by core upon success or failure of our bandwidth reservation request.
546  *
547  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
548  * @param peer identifies the peer
549  * @param amount set to the amount that was actually reserved or unreserved;
550  *               either the full requested amount or zero (no partial reservations)
551  * @param res_delay if the reservation could not be satisfied (amount was 0), how
552  *        long should the client wait until re-trying?
553  */
554 static void
555 ats_reserve_callback (void *cls,
556                       const struct GNUNET_PeerIdentity *peer,
557                       int32_t amount,
558                       struct GNUNET_TIME_Relative res_delay)
559 {
560   struct GSF_ConnectedPeer *cp = cls;
561   struct GSF_PeerTransmitHandle *pth;
562
563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564               "Reserved %d bytes / need to wait %s for reservation\n",
565               (int) amount,
566               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
567   cp->rc = NULL;
568   if (0 == amount)
569   {
570     cp->rc_delay_task =
571         GNUNET_SCHEDULER_add_delayed (res_delay,
572                                       &retry_reservation,
573                                       cp);
574     return;
575   }
576   cp->did_reserve = GNUNET_YES;
577   pth = cp->pth_head;
578   if ( (NULL != pth) &&
579        (NULL == cp->cth) &&
580        (0 == cp->cth_in_progress) )
581   {
582     /* reservation success, try transmission now! */
583     cp->cth_in_progress++;
584     cp->cth =
585         GNUNET_CORE_notify_transmit_ready (GSF_core,
586                                            GNUNET_YES,
587                                            GNUNET_CORE_PRIO_BACKGROUND,
588                                            GNUNET_TIME_absolute_get_remaining (pth->timeout),
589                                            peer,
590                                            pth->size,
591                                            &peer_transmit_ready_cb,
592                                            cp);
593     GNUNET_assert (NULL != cp->cth);
594     GNUNET_assert (0 < cp->cth_in_progress--);
595   }
596 }
597
598
599 /**
600  * Function called by PEERSTORE with peer respect record
601  *
602  * @param cls handle to connected peer entry
603  * @param record peerstore record information
604  * @param emsg error message, or NULL if no errors
605  * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
606  */
607 static int
608 peer_respect_cb (void *cls,
609                  const struct GNUNET_PEERSTORE_Record *record,
610                  const char *emsg)
611 {
612   struct GSF_ConnectedPeer *cp = cls;
613
614   GNUNET_assert (NULL != cp->respect_iterate_req);
615   cp->respect_iterate_req = NULL;
616   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
617     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
618   GSF_push_start_ (cp);
619   if (NULL != cp->creation_cb)
620     cp->creation_cb (cp->creation_cb_cls, cp);
621   return GNUNET_NO;
622 }
623
624
625 /**
626  * A peer connected to us.  Setup the connected peer
627  * records.
628  *
629  * @param peer identity of peer that connected
630  * @param creation_cb callback function when the record is created.
631  * @param creation_cb_cls closure for @creation_cb
632  */
633 void
634 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
635                            GSF_ConnectedPeerCreationCallback creation_cb,
636                            void *creation_cb_cls)
637 {
638   struct GSF_ConnectedPeer *cp;
639
640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
641               "Connected to peer %s\n",
642               GNUNET_i2s (peer));
643   cp = GNUNET_new (struct GSF_ConnectedPeer);
644   cp->ppd.pid = GNUNET_PEER_intern (peer);
645   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
646   cp->rc =
647       GNUNET_ATS_reserve_bandwidth (GSF_ats,
648                                     peer,
649                                     DBLOCK_SIZE,
650                                     &ats_reserve_callback, cp);
651   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
652                                                           GNUNET_YES);
653   GNUNET_break (GNUNET_OK ==
654                 GNUNET_CONTAINER_multipeermap_put (cp_map,
655                GSF_connected_peer_get_identity2_ (cp),
656                                                    cp,
657                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
658   GNUNET_STATISTICS_set (GSF_stats,
659                          gettext_noop ("# peers connected"),
660                          GNUNET_CONTAINER_multipeermap_size (cp_map),
661                          GNUNET_NO);
662   cp->creation_cb = creation_cb;
663   cp->creation_cb_cls = creation_cb_cls;
664   cp->respect_iterate_req =
665       GNUNET_PEERSTORE_iterate (peerstore, "fs",
666                                 peer, "respect",
667                                 GNUNET_TIME_UNIT_FOREVER_REL,
668                                 &peer_respect_cb,
669                                 cp);
670 }
671
672
673 /**
674  * It may be time to re-start migrating content to this
675  * peer.  Check, and if so, restart migration.
676  *
677  * @param cls the `struct GSF_ConnectedPeer`
678  * @param tc scheduler context
679  */
680 static void
681 revive_migration (void *cls,
682                   const struct GNUNET_SCHEDULER_TaskContext *tc)
683 {
684   struct GSF_ConnectedPeer *cp = cls;
685   struct GNUNET_TIME_Relative bt;
686
687   cp->mig_revive_task = NULL;
688   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
689   if (0 != bt.rel_value_us)
690   {
691     /* still time left... */
692     cp->mig_revive_task =
693         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
694     return;
695   }
696   GSF_push_start_ (cp);
697 }
698
699
700 /**
701  * Get a handle for a connected peer.
702  *
703  * @param peer peer's identity
704  * @return NULL if the peer is not currently connected
705  */
706 struct GSF_ConnectedPeer *
707 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
708 {
709   if (NULL == cp_map)
710     return NULL;
711   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
712 }
713
714
715 /**
716  * Handle P2P "MIGRATION_STOP" message.
717  *
718  * @param cls closure, always NULL
719  * @param other the other peer involved (sender or receiver, NULL
720  *        for loopback messages where we are both sender and receiver)
721  * @param message the actual message
722  * @return #GNUNET_OK to keep the connection open,
723  *         #GNUNET_SYSERR to close it (signal serious error)
724  */
725 int
726 GSF_handle_p2p_migration_stop_ (void *cls,
727                                 const struct GNUNET_PeerIdentity *other,
728                                 const struct GNUNET_MessageHeader *message)
729 {
730   struct GSF_ConnectedPeer *cp;
731   const struct MigrationStopMessage *msm;
732   struct GNUNET_TIME_Relative bt;
733
734   msm = (const struct MigrationStopMessage *) message;
735   cp = GSF_peer_get_ (other);
736   if (NULL == cp)
737   {
738     GNUNET_break (0);
739     return GNUNET_OK;
740   }
741   GNUNET_STATISTICS_update (GSF_stats,
742                             gettext_noop ("# migration stop messages received"),
743                             1, GNUNET_NO);
744   bt = GNUNET_TIME_relative_ntoh (msm->duration);
745   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
746               _("Migration of content to peer `%s' blocked for %s\n"),
747               GNUNET_i2s (other),
748               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
749   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
750   if ( (NULL == cp->mig_revive_task) &&
751        (NULL == cp->respect_iterate_req) )
752   {
753     GSF_push_stop_ (cp);
754     cp->mig_revive_task =
755         GNUNET_SCHEDULER_add_delayed (bt,
756                                       &revive_migration, cp);
757   }
758   return GNUNET_OK;
759 }
760
761
762 /**
763  * Copy reply and free put message.
764  *
765  * @param cls the `struct PutMessage`
766  * @param buf_size number of bytes available in @a buf
767  * @param buf where to copy the message, NULL on error (peer disconnect)
768  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
769  */
770 static size_t
771 copy_reply (void *cls,
772             size_t buf_size,
773             void *buf)
774 {
775   struct PutMessage *pm = cls;
776   size_t size;
777
778   if (NULL != buf)
779   {
780     GNUNET_assert (buf_size >= ntohs (pm->header.size));
781     size = ntohs (pm->header.size);
782     memcpy (buf, pm, size);
783     GNUNET_STATISTICS_update (GSF_stats,
784                               gettext_noop ("# replies transmitted to other peers"),
785                               1,
786                               GNUNET_NO);
787   }
788   else
789   {
790     size = 0;
791     GNUNET_STATISTICS_update (GSF_stats,
792                               gettext_noop ("# replies dropped"),
793                               1,
794                               GNUNET_NO);
795   }
796   GNUNET_free (pm);
797   return size;
798 }
799
800
801 /**
802  * Free resources associated with the given peer request.
803  *
804  * @param peerreq request to free
805  */
806 static void
807 free_pending_request (struct PeerRequest *peerreq)
808 {
809   struct GSF_ConnectedPeer *cp = peerreq->cp;
810   struct GSF_PendingRequestData *prd;
811
812   prd = GSF_pending_request_get_data_ (peerreq->pr);
813   if (NULL != peerreq->kill_task)
814   {
815     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
816     peerreq->kill_task = NULL;
817   }
818   GNUNET_STATISTICS_update (GSF_stats,
819                             gettext_noop ("# P2P searches active"),
820                             -1,
821                             GNUNET_NO);
822   GNUNET_break (GNUNET_YES ==
823                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
824                                                       &prd->query,
825                                                       peerreq));
826   GNUNET_free (peerreq);
827 }
828
829
830 /**
831  * Cancel all requests associated with the peer.
832  *
833  * @param cls unused
834  * @param query hash code of the request
835  * @param value the `struct GSF_PendingRequest`
836  * @return #GNUNET_YES (continue to iterate)
837  */
838 static int
839 cancel_pending_request (void *cls,
840                         const struct GNUNET_HashCode *query,
841                         void *value)
842 {
843   struct PeerRequest *peerreq = value;
844   struct GSF_PendingRequest *pr = peerreq->pr;
845
846   free_pending_request (peerreq);
847   GSF_pending_request_cancel_ (pr,
848                                GNUNET_NO);
849   return GNUNET_OK;
850 }
851
852
853 /**
854  * Free the given request.
855  *
856  * @param cls the request to free
857  * @param tc task context
858  */
859 static void
860 peer_request_destroy (void *cls,
861                       const struct GNUNET_SCHEDULER_TaskContext *tc)
862 {
863   struct PeerRequest *peerreq = cls;
864   struct GSF_PendingRequest *pr = peerreq->pr;
865   struct GSF_PendingRequestData *prd;
866
867   peerreq->kill_task = NULL;
868   prd = GSF_pending_request_get_data_ (pr);
869   cancel_pending_request (NULL,
870                           &prd->query,
871                           peerreq);
872 }
873
874
875 /**
876  * The artificial delay is over, transmit the message now.
877  *
878  * @param cls the `struct GSF_DelayedHandle` with the message
879  * @param tc scheduler context
880  */
881 static void
882 transmit_delayed_now (void *cls,
883                       const struct GNUNET_SCHEDULER_TaskContext *tc)
884 {
885   struct GSF_DelayedHandle *dh = cls;
886   struct GSF_ConnectedPeer *cp = dh->cp;
887
888   GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
889                                cp->delayed_tail,
890                                dh);
891   cp->delay_queue_size--;
892   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
893   {
894     GNUNET_free (dh->pm);
895     GNUNET_free (dh);
896     return;
897   }
898   (void) GSF_peer_transmit_ (cp,
899                              GNUNET_NO,
900                              UINT32_MAX,
901                              REPLY_TIMEOUT,
902                              dh->msize,
903                              &copy_reply,
904                              dh->pm);
905   GNUNET_free (dh);
906 }
907
908
909 /**
910  * Get the randomized delay a response should be subjected to.
911  *
912  * @return desired delay
913  */
914 static struct GNUNET_TIME_Relative
915 get_randomized_delay ()
916 {
917   struct GNUNET_TIME_Relative ret;
918
919   ret =
920       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
921                                      GNUNET_CRYPTO_random_u32
922                                      (GNUNET_CRYPTO_QUALITY_WEAK,
923                                       2 * GSF_avg_latency.rel_value_us + 1));
924 #if INSANE_STATISTICS
925   GNUNET_STATISTICS_update (GSF_stats,
926                             gettext_noop
927                             ("# artificial delays introduced (ms)"),
928                             ret.rel_value_us / 1000LL, GNUNET_NO);
929 #endif
930   return ret;
931 }
932
933
934 /**
935  * Handle a reply to a pending request.  Also called if a request
936  * expires (then with data == NULL).  The handler may be called
937  * many times (depending on the request type), but will not be
938  * called during or after a call to GSF_pending_request_cancel
939  * and will also not be called anymore after a call signalling
940  * expiration.
941  *
942  * @param cls `struct PeerRequest` this is an answer for
943  * @param eval evaluation of the result
944  * @param pr handle to the original pending request
945  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
946  * @param expiration when does @a data expire?
947  * @param last_transmission when did we last transmit a request for this block
948  * @param type type of the block
949  * @param data response data, NULL on request expiration
950  * @param data_len number of bytes in @a data
951  */
952 static void
953 handle_p2p_reply (void *cls,
954                   enum GNUNET_BLOCK_EvaluationResult eval,
955                   struct GSF_PendingRequest *pr,
956                   uint32_t reply_anonymity_level,
957                   struct GNUNET_TIME_Absolute expiration,
958                   struct GNUNET_TIME_Absolute last_transmission,
959                   enum GNUNET_BLOCK_Type type,
960                   const void *data,
961                   size_t data_len)
962 {
963   struct PeerRequest *peerreq = cls;
964   struct GSF_ConnectedPeer *cp = peerreq->cp;
965   struct GSF_PendingRequestData *prd;
966   struct PutMessage *pm;
967   size_t msize;
968
969   GNUNET_assert (data_len + sizeof (struct PutMessage) <
970                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
971   GNUNET_assert (peerreq->pr == pr);
972   prd = GSF_pending_request_get_data_ (pr);
973   if (NULL == data)
974   {
975     free_pending_request (peerreq);
976     return;
977   }
978   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
979   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
980   {
981     GNUNET_STATISTICS_update (GSF_stats,
982                               gettext_noop
983                               ("# replies dropped due to type mismatch"),
984                                 1, GNUNET_NO);
985     return;
986   }
987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988               "Transmitting result for query `%s' to peer\n",
989               GNUNET_h2s (&prd->query));
990   GNUNET_STATISTICS_update (GSF_stats,
991                             gettext_noop ("# replies received for other peers"),
992                             1, GNUNET_NO);
993   msize = sizeof (struct PutMessage) + data_len;
994   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
995   {
996     GNUNET_break (0);
997     return;
998   }
999   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
1000   {
1001     if (reply_anonymity_level - 1 > GSF_cover_content_count)
1002     {
1003       GNUNET_STATISTICS_update (GSF_stats,
1004                                 gettext_noop
1005                                 ("# replies dropped due to insufficient cover traffic"),
1006                                 1, GNUNET_NO);
1007       return;
1008     }
1009     GSF_cover_content_count -= (reply_anonymity_level - 1);
1010   }
1011
1012   pm = GNUNET_malloc (msize);
1013   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1014   pm->header.size = htons (msize);
1015   pm->type = htonl (type);
1016   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
1017   memcpy (&pm[1], data, data_len);
1018   if ( (UINT32_MAX != reply_anonymity_level) &&
1019        (0 != reply_anonymity_level) &&
1020        (GNUNET_YES == GSF_enable_randomized_delays) )
1021   {
1022     struct GSF_DelayedHandle *dh;
1023
1024     dh = GNUNET_new (struct GSF_DelayedHandle);
1025     dh->cp = cp;
1026     dh->pm = pm;
1027     dh->msize = msize;
1028     GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1029                                  cp->delayed_tail,
1030                                  dh);
1031     cp->delay_queue_size++;
1032     dh->delay_task =
1033         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
1034                                       &transmit_delayed_now,
1035                                       dh);
1036   }
1037   else
1038   {
1039     (void) GSF_peer_transmit_ (cp,
1040                                GNUNET_NO,
1041                                UINT32_MAX,
1042                                REPLY_TIMEOUT,
1043                                msize,
1044                                &copy_reply,
1045                                pm);
1046   }
1047   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1048     return;
1049   if (NULL == peerreq->kill_task)
1050   {
1051     GNUNET_STATISTICS_update (GSF_stats,
1052                               gettext_noop
1053                               ("# P2P searches destroyed due to ultimate reply"),
1054                               1,
1055                               GNUNET_NO);
1056     peerreq->kill_task =
1057         GNUNET_SCHEDULER_add_now (&peer_request_destroy,
1058                                   peerreq);
1059   }
1060 }
1061
1062
1063 /**
1064  * Increase the peer's respect by a value.
1065  *
1066  * @param cp which peer to change the respect value on
1067  * @param value is the int value by which the
1068  *  peer's credit is to be increased or decreased
1069  * @returns the actual change in respect (positive or negative)
1070  */
1071 static int
1072 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1073 {
1074   if (0 == value)
1075     return 0;
1076   GNUNET_assert (NULL != cp);
1077   if (value > 0)
1078   {
1079     if (cp->ppd.respect + value < cp->ppd.respect)
1080     {
1081       value = UINT32_MAX - cp->ppd.respect;
1082       cp->ppd.respect = UINT32_MAX;
1083     }
1084     else
1085       cp->ppd.respect += value;
1086   }
1087   else
1088   {
1089     if (cp->ppd.respect < -value)
1090     {
1091       value = -cp->ppd.respect;
1092       cp->ppd.respect = 0;
1093     }
1094     else
1095       cp->ppd.respect += value;
1096   }
1097   return value;
1098 }
1099
1100
1101 /**
1102  * We've received a request with the specified priority.  Bound it
1103  * according to how much we respect the given peer.
1104  *
1105  * @param prio_in requested priority
1106  * @param cp the peer making the request
1107  * @return effective priority
1108  */
1109 static int32_t
1110 bound_priority (uint32_t prio_in,
1111                 struct GSF_ConnectedPeer *cp)
1112 {
1113 #define N ((double)128.0)
1114   uint32_t ret;
1115   double rret;
1116   int ld;
1117
1118   ld = GSF_test_get_load_too_high_ (0);
1119   if (GNUNET_SYSERR == ld)
1120   {
1121 #if INSANE_STATISTICS
1122     GNUNET_STATISTICS_update (GSF_stats,
1123                               gettext_noop
1124                               ("# requests done for free (low load)"), 1,
1125                               GNUNET_NO);
1126 #endif
1127     return 0;                   /* excess resources */
1128   }
1129   if (prio_in > INT32_MAX)
1130     prio_in = INT32_MAX;
1131   ret = -change_peer_respect (cp, -(int) prio_in);
1132   if (ret > 0)
1133   {
1134     if (ret > GSF_current_priorities + N)
1135       rret = GSF_current_priorities + N;
1136     else
1137       rret = ret;
1138     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1139   }
1140   if ((GNUNET_YES == ld) && (ret > 0))
1141   {
1142     /* try with charging */
1143     ld = GSF_test_get_load_too_high_ (ret);
1144   }
1145   if (GNUNET_YES == ld)
1146   {
1147     GNUNET_STATISTICS_update (GSF_stats,
1148                               gettext_noop
1149                               ("# request dropped, priority insufficient"), 1,
1150                               GNUNET_NO);
1151     /* undo charge */
1152     change_peer_respect (cp, (int) ret);
1153     return -1;                  /* not enough resources */
1154   }
1155   else
1156   {
1157     GNUNET_STATISTICS_update (GSF_stats,
1158                               gettext_noop
1159                               ("# requests done for a price (normal load)"), 1,
1160                               GNUNET_NO);
1161   }
1162 #undef N
1163   return ret;
1164 }
1165
1166
1167 /**
1168  * The priority level imposes a bound on the maximum
1169  * value for the ttl that can be requested.
1170  *
1171  * @param ttl_in requested ttl
1172  * @param prio given priority
1173  * @return @a ttl_in if @a ttl_in is below the limit,
1174  *         otherwise the ttl-limit for the given @a prio
1175  */
1176 static int32_t
1177 bound_ttl (int32_t ttl_in,
1178            uint32_t prio)
1179 {
1180   unsigned long long allowed;
1181
1182   if (ttl_in <= 0)
1183     return ttl_in;
1184   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1185   if (ttl_in > allowed)
1186   {
1187     if (allowed >= (1 << 30))
1188       return 1 << 30;
1189     return allowed;
1190   }
1191   return ttl_in;
1192 }
1193
1194
1195 /**
1196  * Closure for #test_exist_cb().
1197  */
1198 struct TestExistClosure
1199 {
1200
1201   /**
1202    * Priority of the incoming request.
1203    */
1204   int32_t priority;
1205
1206   /**
1207    * Relative TTL of the incoming request.
1208    */
1209   int32_t ttl;
1210
1211   /**
1212    * Type of the incoming request.
1213    */
1214   enum GNUNET_BLOCK_Type type;
1215
1216   /**
1217    * Set to #GNUNET_YES if we are done handling the query.
1218    */
1219   int finished;
1220
1221 };
1222
1223
1224 /**
1225  * Test if the query already exists.  If so, merge it, otherwise
1226  * keep `finished` at #GNUNET_NO.
1227  *
1228  * @param cls our `struct TestExistClosure`
1229  * @param hc the key of the query
1230  * @param value the existing `struct PeerRequest`.
1231  * @return #GNUNET_YES to continue to iterate,
1232  *         #GNUNET_NO if we successfully merged
1233  */
1234 static int
1235 test_exist_cb (void *cls,
1236                const struct GNUNET_HashCode *hc,
1237                void *value)
1238 {
1239   struct TestExistClosure *tec = cls;
1240   struct PeerRequest *peerreq = value;
1241   struct GSF_PendingRequest *pr;
1242   struct GSF_PendingRequestData *prd;
1243
1244   pr = peerreq->pr;
1245   prd = GSF_pending_request_get_data_ (pr);
1246   if (prd->type != tec->type)
1247     return GNUNET_YES;
1248   if (prd->ttl.abs_value_us >=
1249       GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1250   {
1251     /* existing request has higher TTL, drop new one! */
1252     prd->priority += tec->priority;
1253     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1254                 "Have existing request with higher TTL, dropping new request.\n");
1255     GNUNET_STATISTICS_update (GSF_stats,
1256                               gettext_noop
1257                               ("# requests dropped due to higher-TTL request"),
1258                               1, GNUNET_NO);
1259     tec->finished = GNUNET_YES;
1260     return GNUNET_NO;
1261   }
1262   /* existing request has lower TTL, drop old one! */
1263   tec->priority += prd->priority;
1264   free_pending_request (peerreq);
1265   GSF_pending_request_cancel_ (pr,
1266                                GNUNET_YES);
1267   return GNUNET_NO;
1268 }
1269
1270
1271 /**
1272  * Handle P2P "QUERY" message.  Creates the pending request entry
1273  * and sets up all of the data structures to that we will
1274  * process replies properly.  Does not initiate forwarding or
1275  * local database lookups.
1276  *
1277  * @param other the other peer involved (sender or receiver, NULL
1278  *        for loopback messages where we are both sender and receiver)
1279  * @param message the actual message
1280  * @return pending request handle, NULL on error
1281  */
1282 struct GSF_PendingRequest *
1283 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1284                        const struct GNUNET_MessageHeader *message)
1285 {
1286   struct PeerRequest *peerreq;
1287   struct GSF_PendingRequest *pr;
1288   struct GSF_ConnectedPeer *cp;
1289   struct GSF_ConnectedPeer *cps;
1290   const struct GNUNET_PeerIdentity *target;
1291   enum GSF_PendingRequestOptions options;
1292   uint16_t msize;
1293   const struct GetMessage *gm;
1294   unsigned int bits;
1295   const struct GNUNET_PeerIdentity *opt;
1296   uint32_t bm;
1297   size_t bfsize;
1298   uint32_t ttl_decrement;
1299   struct TestExistClosure tec;
1300   GNUNET_PEER_Id spid;
1301   const struct GSF_PendingRequestData *prd;
1302
1303   msize = ntohs (message->size);
1304   if (msize < sizeof (struct GetMessage))
1305   {
1306     GNUNET_break_op (0);
1307     return NULL;
1308   }
1309   GNUNET_STATISTICS_update (GSF_stats,
1310                             gettext_noop
1311                             ("# GET requests received (from other peers)"),
1312                             1,
1313                             GNUNET_NO);
1314   gm = (const struct GetMessage *) message;
1315   tec.type = ntohl (gm->type);
1316   bm = ntohl (gm->hash_bitmap);
1317   bits = 0;
1318   while (bm > 0)
1319   {
1320     if (1 == (bm & 1))
1321       bits++;
1322     bm >>= 1;
1323   }
1324   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1325   {
1326     GNUNET_break_op (0);
1327     return NULL;
1328   }
1329   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1330   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1331   /* bfsize must be power of 2, check! */
1332   if (0 != ((bfsize - 1) & bfsize))
1333   {
1334     GNUNET_break_op (0);
1335     return NULL;
1336   }
1337   GSF_cover_query_count++;
1338   bm = ntohl (gm->hash_bitmap);
1339   bits = 0;
1340   cps = GSF_peer_get_ (other);
1341   if (NULL == cps)
1342   {
1343     /* peer must have just disconnected */
1344     GNUNET_STATISTICS_update (GSF_stats,
1345                               gettext_noop
1346                               ("# requests dropped due to initiator not being connected"),
1347                               1, GNUNET_NO);
1348     return NULL;
1349   }
1350   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1351     cp = GSF_peer_get_ (&opt[bits++]);
1352   else
1353     cp = cps;
1354   if (NULL == cp)
1355   {
1356     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1357       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1358                   "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1359                   GNUNET_i2s (&opt[bits - 1]));
1360
1361     else
1362       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1363                   "Failed to find peer `%s' in connection set. Dropping query.\n",
1364                   GNUNET_i2s (other));
1365     GNUNET_STATISTICS_update (GSF_stats,
1366                               gettext_noop
1367                               ("# requests dropped due to missing reverse route"),
1368                               1,
1369                               GNUNET_NO);
1370     return NULL;
1371   }
1372   if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
1373   {
1374     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1375                 "Peer `%s' has too many replies queued already. Dropping query.\n",
1376                 GNUNET_i2s (other));
1377     GNUNET_STATISTICS_update (GSF_stats,
1378                               gettext_noop ("# requests dropped due to full reply queue"),
1379                               1,
1380                               GNUNET_NO);
1381     return NULL;
1382   }
1383   /* note that we can really only check load here since otherwise
1384    * peers could find out that we are overloaded by not being
1385    * disconnected after sending us a malformed query... */
1386   tec.priority = bound_priority (ntohl (gm->priority),
1387                                  cps);
1388   if (tec.priority < 0)
1389   {
1390     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1391                 "Dropping query from `%s', this peer is too busy.\n",
1392                 GNUNET_i2s (other));
1393     return NULL;
1394   }
1395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1396               "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1397               GNUNET_h2s (&gm->query),
1398               (unsigned int) tec.type,
1399               GNUNET_i2s (other),
1400               (unsigned int) bm);
1401   target =
1402       (0 !=
1403        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1404   options = GSF_PRO_DEFAULTS;
1405   spid = 0;
1406   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + tec.priority))
1407       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1408           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1409           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1410   {
1411     /* don't have BW to send to peer, or would likely take longer than we have for it,
1412      * so at best indirect the query */
1413     tec.priority = 0;
1414     options |= GSF_PRO_FORWARD_ONLY;
1415     spid = GNUNET_PEER_intern (other);
1416     GNUNET_assert (0 != spid);
1417   }
1418   tec.ttl = bound_ttl (ntohl (gm->ttl),
1419                        tec.priority);
1420   /* decrement ttl (always) */
1421   ttl_decrement =
1422       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1423                                                     TTL_DECREMENT);
1424   if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1425   {
1426     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1428                 GNUNET_i2s (other),
1429                 tec.ttl,
1430                 ttl_decrement);
1431     GNUNET_STATISTICS_update (GSF_stats,
1432                               gettext_noop
1433                               ("# requests dropped due TTL underflow"), 1,
1434                               GNUNET_NO);
1435     /* integer underflow => drop (should be very rare)! */
1436     return NULL;
1437   }
1438   tec.ttl -= ttl_decrement;
1439
1440   /* test if the request already exists */
1441   tec.finished = GNUNET_NO;
1442   GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1443                                               &gm->query,
1444                                               &test_exist_cb,
1445                                               &tec);
1446   if (GNUNET_YES == tec.finished)
1447     return NULL; /* merged into existing request, we're done */
1448
1449   peerreq = GNUNET_new (struct PeerRequest);
1450   peerreq->cp = cp;
1451   pr = GSF_pending_request_create_ (options,
1452                                     tec.type,
1453                                     &gm->query,
1454                                     target,
1455                                     (bfsize > 0)
1456                                     ? (const char *) &opt[bits]
1457                                     : NULL,
1458                                     bfsize,
1459                                     ntohl (gm->filter_mutator),
1460                                     1 /* anonymity */,
1461                                     (uint32_t) tec.priority,
1462                                     tec.ttl,
1463                                     spid,
1464                                     GNUNET_PEER_intern (other),
1465                                     NULL, 0,        /* replies_seen */
1466                                     &handle_p2p_reply,
1467                                     peerreq);
1468   GNUNET_assert (NULL != pr);
1469   prd = GSF_pending_request_get_data_ (pr);
1470   peerreq->pr = pr;
1471   GNUNET_break (GNUNET_OK ==
1472                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1473                                                    &prd->query,
1474                                                    peerreq,
1475                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1476   GNUNET_STATISTICS_update (GSF_stats,
1477                             gettext_noop ("# P2P query messages received and processed"),
1478                             1,
1479                             GNUNET_NO);
1480   GNUNET_STATISTICS_update (GSF_stats,
1481                             gettext_noop ("# P2P searches active"),
1482                             1,
1483                             GNUNET_NO);
1484   return pr;
1485 }
1486
1487
1488 /**
1489  * Function called if there has been a timeout trying to satisfy
1490  * a transmission request.
1491  *
1492  * @param cls the `struct GSF_PeerTransmitHandle` of the request
1493  * @param tc scheduler context
1494  */
1495 static void
1496 peer_transmit_timeout (void *cls,
1497                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1498 {
1499   struct GSF_PeerTransmitHandle *pth = cls;
1500   struct GSF_ConnectedPeer *cp;
1501
1502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1503               "Timeout trying to transmit to other peer\n");
1504   pth->timeout_task = NULL;
1505   cp = pth->cp;
1506   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1507                                cp->pth_tail,
1508                                pth);
1509   if (GNUNET_YES == pth->is_query)
1510     GNUNET_assert (0 < cp->ppd.pending_queries--);
1511   else if (GNUNET_NO == pth->is_query)
1512     GNUNET_assert (0 < cp->ppd.pending_replies--);
1513   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1514                       UINT64_MAX);
1515   if (NULL != cp->cth)
1516   {
1517     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1518     cp->cth = NULL;
1519   }
1520   pth->gmc (pth->gmc_cls, 0, NULL);
1521   GNUNET_assert (0 == cp->cth_in_progress);
1522   GNUNET_free (pth);
1523 }
1524
1525
1526 /**
1527  * Transmit a message to the given peer as soon as possible.
1528  * If the peer disconnects before the transmission can happen,
1529  * the callback is invoked with a `NULL` @a buffer.
1530  *
1531  * @param cp target peer
1532  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1533  * @param priority how important is this request?
1534  * @param timeout when does this request timeout (call gmc with error)
1535  * @param size number of bytes we would like to send to the peer
1536  * @param gmc function to call to get the message
1537  * @param gmc_cls closure for @a gmc
1538  * @return handle to cancel request
1539  */
1540 struct GSF_PeerTransmitHandle *
1541 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1542                     int is_query,
1543                     uint32_t priority,
1544                     struct GNUNET_TIME_Relative timeout,
1545                     size_t size,
1546                     GSF_GetMessageCallback gmc, void *gmc_cls)
1547 {
1548   struct GSF_PeerTransmitHandle *pth;
1549   struct GSF_PeerTransmitHandle *pos;
1550   struct GSF_PeerTransmitHandle *prev;
1551
1552   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1553   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1554   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1555   pth->gmc = gmc;
1556   pth->gmc_cls = gmc_cls;
1557   pth->size = size;
1558   pth->is_query = is_query;
1559   pth->priority = priority;
1560   pth->cp = cp;
1561   /* insertion sort (by priority, descending) */
1562   prev = NULL;
1563   pos = cp->pth_head;
1564   while ((NULL != pos) && (pos->priority > priority))
1565   {
1566     prev = pos;
1567     pos = pos->next;
1568   }
1569   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1570                                      cp->pth_tail,
1571                                      prev,
1572                                      pth);
1573   if (GNUNET_YES == is_query)
1574     cp->ppd.pending_queries++;
1575   else if (GNUNET_NO == is_query)
1576     cp->ppd.pending_replies++;
1577   pth->timeout_task
1578     = GNUNET_SCHEDULER_add_delayed (timeout,
1579                                     &peer_transmit_timeout,
1580                                     pth);
1581   schedule_transmission (pth);
1582   return pth;
1583 }
1584
1585
1586 /**
1587  * Cancel an earlier request for transmission.
1588  *
1589  * @param pth request to cancel
1590  */
1591 void
1592 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1593 {
1594   struct GSF_ConnectedPeer *cp;
1595
1596   if (NULL != pth->timeout_task)
1597   {
1598     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1599     pth->timeout_task = NULL;
1600   }
1601   cp = pth->cp;
1602   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1603                                cp->pth_tail,
1604                                pth);
1605   if (GNUNET_YES == pth->is_query)
1606     GNUNET_assert (0 < cp->ppd.pending_queries--);
1607   else if (GNUNET_NO == pth->is_query)
1608     GNUNET_assert (0 < cp->ppd.pending_replies--);
1609   GNUNET_free (pth);
1610 }
1611
1612
1613 /**
1614  * Report on receiving a reply; update the performance record of the given peer.
1615  *
1616  * @param cp responding peer (will be updated)
1617  * @param request_time time at which the original query was transmitted
1618  * @param request_priority priority of the original request
1619  */
1620 void
1621 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1622                               struct GNUNET_TIME_Absolute request_time,
1623                               uint32_t request_priority)
1624 {
1625   struct GNUNET_TIME_Relative delay;
1626
1627   delay = GNUNET_TIME_absolute_get_duration (request_time);
1628   cp->ppd.avg_reply_delay.rel_value_us =
1629       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1630        delay.rel_value_us) / RUNAVG_DELAY_N;
1631   cp->ppd.avg_priority =
1632       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1633        request_priority) / RUNAVG_DELAY_N;
1634 }
1635
1636
1637 /**
1638  * Report on receiving a reply in response to an initiating client.
1639  * Remember that this peer is good for this client.
1640  *
1641  * @param cp responding peer (will be updated)
1642  * @param initiator_client local client on responsible for query
1643  */
1644 void
1645 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1646                                    struct GSF_LocalClient *initiator_client)
1647 {
1648   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1649                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1650 }
1651
1652
1653 /**
1654  * Report on receiving a reply in response to an initiating peer.
1655  * Remember that this peer is good for this initiating peer.
1656  *
1657  * @param cp responding peer (will be updated)
1658  * @param initiator_peer other peer responsible for query
1659  */
1660 void
1661 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1662                                  const struct GSF_ConnectedPeer *initiator_peer)
1663 {
1664   unsigned int woff;
1665
1666   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1667   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1668   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1669   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1670   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1671 }
1672
1673
1674 /**
1675  * Write peer-respect information to a file - flush the buffer entry!
1676  *
1677  * @param cls unused
1678  * @param key peer identity
1679  * @param value the `struct GSF_ConnectedPeer` to flush
1680  * @return #GNUNET_OK to continue iteration
1681  */
1682 static int
1683 flush_respect (void *cls,
1684                const struct GNUNET_PeerIdentity *key,
1685                void *value)
1686 {
1687   struct GSF_ConnectedPeer *cp = value;
1688   struct GNUNET_PeerIdentity pid;
1689
1690   if (cp->ppd.respect == cp->disk_respect)
1691     return GNUNET_OK;           /* unchanged */
1692   GNUNET_assert (0 != cp->ppd.pid);
1693   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1694   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1695                           sizeof (cp->ppd.respect),
1696                           GNUNET_TIME_UNIT_FOREVER_ABS,
1697                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1698   return GNUNET_OK;
1699 }
1700
1701
1702 /**
1703  * A peer disconnected from us.  Tear down the connected peer
1704  * record.
1705  *
1706  * @param cls unused
1707  * @param peer identity of peer that connected
1708  */
1709 void
1710 GSF_peer_disconnect_handler_ (void *cls,
1711                               const struct GNUNET_PeerIdentity *peer)
1712 {
1713   struct GSF_ConnectedPeer *cp;
1714   struct GSF_PeerTransmitHandle *pth;
1715   struct GSF_DelayedHandle *dh;
1716
1717   cp = GSF_peer_get_ (peer);
1718   if (NULL == cp)
1719     return;                     /* must have been disconnect from core with
1720                                  * 'peer' == my_id, ignore */
1721   flush_respect (NULL, peer, cp);
1722   GNUNET_assert (GNUNET_YES ==
1723                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1724                                                        peer,
1725                                                        cp));
1726   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1727                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1728                          GNUNET_NO);
1729   if (NULL != cp->respect_iterate_req)
1730   {
1731     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1732     cp->respect_iterate_req = NULL;
1733   }
1734   if (NULL != cp->migration_pth)
1735   {
1736     GSF_peer_transmit_cancel_ (cp->migration_pth);
1737     cp->migration_pth = NULL;
1738   }
1739   if (NULL != cp->rc)
1740   {
1741     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1742     cp->rc = NULL;
1743   }
1744   if (NULL != cp->rc_delay_task)
1745   {
1746     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1747     cp->rc_delay_task = NULL;
1748   }
1749   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1750                                          &cancel_pending_request,
1751                                          cp);
1752   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1753   cp->request_map = NULL;
1754   GSF_plan_notify_peer_disconnect_ (cp);
1755   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1756   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1757                              P2P_SUCCESS_LIST_SIZE);
1758   memset (cp->ppd.last_p2p_replies,
1759           0,
1760           sizeof (cp->ppd.last_p2p_replies));
1761   GSF_push_stop_ (cp);
1762   if (NULL != cp->cth)
1763   {
1764     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1765     cp->cth = NULL;
1766   }
1767   GNUNET_assert (0 == cp->cth_in_progress);
1768   while (NULL != (pth = cp->pth_head))
1769   {
1770     if (pth->timeout_task != NULL)
1771     {
1772       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1773       pth->timeout_task = NULL;
1774     }
1775     GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1776                                  cp->pth_tail,
1777                                  pth);
1778     if (GNUNET_YES == pth->is_query)
1779       GNUNET_assert (0 < cp->ppd.pending_queries--);
1780     else if (GNUNET_NO == pth->is_query)
1781       GNUNET_assert (0 < cp->ppd.pending_replies--);
1782     pth->gmc (pth->gmc_cls, 0, NULL);
1783     GNUNET_free (pth);
1784   }
1785   while (NULL != (dh = cp->delayed_head))
1786   {
1787     GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1788                                  cp->delayed_tail,
1789                                  dh);
1790     cp->delay_queue_size--;
1791     GNUNET_SCHEDULER_cancel (dh->delay_task);
1792     GNUNET_free (dh->pm);
1793     GNUNET_free (dh);
1794   }
1795   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1796   if (NULL != cp->mig_revive_task)
1797   {
1798     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1799     cp->mig_revive_task = NULL;
1800   }
1801   GNUNET_break (0 == cp->ppd.pending_queries);
1802   GNUNET_break (0 == cp->ppd.pending_replies);
1803   GNUNET_free (cp);
1804 }
1805
1806
1807 /**
1808  * Closure for #call_iterator().
1809  */
1810 struct IterationContext
1811 {
1812   /**
1813    * Function to call on each entry.
1814    */
1815   GSF_ConnectedPeerIterator it;
1816
1817   /**
1818    * Closure for @e it.
1819    */
1820   void *it_cls;
1821 };
1822
1823
1824 /**
1825  * Function that calls the callback for each peer.
1826  *
1827  * @param cls the `struct IterationContext *`
1828  * @param key identity of the peer
1829  * @param value the `struct GSF_ConnectedPeer *`
1830  * @return #GNUNET_YES to continue iteration
1831  */
1832 static int
1833 call_iterator (void *cls,
1834                const struct GNUNET_PeerIdentity *key,
1835                void *value)
1836 {
1837   struct IterationContext *ic = cls;
1838   struct GSF_ConnectedPeer *cp = value;
1839
1840   ic->it (ic->it_cls,
1841           key, cp,
1842           &cp->ppd);
1843   return GNUNET_YES;
1844 }
1845
1846
1847 /**
1848  * Iterate over all connected peers.
1849  *
1850  * @param it function to call for each peer
1851  * @param it_cls closure for @a it
1852  */
1853 void
1854 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1855                               void *it_cls)
1856 {
1857   struct IterationContext ic;
1858
1859   ic.it = it;
1860   ic.it_cls = it_cls;
1861   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1862                                          &call_iterator,
1863                                          &ic);
1864 }
1865
1866
1867 /**
1868  * Obtain the identity of a connected peer.
1869  *
1870  * @param cp peer to get identity of
1871  * @param id identity to set (written to)
1872  */
1873 void
1874 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1875                                   struct GNUNET_PeerIdentity *id)
1876 {
1877   GNUNET_assert (0 != cp->ppd.pid);
1878   GNUNET_PEER_resolve (cp->ppd.pid, id);
1879 }
1880
1881
1882 /**
1883  * Obtain the identity of a connected peer.
1884  *
1885  * @param cp peer to get identity of
1886  * @return reference to peer identity, valid until peer disconnects (!)
1887  */
1888 const struct GNUNET_PeerIdentity *
1889 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1890 {
1891   GNUNET_assert (0 != cp->ppd.pid);
1892   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1893 }
1894
1895
1896 /**
1897  * Assemble a migration stop message for transmission.
1898  *
1899  * @param cls the `struct GSF_ConnectedPeer` to use
1900  * @param size number of bytes we're allowed to write to @a buf
1901  * @param buf where to copy the message
1902  * @return number of bytes copied to @a buf
1903  */
1904 static size_t
1905 create_migration_stop_message (void *cls,
1906                                size_t size,
1907                                void *buf)
1908 {
1909   struct GSF_ConnectedPeer *cp = cls;
1910   struct MigrationStopMessage msm;
1911
1912   cp->migration_pth = NULL;
1913   if (NULL == buf)
1914     return 0;
1915   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1916   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1917   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1918   msm.reserved = htonl (0);
1919   msm.duration =
1920       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1921                                  (cp->last_migration_block));
1922   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1923   GNUNET_STATISTICS_update (GSF_stats,
1924                             gettext_noop ("# migration stop messages sent"),
1925                             1, GNUNET_NO);
1926   return sizeof (struct MigrationStopMessage);
1927 }
1928
1929
1930 /**
1931  * Ask a peer to stop migrating data to us until the given point
1932  * in time.
1933  *
1934  * @param cp peer to ask
1935  * @param block_time until when to block
1936  */
1937 void
1938 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1939                            struct GNUNET_TIME_Absolute block_time)
1940 {
1941   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1942   {
1943     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1944                 "Migration already blocked for another %s\n",
1945                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1946                                                         (cp->last_migration_block), GNUNET_YES));
1947     return;                     /* already blocked */
1948   }
1949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1950               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1951                                                       GNUNET_YES));
1952   cp->last_migration_block = block_time;
1953   if (NULL != cp->migration_pth)
1954     GSF_peer_transmit_cancel_ (cp->migration_pth);
1955   cp->migration_pth =
1956       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1957                           GNUNET_TIME_UNIT_FOREVER_REL,
1958                           sizeof (struct MigrationStopMessage),
1959                           &create_migration_stop_message, cp);
1960 }
1961
1962
1963 /**
1964  * Notify core about a preference we have for the given peer
1965  * (to allocate more resources towards it).  The change will
1966  * be communicated the next time we reserve bandwidth with
1967  * core (not instantly).
1968  *
1969  * @param cp peer to reserve bandwidth from
1970  * @param pref preference change
1971  */
1972 void
1973 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1974                                        uint64_t pref)
1975 {
1976   cp->inc_preference += pref;
1977 }
1978
1979
1980 /**
1981  * Call this method periodically to flush respect information to disk.
1982  *
1983  * @param cls closure, not used
1984  * @param tc task context, not used
1985  */
1986 static void
1987 cron_flush_respect (void *cls,
1988                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1989 {
1990   if (NULL == cp_map)
1991     return;
1992   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1993                                          &flush_respect, NULL);
1994   if (NULL == tc)
1995     return;
1996   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1997     return;
1998   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1999                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
2000                                               &cron_flush_respect, NULL);
2001 }
2002
2003
2004 /**
2005  * Initialize peer management subsystem.
2006  */
2007 void
2008 GSF_connected_peer_init_ ()
2009 {
2010   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
2011   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
2012   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
2013                                       &cron_flush_respect, NULL);
2014 }
2015
2016
2017 /**
2018  * Iterator to free peer entries.
2019  *
2020  * @param cls closure, unused
2021  * @param key current key code
2022  * @param value value in the hash map (peer entry)
2023  * @return #GNUNET_YES (we should continue to iterate)
2024  */
2025 static int
2026 clean_peer (void *cls,
2027             const struct GNUNET_PeerIdentity *key,
2028             void *value)
2029 {
2030   GSF_peer_disconnect_handler_ (NULL, key);
2031   return GNUNET_YES;
2032 }
2033
2034
2035 /**
2036  * Shutdown peer management subsystem.
2037  */
2038 void
2039 GSF_connected_peer_done_ ()
2040 {
2041   cron_flush_respect (NULL, NULL);
2042   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2043                                          &clean_peer, NULL);
2044   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
2045   cp_map = NULL;
2046   GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
2047 }
2048
2049
2050 /**
2051  * Iterator to remove references to LC entry.
2052  *
2053  * @param cls the `struct GSF_LocalClient *` to look for
2054  * @param key current key code
2055  * @param value value in the hash map (peer entry)
2056  * @return #GNUNET_YES (we should continue to iterate)
2057  */
2058 static int
2059 clean_local_client (void *cls,
2060                     const struct GNUNET_PeerIdentity *key,
2061                     void *value)
2062 {
2063   const struct GSF_LocalClient *lc = cls;
2064   struct GSF_ConnectedPeer *cp = value;
2065   unsigned int i;
2066
2067   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
2068     if (cp->ppd.last_client_replies[i] == lc)
2069       cp->ppd.last_client_replies[i] = NULL;
2070   return GNUNET_YES;
2071 }
2072
2073
2074 /**
2075  * Notification that a local client disconnected.  Clean up all of our
2076  * references to the given handle.
2077  *
2078  * @param lc handle to the local client (henceforth invalid)
2079  */
2080 void
2081 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
2082 {
2083   if (NULL == cp_map)
2084     return;                     /* already cleaned up */
2085   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
2086                                          (void *) lc);
2087 }
2088
2089
2090 /* end of gnunet-service-fs_cp.c */