Fix for #4553
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file fs/gnunet-service-fs_cp.c
22  * @brief API to handle 'connected peers'
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33 #include "gnunet_peerstore_service.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   struct GNUNET_SCHEDULER_Task *timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for @e gmc.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   struct GNUNET_SCHEDULER_Task *delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request (generic: from peer or local client).
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Which specific peer issued this request?
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   struct GNUNET_SCHEDULER_Task *kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   struct GNUNET_SCHEDULER_Task *mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   struct GNUNET_SCHEDULER_Task *rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to `struct PeerRequest`.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Number of entries in @e delayed_head DLL.
279    */
280   unsigned int delay_queue_size;
281
282   /**
283    * Respect rating for this peer on disk.
284    */
285   uint32_t disk_respect;
286
287   /**
288    * Which offset in @e last_p2p_replies will be updated next?
289    * (we go round-robin).
290    */
291   unsigned int last_p2p_replies_woff;
292
293   /**
294    * Which offset in @e last_client_replies will be updated next?
295    * (we go round-robin).
296    */
297   unsigned int last_client_replies_woff;
298
299   /**
300    * Current offset into @e last_request_times ring buffer.
301    */
302   unsigned int last_request_times_off;
303
304   /**
305    * #GNUNET_YES if we did successfully reserve 32k bandwidth,
306    * #GNUNET_NO if not.
307    */
308   int did_reserve;
309
310   /**
311    * Function called when the creation of this record is complete.
312    */
313   GSF_ConnectedPeerCreationCallback creation_cb;
314
315   /**
316    * Closure for @e creation_cb
317    */
318   void *creation_cb_cls;
319
320   /**
321    * Handle to the PEERSTORE iterate request for peer respect value
322    */
323   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
324
325 };
326
327
328 /**
329  * Map from peer identities to `struct GSF_ConnectPeer` entries.
330  */
331 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
332
333 /**
334  * Handle to peerstore service.
335  */
336 static struct GNUNET_PEERSTORE_Handle *peerstore;
337
338 /**
339  * Task used to flush respect values to disk.
340  */
341 static struct GNUNET_SCHEDULER_Task *fr_task;
342
343
344 /**
345  * Update the latency information kept for the given peer.
346  *
347  * @param id peer record to update
348  * @param latency current latency value
349  */
350 void
351 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
352                           struct GNUNET_TIME_Relative latency)
353 {
354   struct GSF_ConnectedPeer *cp;
355
356   cp = GSF_peer_get_ (id);
357   if (NULL == cp)
358     return; /* we're not yet connected at the core level, ignore */
359   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
360                                  latency);
361 }
362
363
364 /**
365  * Return the performance data record for the given peer
366  *
367  * @param cp peer to query
368  * @return performance data record for the peer
369  */
370 struct GSF_PeerPerformanceData *
371 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
372 {
373   return &cp->ppd;
374 }
375
376
377 /**
378  * Core is ready to transmit to a peer, get the message.
379  *
380  * @param cls the `struct GSF_PeerTransmitHandle` of the message
381  * @param size number of bytes core is willing to take
382  * @param buf where to copy the message
383  * @return number of bytes copied to @a buf
384  */
385 static size_t
386 peer_transmit_ready_cb (void *cls,
387                         size_t size,
388                         void *buf);
389
390
391 /**
392  * Function called by core upon success or failure of our bandwidth reservation request.
393  *
394  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
395  * @param peer identifies the peer
396  * @param amount set to the amount that was actually reserved or unreserved;
397  *               either the full requested amount or zero (no partial reservations)
398  * @param res_delay if the reservation could not be satisfied (amount was 0), how
399  *        long should the client wait until re-trying?
400  */
401 static void
402 ats_reserve_callback (void *cls,
403                       const struct GNUNET_PeerIdentity *peer,
404                       int32_t amount,
405                       struct GNUNET_TIME_Relative res_delay);
406
407
408 /**
409  * If ready (bandwidth reserved), try to schedule transmission via
410  * core for the given handle.
411  *
412  * @param pth transmission handle to schedule
413  */
414 static void
415 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
416 {
417   struct GSF_ConnectedPeer *cp;
418   struct GNUNET_PeerIdentity target;
419
420   cp = pth->cp;
421   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
422     return;                     /* already done */
423   GNUNET_assert (0 != cp->ppd.pid);
424   GNUNET_PEER_resolve (cp->ppd.pid, &target);
425
426   if (0 != cp->inc_preference)
427   {
428     GNUNET_ATS_performance_change_preference (GSF_ats,
429                                               &target,
430                                               GNUNET_ATS_PREFERENCE_BANDWIDTH,
431                                               (double) cp->inc_preference,
432                                               GNUNET_ATS_PREFERENCE_END);
433     cp->inc_preference = 0;
434   }
435
436   if ( (GNUNET_YES == pth->is_query) &&
437        (GNUNET_YES != pth->was_reserved) )
438   {
439     /* query, need reservation */
440     if (GNUNET_YES != cp->did_reserve)
441       return;                   /* not ready */
442     cp->did_reserve = GNUNET_NO;
443     /* reservation already done! */
444     pth->was_reserved = GNUNET_YES;
445     cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
446                                            &target,
447                                            DBLOCK_SIZE,
448                                            &ats_reserve_callback,
449                                            cp);
450     return;
451   }
452   GNUNET_assert (NULL == cp->cth);
453   cp->cth_in_progress++;
454   cp->cth =
455     GNUNET_CORE_notify_transmit_ready (GSF_core,
456                                        GNUNET_YES,
457                                        GNUNET_CORE_PRIO_BACKGROUND,
458                                        GNUNET_TIME_absolute_get_remaining (pth->timeout),
459                                        &target,
460                                        pth->size,
461                                        &peer_transmit_ready_cb, cp);
462   GNUNET_assert (NULL != cp->cth);
463   GNUNET_assert (0 < cp->cth_in_progress--);
464 }
465
466
467 /**
468  * Core is ready to transmit to a peer, get the message.
469  *
470  * @param cls the `struct GSF_PeerTransmitHandle` of the message
471  * @param size number of bytes core is willing to take
472  * @param buf where to copy the message
473  * @return number of bytes copied to @a buf
474  */
475 static size_t
476 peer_transmit_ready_cb (void *cls,
477                         size_t size,
478                         void *buf)
479 {
480   struct GSF_ConnectedPeer *cp = cls;
481   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
482   struct GSF_PeerTransmitHandle *pos;
483   size_t ret;
484
485   cp->cth = NULL;
486   if (NULL == pth)
487     return 0;
488   if (pth->size > size)
489   {
490     schedule_transmission (pth);
491     return 0;
492   }
493   if (NULL != pth->timeout_task)
494   {
495     GNUNET_SCHEDULER_cancel (pth->timeout_task);
496     pth->timeout_task = NULL;
497   }
498   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
499                                cp->pth_tail,
500                                pth);
501   if (GNUNET_YES == pth->is_query)
502   {
503     cp->ppd.last_request_times[(cp->last_request_times_off++) %
504                                MAX_QUEUE_PER_PEER] =
505       GNUNET_TIME_absolute_get ();
506     GNUNET_assert (0 < cp->ppd.pending_queries--);
507   }
508   else if (GNUNET_NO == pth->is_query)
509   {
510     GNUNET_assert (0 < cp->ppd.pending_replies--);
511   }
512   GNUNET_LOAD_update (cp->ppd.transmission_delay,
513                       GNUNET_TIME_absolute_get_duration
514                       (pth->transmission_request_start_time).rel_value_us);
515   ret = pth->gmc (pth->gmc_cls, size, buf);
516   if (NULL != (pos = cp->pth_head))
517   {
518     GNUNET_assert (pos != pth);
519     schedule_transmission (pos);
520   }
521   GNUNET_free (pth);
522   return ret;
523 }
524
525
526 /**
527  * (re)try to reserve bandwidth from the given peer.
528  *
529  * @param cls the `struct GSF_ConnectedPeer` to reserve from
530  */
531 static void
532 retry_reservation (void *cls)
533 {
534   struct GSF_ConnectedPeer *cp = cls;
535   struct GNUNET_PeerIdentity target;
536
537   GNUNET_PEER_resolve (cp->ppd.pid, &target);
538   cp->rc_delay_task = NULL;
539   cp->rc =
540     GNUNET_ATS_reserve_bandwidth (GSF_ats,
541                                   &target,
542                                   DBLOCK_SIZE,
543                                   &ats_reserve_callback, cp);
544 }
545
546
547 /**
548  * Function called by core upon success or failure of our bandwidth reservation request.
549  *
550  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
551  * @param peer identifies the peer
552  * @param amount set to the amount that was actually reserved or unreserved;
553  *               either the full requested amount or zero (no partial reservations)
554  * @param res_delay if the reservation could not be satisfied (amount was 0), how
555  *        long should the client wait until re-trying?
556  */
557 static void
558 ats_reserve_callback (void *cls,
559                       const struct GNUNET_PeerIdentity *peer,
560                       int32_t amount,
561                       struct GNUNET_TIME_Relative res_delay)
562 {
563   struct GSF_ConnectedPeer *cp = cls;
564   struct GSF_PeerTransmitHandle *pth;
565
566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567               "Reserved %d bytes / need to wait %s for reservation\n",
568               (int) amount,
569               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
570   cp->rc = NULL;
571   if (0 == amount)
572   {
573     cp->rc_delay_task =
574         GNUNET_SCHEDULER_add_delayed (res_delay,
575                                       &retry_reservation,
576                                       cp);
577     return;
578   }
579   cp->did_reserve = GNUNET_YES;
580   pth = cp->pth_head;
581   if ( (NULL != pth) &&
582        (NULL == cp->cth) &&
583        (0 == cp->cth_in_progress) )
584   {
585     /* reservation success, try transmission now! */
586     cp->cth_in_progress++;
587     cp->cth =
588         GNUNET_CORE_notify_transmit_ready (GSF_core,
589                                            GNUNET_YES,
590                                            GNUNET_CORE_PRIO_BACKGROUND,
591                                            GNUNET_TIME_absolute_get_remaining (pth->timeout),
592                                            peer,
593                                            pth->size,
594                                            &peer_transmit_ready_cb,
595                                            cp);
596     GNUNET_assert (NULL != cp->cth);
597     GNUNET_assert (0 < cp->cth_in_progress--);
598   }
599 }
600
601
602 /**
603  * Function called by PEERSTORE with peer respect record
604  *
605  * @param cls handle to connected peer entry
606  * @param record peerstore record information
607  * @param emsg error message, or NULL if no errors
608  */
609 static void
610 peer_respect_cb (void *cls,
611                  const struct GNUNET_PEERSTORE_Record *record,
612                  const char *emsg)
613 {
614   struct GSF_ConnectedPeer *cp = cls;
615
616   GNUNET_assert (NULL != cp->respect_iterate_req);
617   printf("Got a record!\n");
618   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
619     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
620   GSF_push_start_ (cp);
621   if (NULL != cp->creation_cb)
622     cp->creation_cb (cp->creation_cb_cls, cp);
623   if (NULL != record)
624   {
625     printf("Cancelling!\n");
626     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
627     cp->respect_iterate_req = NULL;
628   }
629 }
630
631
632 /**
633  * A peer connected to us.  Setup the connected peer
634  * records.
635  *
636  * @param peer identity of peer that connected
637  * @param creation_cb callback function when the record is created.
638  * @param creation_cb_cls closure for @creation_cb
639  */
640 void
641 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
642                            GSF_ConnectedPeerCreationCallback creation_cb,
643                            void *creation_cb_cls)
644 {
645   struct GSF_ConnectedPeer *cp;
646
647   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
648               "Connected to peer %s\n",
649               GNUNET_i2s (peer));
650   cp = GNUNET_new (struct GSF_ConnectedPeer);
651   cp->ppd.pid = GNUNET_PEER_intern (peer);
652   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
653   cp->rc =
654       GNUNET_ATS_reserve_bandwidth (GSF_ats,
655                                     peer,
656                                     DBLOCK_SIZE,
657                                     &ats_reserve_callback, cp);
658   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
659                                                           GNUNET_YES);
660   GNUNET_break (GNUNET_OK ==
661                 GNUNET_CONTAINER_multipeermap_put (cp_map,
662                GSF_connected_peer_get_identity2_ (cp),
663                                                    cp,
664                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
665   GNUNET_STATISTICS_set (GSF_stats,
666                          gettext_noop ("# peers connected"),
667                          GNUNET_CONTAINER_multipeermap_size (cp_map),
668                          GNUNET_NO);
669   cp->creation_cb = creation_cb;
670   cp->creation_cb_cls = creation_cb_cls;
671   cp->respect_iterate_req =
672       GNUNET_PEERSTORE_iterate (peerstore, "fs",
673                                 peer, "respect",
674                                 GNUNET_TIME_UNIT_FOREVER_REL,
675                                 &peer_respect_cb,
676                                 cp);
677 }
678
679
680 /**
681  * It may be time to re-start migrating content to this
682  * peer.  Check, and if so, restart migration.
683  *
684  * @param cls the `struct GSF_ConnectedPeer`
685  */
686 static void
687 revive_migration (void *cls)
688 {
689   struct GSF_ConnectedPeer *cp = cls;
690   struct GNUNET_TIME_Relative bt;
691
692   cp->mig_revive_task = NULL;
693   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
694   if (0 != bt.rel_value_us)
695   {
696     /* still time left... */
697     cp->mig_revive_task =
698         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
699     return;
700   }
701   GSF_push_start_ (cp);
702 }
703
704
705 /**
706  * Get a handle for a connected peer.
707  *
708  * @param peer peer's identity
709  * @return NULL if the peer is not currently connected
710  */
711 struct GSF_ConnectedPeer *
712 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
713 {
714   if (NULL == cp_map)
715     return NULL;
716   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
717 }
718
719
720 /**
721  * Handle P2P "MIGRATION_STOP" message.
722  *
723  * @param cls closure, always NULL
724  * @param other the other peer involved (sender or receiver, NULL
725  *        for loopback messages where we are both sender and receiver)
726  * @param message the actual message
727  * @return #GNUNET_OK to keep the connection open,
728  *         #GNUNET_SYSERR to close it (signal serious error)
729  */
730 int
731 GSF_handle_p2p_migration_stop_ (void *cls,
732                                 const struct GNUNET_PeerIdentity *other,
733                                 const struct GNUNET_MessageHeader *message)
734 {
735   struct GSF_ConnectedPeer *cp;
736   const struct MigrationStopMessage *msm;
737   struct GNUNET_TIME_Relative bt;
738
739   msm = (const struct MigrationStopMessage *) message;
740   cp = GSF_peer_get_ (other);
741   if (NULL == cp)
742   {
743     GNUNET_break (0);
744     return GNUNET_OK;
745   }
746   GNUNET_STATISTICS_update (GSF_stats,
747                             gettext_noop ("# migration stop messages received"),
748                             1, GNUNET_NO);
749   bt = GNUNET_TIME_relative_ntoh (msm->duration);
750   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
751               _("Migration of content to peer `%s' blocked for %s\n"),
752               GNUNET_i2s (other),
753               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
754   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
755   if ( (NULL == cp->mig_revive_task) &&
756        (NULL == cp->respect_iterate_req) )
757   {
758     GSF_push_stop_ (cp);
759     cp->mig_revive_task =
760         GNUNET_SCHEDULER_add_delayed (bt,
761                                       &revive_migration, cp);
762   }
763   return GNUNET_OK;
764 }
765
766
767 /**
768  * Copy reply and free put message.
769  *
770  * @param cls the `struct PutMessage`
771  * @param buf_size number of bytes available in @a buf
772  * @param buf where to copy the message, NULL on error (peer disconnect)
773  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
774  */
775 static size_t
776 copy_reply (void *cls,
777             size_t buf_size,
778             void *buf)
779 {
780   struct PutMessage *pm = cls;
781   size_t size;
782
783   if (NULL != buf)
784   {
785     GNUNET_assert (buf_size >= ntohs (pm->header.size));
786     size = ntohs (pm->header.size);
787     memcpy (buf, pm, size);
788     GNUNET_STATISTICS_update (GSF_stats,
789                               gettext_noop ("# replies transmitted to other peers"),
790                               1,
791                               GNUNET_NO);
792   }
793   else
794   {
795     size = 0;
796     GNUNET_STATISTICS_update (GSF_stats,
797                               gettext_noop ("# replies dropped"),
798                               1,
799                               GNUNET_NO);
800   }
801   GNUNET_free (pm);
802   return size;
803 }
804
805
806 /**
807  * Free resources associated with the given peer request.
808  *
809  * @param peerreq request to free
810  */
811 static void
812 free_pending_request (struct PeerRequest *peerreq)
813 {
814   struct GSF_ConnectedPeer *cp = peerreq->cp;
815   struct GSF_PendingRequestData *prd;
816
817   prd = GSF_pending_request_get_data_ (peerreq->pr);
818   if (NULL != peerreq->kill_task)
819   {
820     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
821     peerreq->kill_task = NULL;
822   }
823   GNUNET_STATISTICS_update (GSF_stats,
824                             gettext_noop ("# P2P searches active"),
825                             -1,
826                             GNUNET_NO);
827   GNUNET_break (GNUNET_YES ==
828                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
829                                                       &prd->query,
830                                                       peerreq));
831   GNUNET_free (peerreq);
832 }
833
834
835 /**
836  * Cancel all requests associated with the peer.
837  *
838  * @param cls unused
839  * @param query hash code of the request
840  * @param value the `struct GSF_PendingRequest`
841  * @return #GNUNET_YES (continue to iterate)
842  */
843 static int
844 cancel_pending_request (void *cls,
845                         const struct GNUNET_HashCode *query,
846                         void *value)
847 {
848   struct PeerRequest *peerreq = value;
849   struct GSF_PendingRequest *pr = peerreq->pr;
850
851   free_pending_request (peerreq);
852   GSF_pending_request_cancel_ (pr,
853                                GNUNET_NO);
854   return GNUNET_OK;
855 }
856
857
858 /**
859  * Free the given request.
860  *
861  * @param cls the request to free
862  */
863 static void
864 peer_request_destroy (void *cls)
865 {
866   struct PeerRequest *peerreq = cls;
867   struct GSF_PendingRequest *pr = peerreq->pr;
868   struct GSF_PendingRequestData *prd;
869
870   peerreq->kill_task = NULL;
871   prd = GSF_pending_request_get_data_ (pr);
872   cancel_pending_request (NULL,
873                           &prd->query,
874                           peerreq);
875 }
876
877
878 /**
879  * The artificial delay is over, transmit the message now.
880  *
881  * @param cls the `struct GSF_DelayedHandle` with the message
882  */
883 static void
884 transmit_delayed_now (void *cls)
885 {
886   struct GSF_DelayedHandle *dh = cls;
887   struct GSF_ConnectedPeer *cp = dh->cp;
888
889   GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
890                                cp->delayed_tail,
891                                dh);
892   cp->delay_queue_size--;
893   (void) GSF_peer_transmit_ (cp,
894                              GNUNET_NO,
895                              UINT32_MAX,
896                              REPLY_TIMEOUT,
897                              dh->msize,
898                              &copy_reply,
899                              dh->pm);
900   GNUNET_free (dh);
901 }
902
903
904 /**
905  * Get the randomized delay a response should be subjected to.
906  *
907  * @return desired delay
908  */
909 static struct GNUNET_TIME_Relative
910 get_randomized_delay ()
911 {
912   struct GNUNET_TIME_Relative ret;
913
914   ret =
915       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
916                                      GNUNET_CRYPTO_random_u32
917                                      (GNUNET_CRYPTO_QUALITY_WEAK,
918                                       2 * GSF_avg_latency.rel_value_us + 1));
919 #if INSANE_STATISTICS
920   GNUNET_STATISTICS_update (GSF_stats,
921                             gettext_noop
922                             ("# artificial delays introduced (ms)"),
923                             ret.rel_value_us / 1000LL, GNUNET_NO);
924 #endif
925   return ret;
926 }
927
928
929 /**
930  * Handle a reply to a pending request.  Also called if a request
931  * expires (then with data == NULL).  The handler may be called
932  * many times (depending on the request type), but will not be
933  * called during or after a call to GSF_pending_request_cancel
934  * and will also not be called anymore after a call signalling
935  * expiration.
936  *
937  * @param cls `struct PeerRequest` this is an answer for
938  * @param eval evaluation of the result
939  * @param pr handle to the original pending request
940  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
941  * @param expiration when does @a data expire?
942  * @param last_transmission when did we last transmit a request for this block
943  * @param type type of the block
944  * @param data response data, NULL on request expiration
945  * @param data_len number of bytes in @a data
946  */
947 static void
948 handle_p2p_reply (void *cls,
949                   enum GNUNET_BLOCK_EvaluationResult eval,
950                   struct GSF_PendingRequest *pr,
951                   uint32_t reply_anonymity_level,
952                   struct GNUNET_TIME_Absolute expiration,
953                   struct GNUNET_TIME_Absolute last_transmission,
954                   enum GNUNET_BLOCK_Type type,
955                   const void *data,
956                   size_t data_len)
957 {
958   struct PeerRequest *peerreq = cls;
959   struct GSF_ConnectedPeer *cp = peerreq->cp;
960   struct GSF_PendingRequestData *prd;
961   struct PutMessage *pm;
962   size_t msize;
963
964   GNUNET_assert (data_len + sizeof (struct PutMessage) <
965                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
966   GNUNET_assert (peerreq->pr == pr);
967   prd = GSF_pending_request_get_data_ (pr);
968   if (NULL == data)
969   {
970     free_pending_request (peerreq);
971     return;
972   }
973   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
974   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
975   {
976     GNUNET_STATISTICS_update (GSF_stats,
977                               gettext_noop
978                               ("# replies dropped due to type mismatch"),
979                                 1, GNUNET_NO);
980     return;
981   }
982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983               "Transmitting result for query `%s' to peer\n",
984               GNUNET_h2s (&prd->query));
985   GNUNET_STATISTICS_update (GSF_stats,
986                             gettext_noop ("# replies received for other peers"),
987                             1, GNUNET_NO);
988   msize = sizeof (struct PutMessage) + data_len;
989   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
990   {
991     GNUNET_break (0);
992     return;
993   }
994   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
995   {
996     if (reply_anonymity_level - 1 > GSF_cover_content_count)
997     {
998       GNUNET_STATISTICS_update (GSF_stats,
999                                 gettext_noop
1000                                 ("# replies dropped due to insufficient cover traffic"),
1001                                 1, GNUNET_NO);
1002       return;
1003     }
1004     GSF_cover_content_count -= (reply_anonymity_level - 1);
1005   }
1006
1007   pm = GNUNET_malloc (msize);
1008   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1009   pm->header.size = htons (msize);
1010   pm->type = htonl (type);
1011   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
1012   memcpy (&pm[1], data, data_len);
1013   if ( (UINT32_MAX != reply_anonymity_level) &&
1014        (0 != reply_anonymity_level) &&
1015        (GNUNET_YES == GSF_enable_randomized_delays) )
1016   {
1017     struct GSF_DelayedHandle *dh;
1018
1019     dh = GNUNET_new (struct GSF_DelayedHandle);
1020     dh->cp = cp;
1021     dh->pm = pm;
1022     dh->msize = msize;
1023     GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1024                                  cp->delayed_tail,
1025                                  dh);
1026     cp->delay_queue_size++;
1027     dh->delay_task =
1028         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
1029                                       &transmit_delayed_now,
1030                                       dh);
1031   }
1032   else
1033   {
1034     (void) GSF_peer_transmit_ (cp,
1035                                GNUNET_NO,
1036                                UINT32_MAX,
1037                                REPLY_TIMEOUT,
1038                                msize,
1039                                &copy_reply,
1040                                pm);
1041   }
1042   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1043     return;
1044   if (NULL == peerreq->kill_task)
1045   {
1046     GNUNET_STATISTICS_update (GSF_stats,
1047                               gettext_noop
1048                               ("# P2P searches destroyed due to ultimate reply"),
1049                               1,
1050                               GNUNET_NO);
1051     peerreq->kill_task =
1052         GNUNET_SCHEDULER_add_now (&peer_request_destroy,
1053                                   peerreq);
1054   }
1055 }
1056
1057
1058 /**
1059  * Increase the peer's respect by a value.
1060  *
1061  * @param cp which peer to change the respect value on
1062  * @param value is the int value by which the
1063  *  peer's credit is to be increased or decreased
1064  * @returns the actual change in respect (positive or negative)
1065  */
1066 static int
1067 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1068 {
1069   if (0 == value)
1070     return 0;
1071   GNUNET_assert (NULL != cp);
1072   if (value > 0)
1073   {
1074     if (cp->ppd.respect + value < cp->ppd.respect)
1075     {
1076       value = UINT32_MAX - cp->ppd.respect;
1077       cp->ppd.respect = UINT32_MAX;
1078     }
1079     else
1080       cp->ppd.respect += value;
1081   }
1082   else
1083   {
1084     if (cp->ppd.respect < -value)
1085     {
1086       value = -cp->ppd.respect;
1087       cp->ppd.respect = 0;
1088     }
1089     else
1090       cp->ppd.respect += value;
1091   }
1092   return value;
1093 }
1094
1095
1096 /**
1097  * We've received a request with the specified priority.  Bound it
1098  * according to how much we respect the given peer.
1099  *
1100  * @param prio_in requested priority
1101  * @param cp the peer making the request
1102  * @return effective priority
1103  */
1104 static int32_t
1105 bound_priority (uint32_t prio_in,
1106                 struct GSF_ConnectedPeer *cp)
1107 {
1108 #define N ((double)128.0)
1109   uint32_t ret;
1110   double rret;
1111   int ld;
1112
1113   ld = GSF_test_get_load_too_high_ (0);
1114   if (GNUNET_SYSERR == ld)
1115   {
1116 #if INSANE_STATISTICS
1117     GNUNET_STATISTICS_update (GSF_stats,
1118                               gettext_noop
1119                               ("# requests done for free (low load)"), 1,
1120                               GNUNET_NO);
1121 #endif
1122     return 0;                   /* excess resources */
1123   }
1124   if (prio_in > INT32_MAX)
1125     prio_in = INT32_MAX;
1126   ret = -change_peer_respect (cp, -(int) prio_in);
1127   if (ret > 0)
1128   {
1129     if (ret > GSF_current_priorities + N)
1130       rret = GSF_current_priorities + N;
1131     else
1132       rret = ret;
1133     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1134   }
1135   if ((GNUNET_YES == ld) && (ret > 0))
1136   {
1137     /* try with charging */
1138     ld = GSF_test_get_load_too_high_ (ret);
1139   }
1140   if (GNUNET_YES == ld)
1141   {
1142     GNUNET_STATISTICS_update (GSF_stats,
1143                               gettext_noop
1144                               ("# request dropped, priority insufficient"), 1,
1145                               GNUNET_NO);
1146     /* undo charge */
1147     change_peer_respect (cp, (int) ret);
1148     return -1;                  /* not enough resources */
1149   }
1150   else
1151   {
1152     GNUNET_STATISTICS_update (GSF_stats,
1153                               gettext_noop
1154                               ("# requests done for a price (normal load)"), 1,
1155                               GNUNET_NO);
1156   }
1157 #undef N
1158   return ret;
1159 }
1160
1161
1162 /**
1163  * The priority level imposes a bound on the maximum
1164  * value for the ttl that can be requested.
1165  *
1166  * @param ttl_in requested ttl
1167  * @param prio given priority
1168  * @return @a ttl_in if @a ttl_in is below the limit,
1169  *         otherwise the ttl-limit for the given @a prio
1170  */
1171 static int32_t
1172 bound_ttl (int32_t ttl_in,
1173            uint32_t prio)
1174 {
1175   unsigned long long allowed;
1176
1177   if (ttl_in <= 0)
1178     return ttl_in;
1179   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1180   if (ttl_in > allowed)
1181   {
1182     if (allowed >= (1 << 30))
1183       return 1 << 30;
1184     return allowed;
1185   }
1186   return ttl_in;
1187 }
1188
1189
1190 /**
1191  * Closure for #test_exist_cb().
1192  */
1193 struct TestExistClosure
1194 {
1195
1196   /**
1197    * Priority of the incoming request.
1198    */
1199   int32_t priority;
1200
1201   /**
1202    * Relative TTL of the incoming request.
1203    */
1204   int32_t ttl;
1205
1206   /**
1207    * Type of the incoming request.
1208    */
1209   enum GNUNET_BLOCK_Type type;
1210
1211   /**
1212    * Set to #GNUNET_YES if we are done handling the query.
1213    */
1214   int finished;
1215
1216 };
1217
1218
1219 /**
1220  * Test if the query already exists.  If so, merge it, otherwise
1221  * keep `finished` at #GNUNET_NO.
1222  *
1223  * @param cls our `struct TestExistClosure`
1224  * @param hc the key of the query
1225  * @param value the existing `struct PeerRequest`.
1226  * @return #GNUNET_YES to continue to iterate,
1227  *         #GNUNET_NO if we successfully merged
1228  */
1229 static int
1230 test_exist_cb (void *cls,
1231                const struct GNUNET_HashCode *hc,
1232                void *value)
1233 {
1234   struct TestExistClosure *tec = cls;
1235   struct PeerRequest *peerreq = value;
1236   struct GSF_PendingRequest *pr;
1237   struct GSF_PendingRequestData *prd;
1238
1239   pr = peerreq->pr;
1240   prd = GSF_pending_request_get_data_ (pr);
1241   if (prd->type != tec->type)
1242     return GNUNET_YES;
1243   if (prd->ttl.abs_value_us >=
1244       GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1245   {
1246     /* existing request has higher TTL, drop new one! */
1247     prd->priority += tec->priority;
1248     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249                 "Have existing request with higher TTL, dropping new request.\n");
1250     GNUNET_STATISTICS_update (GSF_stats,
1251                               gettext_noop
1252                               ("# requests dropped due to higher-TTL request"),
1253                               1, GNUNET_NO);
1254     tec->finished = GNUNET_YES;
1255     return GNUNET_NO;
1256   }
1257   /* existing request has lower TTL, drop old one! */
1258   tec->priority += prd->priority;
1259   free_pending_request (peerreq);
1260   GSF_pending_request_cancel_ (pr,
1261                                GNUNET_YES);
1262   return GNUNET_NO;
1263 }
1264
1265
1266 /**
1267  * Handle P2P "QUERY" message.  Creates the pending request entry
1268  * and sets up all of the data structures to that we will
1269  * process replies properly.  Does not initiate forwarding or
1270  * local database lookups.
1271  *
1272  * @param other the other peer involved (sender or receiver, NULL
1273  *        for loopback messages where we are both sender and receiver)
1274  * @param message the actual message
1275  * @return pending request handle, NULL on error
1276  */
1277 struct GSF_PendingRequest *
1278 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1279                        const struct GNUNET_MessageHeader *message)
1280 {
1281   struct PeerRequest *peerreq;
1282   struct GSF_PendingRequest *pr;
1283   struct GSF_ConnectedPeer *cp;
1284   struct GSF_ConnectedPeer *cps;
1285   const struct GNUNET_PeerIdentity *target;
1286   enum GSF_PendingRequestOptions options;
1287   uint16_t msize;
1288   const struct GetMessage *gm;
1289   unsigned int bits;
1290   const struct GNUNET_PeerIdentity *opt;
1291   uint32_t bm;
1292   size_t bfsize;
1293   uint32_t ttl_decrement;
1294   struct TestExistClosure tec;
1295   GNUNET_PEER_Id spid;
1296   const struct GSF_PendingRequestData *prd;
1297
1298   msize = ntohs (message->size);
1299   if (msize < sizeof (struct GetMessage))
1300   {
1301     GNUNET_break_op (0);
1302     return NULL;
1303   }
1304   GNUNET_STATISTICS_update (GSF_stats,
1305                             gettext_noop
1306                             ("# GET requests received (from other peers)"),
1307                             1,
1308                             GNUNET_NO);
1309   gm = (const struct GetMessage *) message;
1310   tec.type = ntohl (gm->type);
1311   bm = ntohl (gm->hash_bitmap);
1312   bits = 0;
1313   while (bm > 0)
1314   {
1315     if (1 == (bm & 1))
1316       bits++;
1317     bm >>= 1;
1318   }
1319   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1320   {
1321     GNUNET_break_op (0);
1322     return NULL;
1323   }
1324   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1325   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1326   /* bfsize must be power of 2, check! */
1327   if (0 != ((bfsize - 1) & bfsize))
1328   {
1329     GNUNET_break_op (0);
1330     return NULL;
1331   }
1332   GSF_cover_query_count++;
1333   bm = ntohl (gm->hash_bitmap);
1334   bits = 0;
1335   cps = GSF_peer_get_ (other);
1336   if (NULL == cps)
1337   {
1338     /* peer must have just disconnected */
1339     GNUNET_STATISTICS_update (GSF_stats,
1340                               gettext_noop
1341                               ("# requests dropped due to initiator not being connected"),
1342                               1, GNUNET_NO);
1343     return NULL;
1344   }
1345   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1346     cp = GSF_peer_get_ (&opt[bits++]);
1347   else
1348     cp = cps;
1349   if (NULL == cp)
1350   {
1351     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1352       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353                   "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1354                   GNUNET_i2s (&opt[bits - 1]));
1355
1356     else
1357       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1358                   "Failed to find peer `%s' in connection set. Dropping query.\n",
1359                   GNUNET_i2s (other));
1360     GNUNET_STATISTICS_update (GSF_stats,
1361                               gettext_noop
1362                               ("# requests dropped due to missing reverse route"),
1363                               1,
1364                               GNUNET_NO);
1365     return NULL;
1366   }
1367   if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
1368   {
1369     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370                 "Peer `%s' has too many replies queued already. Dropping query.\n",
1371                 GNUNET_i2s (other));
1372     GNUNET_STATISTICS_update (GSF_stats,
1373                               gettext_noop ("# requests dropped due to full reply queue"),
1374                               1,
1375                               GNUNET_NO);
1376     return NULL;
1377   }
1378   /* note that we can really only check load here since otherwise
1379    * peers could find out that we are overloaded by not being
1380    * disconnected after sending us a malformed query... */
1381   tec.priority = bound_priority (ntohl (gm->priority),
1382                                  cps);
1383   if (tec.priority < 0)
1384   {
1385     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386                 "Dropping query from `%s', this peer is too busy.\n",
1387                 GNUNET_i2s (other));
1388     return NULL;
1389   }
1390   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1391               "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1392               GNUNET_h2s (&gm->query),
1393               (unsigned int) tec.type,
1394               GNUNET_i2s (other),
1395               (unsigned int) bm);
1396   target =
1397       (0 !=
1398        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1399   options = GSF_PRO_DEFAULTS;
1400   spid = 0;
1401   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + tec.priority))
1402       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1403           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1404           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1405   {
1406     /* don't have BW to send to peer, or would likely take longer than we have for it,
1407      * so at best indirect the query */
1408     tec.priority = 0;
1409     options |= GSF_PRO_FORWARD_ONLY;
1410     spid = GNUNET_PEER_intern (other);
1411     GNUNET_assert (0 != spid);
1412   }
1413   tec.ttl = bound_ttl (ntohl (gm->ttl),
1414                        tec.priority);
1415   /* decrement ttl (always) */
1416   ttl_decrement =
1417       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1418                                                     TTL_DECREMENT);
1419   if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1420   {
1421     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1422                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1423                 GNUNET_i2s (other),
1424                 tec.ttl,
1425                 ttl_decrement);
1426     GNUNET_STATISTICS_update (GSF_stats,
1427                               gettext_noop
1428                               ("# requests dropped due TTL underflow"), 1,
1429                               GNUNET_NO);
1430     /* integer underflow => drop (should be very rare)! */
1431     return NULL;
1432   }
1433   tec.ttl -= ttl_decrement;
1434
1435   /* test if the request already exists */
1436   tec.finished = GNUNET_NO;
1437   GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1438                                               &gm->query,
1439                                               &test_exist_cb,
1440                                               &tec);
1441   if (GNUNET_YES == tec.finished)
1442     return NULL; /* merged into existing request, we're done */
1443
1444   peerreq = GNUNET_new (struct PeerRequest);
1445   peerreq->cp = cp;
1446   pr = GSF_pending_request_create_ (options,
1447                                     tec.type,
1448                                     &gm->query,
1449                                     target,
1450                                     (bfsize > 0)
1451                                     ? (const char *) &opt[bits]
1452                                     : NULL,
1453                                     bfsize,
1454                                     ntohl (gm->filter_mutator),
1455                                     1 /* anonymity */,
1456                                     (uint32_t) tec.priority,
1457                                     tec.ttl,
1458                                     spid,
1459                                     GNUNET_PEER_intern (other),
1460                                     NULL, 0,        /* replies_seen */
1461                                     &handle_p2p_reply,
1462                                     peerreq);
1463   GNUNET_assert (NULL != pr);
1464   prd = GSF_pending_request_get_data_ (pr);
1465   peerreq->pr = pr;
1466   GNUNET_break (GNUNET_OK ==
1467                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1468                                                    &prd->query,
1469                                                    peerreq,
1470                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1471   GNUNET_STATISTICS_update (GSF_stats,
1472                             gettext_noop ("# P2P query messages received and processed"),
1473                             1,
1474                             GNUNET_NO);
1475   GNUNET_STATISTICS_update (GSF_stats,
1476                             gettext_noop ("# P2P searches active"),
1477                             1,
1478                             GNUNET_NO);
1479   return pr;
1480 }
1481
1482
1483 /**
1484  * Function called if there has been a timeout trying to satisfy
1485  * a transmission request.
1486  *
1487  * @param cls the `struct GSF_PeerTransmitHandle` of the request
1488  */
1489 static void
1490 peer_transmit_timeout (void *cls)
1491 {
1492   struct GSF_PeerTransmitHandle *pth = cls;
1493   struct GSF_ConnectedPeer *cp;
1494
1495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1496               "Timeout trying to transmit to other peer\n");
1497   pth->timeout_task = NULL;
1498   cp = pth->cp;
1499   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1500                                cp->pth_tail,
1501                                pth);
1502   if (GNUNET_YES == pth->is_query)
1503     GNUNET_assert (0 < cp->ppd.pending_queries--);
1504   else if (GNUNET_NO == pth->is_query)
1505     GNUNET_assert (0 < cp->ppd.pending_replies--);
1506   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1507                       UINT64_MAX);
1508   if (NULL != cp->cth)
1509   {
1510     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1511     cp->cth = NULL;
1512   }
1513   pth->gmc (pth->gmc_cls, 0, NULL);
1514   GNUNET_assert (0 == cp->cth_in_progress);
1515   GNUNET_free (pth);
1516 }
1517
1518
1519 /**
1520  * Transmit a message to the given peer as soon as possible.
1521  * If the peer disconnects before the transmission can happen,
1522  * the callback is invoked with a `NULL` @a buffer.
1523  *
1524  * @param cp target peer
1525  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1526  * @param priority how important is this request?
1527  * @param timeout when does this request timeout (call gmc with error)
1528  * @param size number of bytes we would like to send to the peer
1529  * @param gmc function to call to get the message
1530  * @param gmc_cls closure for @a gmc
1531  * @return handle to cancel request
1532  */
1533 struct GSF_PeerTransmitHandle *
1534 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1535                     int is_query,
1536                     uint32_t priority,
1537                     struct GNUNET_TIME_Relative timeout,
1538                     size_t size,
1539                     GSF_GetMessageCallback gmc, void *gmc_cls)
1540 {
1541   struct GSF_PeerTransmitHandle *pth;
1542   struct GSF_PeerTransmitHandle *pos;
1543   struct GSF_PeerTransmitHandle *prev;
1544
1545   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1546   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1547   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1548   pth->gmc = gmc;
1549   pth->gmc_cls = gmc_cls;
1550   pth->size = size;
1551   pth->is_query = is_query;
1552   pth->priority = priority;
1553   pth->cp = cp;
1554   /* insertion sort (by priority, descending) */
1555   prev = NULL;
1556   pos = cp->pth_head;
1557   while ((NULL != pos) && (pos->priority > priority))
1558   {
1559     prev = pos;
1560     pos = pos->next;
1561   }
1562   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1563                                      cp->pth_tail,
1564                                      prev,
1565                                      pth);
1566   if (GNUNET_YES == is_query)
1567     cp->ppd.pending_queries++;
1568   else if (GNUNET_NO == is_query)
1569     cp->ppd.pending_replies++;
1570   pth->timeout_task
1571     = GNUNET_SCHEDULER_add_delayed (timeout,
1572                                     &peer_transmit_timeout,
1573                                     pth);
1574   schedule_transmission (pth);
1575   return pth;
1576 }
1577
1578
1579 /**
1580  * Cancel an earlier request for transmission.
1581  *
1582  * @param pth request to cancel
1583  */
1584 void
1585 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1586 {
1587   struct GSF_ConnectedPeer *cp;
1588
1589   if (NULL != pth->timeout_task)
1590   {
1591     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1592     pth->timeout_task = NULL;
1593   }
1594   cp = pth->cp;
1595   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1596                                cp->pth_tail,
1597                                pth);
1598   if (GNUNET_YES == pth->is_query)
1599     GNUNET_assert (0 < cp->ppd.pending_queries--);
1600   else if (GNUNET_NO == pth->is_query)
1601     GNUNET_assert (0 < cp->ppd.pending_replies--);
1602   GNUNET_free (pth);
1603 }
1604
1605
1606 /**
1607  * Report on receiving a reply; update the performance record of the given peer.
1608  *
1609  * @param cp responding peer (will be updated)
1610  * @param request_time time at which the original query was transmitted
1611  * @param request_priority priority of the original request
1612  */
1613 void
1614 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1615                               struct GNUNET_TIME_Absolute request_time,
1616                               uint32_t request_priority)
1617 {
1618   struct GNUNET_TIME_Relative delay;
1619
1620   delay = GNUNET_TIME_absolute_get_duration (request_time);
1621   cp->ppd.avg_reply_delay.rel_value_us =
1622       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1623        delay.rel_value_us) / RUNAVG_DELAY_N;
1624   cp->ppd.avg_priority =
1625       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1626        request_priority) / RUNAVG_DELAY_N;
1627 }
1628
1629
1630 /**
1631  * Report on receiving a reply in response to an initiating client.
1632  * Remember that this peer is good for this client.
1633  *
1634  * @param cp responding peer (will be updated)
1635  * @param initiator_client local client on responsible for query
1636  */
1637 void
1638 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1639                                    struct GSF_LocalClient *initiator_client)
1640 {
1641   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1642                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1643 }
1644
1645
1646 /**
1647  * Report on receiving a reply in response to an initiating peer.
1648  * Remember that this peer is good for this initiating peer.
1649  *
1650  * @param cp responding peer (will be updated)
1651  * @param initiator_peer other peer responsible for query
1652  */
1653 void
1654 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1655                                  const struct GSF_ConnectedPeer *initiator_peer)
1656 {
1657   unsigned int woff;
1658
1659   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1660   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1661   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1662   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1663   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1664 }
1665
1666
1667 /**
1668  * Write peer-respect information to a file - flush the buffer entry!
1669  *
1670  * @param cls unused
1671  * @param key peer identity
1672  * @param value the `struct GSF_ConnectedPeer` to flush
1673  * @return #GNUNET_OK to continue iteration
1674  */
1675 static int
1676 flush_respect (void *cls,
1677                const struct GNUNET_PeerIdentity *key,
1678                void *value)
1679 {
1680   struct GSF_ConnectedPeer *cp = value;
1681   struct GNUNET_PeerIdentity pid;
1682
1683   if (cp->ppd.respect == cp->disk_respect)
1684     return GNUNET_OK;           /* unchanged */
1685   GNUNET_assert (0 != cp->ppd.pid);
1686   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1687   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1688                           sizeof (cp->ppd.respect),
1689                           GNUNET_TIME_UNIT_FOREVER_ABS,
1690                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1691   return GNUNET_OK;
1692 }
1693
1694
1695 /**
1696  * A peer disconnected from us.  Tear down the connected peer
1697  * record.
1698  *
1699  * @param cls unused
1700  * @param peer identity of peer that connected
1701  */
1702 void
1703 GSF_peer_disconnect_handler_ (void *cls,
1704                               const struct GNUNET_PeerIdentity *peer)
1705 {
1706   struct GSF_ConnectedPeer *cp;
1707   struct GSF_PeerTransmitHandle *pth;
1708   struct GSF_DelayedHandle *dh;
1709
1710   cp = GSF_peer_get_ (peer);
1711   if (NULL == cp)
1712     return;                     /* must have been disconnect from core with
1713                                  * 'peer' == my_id, ignore */
1714   flush_respect (NULL, peer, cp);
1715   GNUNET_assert (GNUNET_YES ==
1716                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1717                                                        peer,
1718                                                        cp));
1719   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1720                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1721                          GNUNET_NO);
1722   if (NULL != cp->respect_iterate_req)
1723   {
1724     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1725     cp->respect_iterate_req = NULL;
1726   }
1727   if (NULL != cp->migration_pth)
1728   {
1729     GSF_peer_transmit_cancel_ (cp->migration_pth);
1730     cp->migration_pth = NULL;
1731   }
1732   if (NULL != cp->rc)
1733   {
1734     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1735     cp->rc = NULL;
1736   }
1737   if (NULL != cp->rc_delay_task)
1738   {
1739     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1740     cp->rc_delay_task = NULL;
1741   }
1742   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1743                                          &cancel_pending_request,
1744                                          cp);
1745   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1746   cp->request_map = NULL;
1747   GSF_plan_notify_peer_disconnect_ (cp);
1748   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1749   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1750                              P2P_SUCCESS_LIST_SIZE);
1751   memset (cp->ppd.last_p2p_replies,
1752           0,
1753           sizeof (cp->ppd.last_p2p_replies));
1754   GSF_push_stop_ (cp);
1755   if (NULL != cp->cth)
1756   {
1757     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1758     cp->cth = NULL;
1759   }
1760   GNUNET_assert (0 == cp->cth_in_progress);
1761   while (NULL != (pth = cp->pth_head))
1762   {
1763     if (pth->timeout_task != NULL)
1764     {
1765       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1766       pth->timeout_task = NULL;
1767     }
1768     GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1769                                  cp->pth_tail,
1770                                  pth);
1771     if (GNUNET_YES == pth->is_query)
1772       GNUNET_assert (0 < cp->ppd.pending_queries--);
1773     else if (GNUNET_NO == pth->is_query)
1774       GNUNET_assert (0 < cp->ppd.pending_replies--);
1775     pth->gmc (pth->gmc_cls, 0, NULL);
1776     GNUNET_free (pth);
1777   }
1778   while (NULL != (dh = cp->delayed_head))
1779   {
1780     GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1781                                  cp->delayed_tail,
1782                                  dh);
1783     cp->delay_queue_size--;
1784     GNUNET_SCHEDULER_cancel (dh->delay_task);
1785     GNUNET_free (dh->pm);
1786     GNUNET_free (dh);
1787   }
1788   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1789   if (NULL != cp->mig_revive_task)
1790   {
1791     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1792     cp->mig_revive_task = NULL;
1793   }
1794   GNUNET_break (0 == cp->ppd.pending_queries);
1795   GNUNET_break (0 == cp->ppd.pending_replies);
1796   GNUNET_free (cp);
1797 }
1798
1799
1800 /**
1801  * Closure for #call_iterator().
1802  */
1803 struct IterationContext
1804 {
1805   /**
1806    * Function to call on each entry.
1807    */
1808   GSF_ConnectedPeerIterator it;
1809
1810   /**
1811    * Closure for @e it.
1812    */
1813   void *it_cls;
1814 };
1815
1816
1817 /**
1818  * Function that calls the callback for each peer.
1819  *
1820  * @param cls the `struct IterationContext *`
1821  * @param key identity of the peer
1822  * @param value the `struct GSF_ConnectedPeer *`
1823  * @return #GNUNET_YES to continue iteration
1824  */
1825 static int
1826 call_iterator (void *cls,
1827                const struct GNUNET_PeerIdentity *key,
1828                void *value)
1829 {
1830   struct IterationContext *ic = cls;
1831   struct GSF_ConnectedPeer *cp = value;
1832
1833   ic->it (ic->it_cls,
1834           key, cp,
1835           &cp->ppd);
1836   return GNUNET_YES;
1837 }
1838
1839
1840 /**
1841  * Iterate over all connected peers.
1842  *
1843  * @param it function to call for each peer
1844  * @param it_cls closure for @a it
1845  */
1846 void
1847 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1848                               void *it_cls)
1849 {
1850   struct IterationContext ic;
1851
1852   ic.it = it;
1853   ic.it_cls = it_cls;
1854   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1855                                          &call_iterator,
1856                                          &ic);
1857 }
1858
1859
1860 /**
1861  * Obtain the identity of a connected peer.
1862  *
1863  * @param cp peer to get identity of
1864  * @param id identity to set (written to)
1865  */
1866 void
1867 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1868                                   struct GNUNET_PeerIdentity *id)
1869 {
1870   GNUNET_assert (0 != cp->ppd.pid);
1871   GNUNET_PEER_resolve (cp->ppd.pid, id);
1872 }
1873
1874
1875 /**
1876  * Obtain the identity of a connected peer.
1877  *
1878  * @param cp peer to get identity of
1879  * @return reference to peer identity, valid until peer disconnects (!)
1880  */
1881 const struct GNUNET_PeerIdentity *
1882 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1883 {
1884   GNUNET_assert (0 != cp->ppd.pid);
1885   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1886 }
1887
1888
1889 /**
1890  * Assemble a migration stop message for transmission.
1891  *
1892  * @param cls the `struct GSF_ConnectedPeer` to use
1893  * @param size number of bytes we're allowed to write to @a buf
1894  * @param buf where to copy the message
1895  * @return number of bytes copied to @a buf
1896  */
1897 static size_t
1898 create_migration_stop_message (void *cls,
1899                                size_t size,
1900                                void *buf)
1901 {
1902   struct GSF_ConnectedPeer *cp = cls;
1903   struct MigrationStopMessage msm;
1904
1905   cp->migration_pth = NULL;
1906   if (NULL == buf)
1907     return 0;
1908   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1909   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1910   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1911   msm.reserved = htonl (0);
1912   msm.duration =
1913       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1914                                  (cp->last_migration_block));
1915   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1916   GNUNET_STATISTICS_update (GSF_stats,
1917                             gettext_noop ("# migration stop messages sent"),
1918                             1, GNUNET_NO);
1919   return sizeof (struct MigrationStopMessage);
1920 }
1921
1922
1923 /**
1924  * Ask a peer to stop migrating data to us until the given point
1925  * in time.
1926  *
1927  * @param cp peer to ask
1928  * @param block_time until when to block
1929  */
1930 void
1931 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1932                            struct GNUNET_TIME_Absolute block_time)
1933 {
1934   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1935   {
1936     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937                 "Migration already blocked for another %s\n",
1938                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1939                                                         (cp->last_migration_block), GNUNET_YES));
1940     return;                     /* already blocked */
1941   }
1942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1943               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1944                                                       GNUNET_YES));
1945   cp->last_migration_block = block_time;
1946   if (NULL != cp->migration_pth)
1947     GSF_peer_transmit_cancel_ (cp->migration_pth);
1948   cp->migration_pth =
1949       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1950                           GNUNET_TIME_UNIT_FOREVER_REL,
1951                           sizeof (struct MigrationStopMessage),
1952                           &create_migration_stop_message, cp);
1953 }
1954
1955
1956 /**
1957  * Notify core about a preference we have for the given peer
1958  * (to allocate more resources towards it).  The change will
1959  * be communicated the next time we reserve bandwidth with
1960  * core (not instantly).
1961  *
1962  * @param cp peer to reserve bandwidth from
1963  * @param pref preference change
1964  */
1965 void
1966 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1967                                        uint64_t pref)
1968 {
1969   cp->inc_preference += pref;
1970 }
1971
1972
1973 /**
1974  * Call this method periodically to flush respect information to disk.
1975  *
1976  * @param cls closure, not used
1977  */
1978 static void
1979 cron_flush_respect (void *cls)
1980 {
1981   fr_task = NULL;
1982   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1983                                          &flush_respect,
1984                                          NULL);
1985   fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1986                                                         GNUNET_SCHEDULER_PRIORITY_HIGH,
1987                                                         &cron_flush_respect, NULL);
1988 }
1989
1990
1991 /**
1992  * Initialize peer management subsystem.
1993  */
1994 void
1995 GSF_connected_peer_init_ ()
1996 {
1997   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1998   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1999   fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
2000                                                 &cron_flush_respect, NULL);
2001 }
2002
2003
2004 /**
2005  * Iterator to free peer entries.
2006  *
2007  * @param cls closure, unused
2008  * @param key current key code
2009  * @param value value in the hash map (peer entry)
2010  * @return #GNUNET_YES (we should continue to iterate)
2011  */
2012 static int
2013 clean_peer (void *cls,
2014             const struct GNUNET_PeerIdentity *key,
2015             void *value)
2016 {
2017   GSF_peer_disconnect_handler_ (NULL, key);
2018   return GNUNET_YES;
2019 }
2020
2021
2022 /**
2023  * Shutdown peer management subsystem.
2024  */
2025 void
2026 GSF_connected_peer_done_ ()
2027 {
2028   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2029                                          &flush_respect,
2030                                          NULL);
2031   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2032                                          &clean_peer,
2033                                          NULL);
2034   GNUNET_SCHEDULER_cancel (fr_task);
2035   fr_task = NULL;
2036   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
2037   cp_map = NULL;
2038   GNUNET_PEERSTORE_disconnect (peerstore,
2039                                GNUNET_YES);
2040   
2041 }
2042
2043
2044 /**
2045  * Iterator to remove references to LC entry.
2046  *
2047  * @param cls the `struct GSF_LocalClient *` to look for
2048  * @param key current key code
2049  * @param value value in the hash map (peer entry)
2050  * @return #GNUNET_YES (we should continue to iterate)
2051  */
2052 static int
2053 clean_local_client (void *cls,
2054                     const struct GNUNET_PeerIdentity *key,
2055                     void *value)
2056 {
2057   const struct GSF_LocalClient *lc = cls;
2058   struct GSF_ConnectedPeer *cp = value;
2059   unsigned int i;
2060
2061   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
2062     if (cp->ppd.last_client_replies[i] == lc)
2063       cp->ppd.last_client_replies[i] = NULL;
2064   return GNUNET_YES;
2065 }
2066
2067
2068 /**
2069  * Notification that a local client disconnected.  Clean up all of our
2070  * references to the given handle.
2071  *
2072  * @param lc handle to the local client (henceforth invalid)
2073  */
2074 void
2075 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
2076 {
2077   if (NULL == cp_map)
2078     return;                     /* already cleaned up */
2079   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
2080                                          (void *) lc);
2081 }
2082
2083
2084 /* end of gnunet-service-fs_cp.c */