uncrustify as demanded.
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file fs/gnunet-service-fs_cp.c
22  * @brief API to handle 'connected peers'
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33 #include "gnunet_peerstore_service.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle {
63   /**
64    * Kept in a doubly-linked list.
65    */
66   struct GSF_PeerTransmitHandle *next;
67
68   /**
69    * Kept in a doubly-linked list.
70    */
71   struct GSF_PeerTransmitHandle *prev;
72
73   /**
74    * Time when this transmission request was issued.
75    */
76   struct GNUNET_TIME_Absolute transmission_request_start_time;
77
78   /**
79    * Envelope with the actual message.
80    */
81   struct GNUNET_MQ_Envelope *env;
82
83   /**
84    * Peer this request targets.
85    */
86   struct GSF_ConnectedPeer *cp;
87
88   /**
89    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
90    */
91   int is_query;
92
93   /**
94    * Did we get a reservation already?
95    */
96   int was_reserved;
97
98   /**
99    * Priority of this request.
100    */
101   uint32_t priority;
102 };
103
104
105 /**
106  * Handle for an entry in our delay list.
107  */
108 struct GSF_DelayedHandle {
109   /**
110    * Kept in a doubly-linked list.
111    */
112   struct GSF_DelayedHandle *next;
113
114   /**
115    * Kept in a doubly-linked list.
116    */
117   struct GSF_DelayedHandle *prev;
118
119   /**
120    * Peer this transmission belongs to.
121    */
122   struct GSF_ConnectedPeer *cp;
123
124   /**
125    * Envelope of the message that was delayed.
126    */
127   struct GNUNET_MQ_Envelope *env;
128
129   /**
130    * Task for the delay.
131    */
132   struct GNUNET_SCHEDULER_Task *delay_task;
133
134   /**
135    * Size of the message.
136    */
137   size_t msize;
138 };
139
140
141 /**
142  * Information per peer and request.
143  */
144 struct PeerRequest {
145   /**
146    * Handle to generic request (generic: from peer or local client).
147    */
148   struct GSF_PendingRequest *pr;
149
150   /**
151    * Which specific peer issued this request?
152    */
153   struct GSF_ConnectedPeer *cp;
154
155   /**
156    * Task for asynchronous stopping of this request.
157    */
158   struct GNUNET_SCHEDULER_Task *kill_task;
159 };
160
161
162 /**
163  * A connected peer.
164  */
165 struct GSF_ConnectedPeer {
166   /**
167    * Performance data for this peer.
168    */
169   struct GSF_PeerPerformanceData ppd;
170
171   /**
172    * Time until when we blocked this peer from migrating
173    * data to us.
174    */
175   struct GNUNET_TIME_Absolute last_migration_block;
176
177   /**
178    * Task scheduled to revive migration to this peer.
179    */
180   struct GNUNET_SCHEDULER_Task *mig_revive_task;
181
182   /**
183    * Messages (replies, queries, content migration) we would like to
184    * send to this peer in the near future.  Sorted by priority, head.
185    */
186   struct GSF_PeerTransmitHandle *pth_head;
187
188   /**
189    * Messages (replies, queries, content migration) we would like to
190    * send to this peer in the near future.  Sorted by priority, tail.
191    */
192   struct GSF_PeerTransmitHandle *pth_tail;
193
194   /**
195    * Messages (replies, queries, content migration) we would like to
196    * send to this peer in the near future.  Sorted by priority, head.
197    */
198   struct GSF_DelayedHandle *delayed_head;
199
200   /**
201    * Messages (replies, queries, content migration) we would like to
202    * send to this peer in the near future.  Sorted by priority, tail.
203    */
204   struct GSF_DelayedHandle *delayed_tail;
205
206   /**
207    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
208    */
209   struct GNUNET_ATS_ReservationContext *rc;
210
211   /**
212    * Task scheduled if we need to retry bandwidth reservation later.
213    */
214   struct GNUNET_SCHEDULER_Task *rc_delay_task;
215
216   /**
217    * Active requests from this neighbour, map of query to `struct PeerRequest`.
218    */
219   struct GNUNET_CONTAINER_MultiHashMap *request_map;
220
221   /**
222    * Handle for an active request for transmission to this
223    * peer.
224    */
225   struct GNUNET_MQ_Handle *mq;
226
227   /**
228    * Increase in traffic preference still to be submitted
229    * to the core service for this peer.
230    */
231   uint64_t inc_preference;
232
233   /**
234    * Number of entries in @e delayed_head DLL.
235    */
236   unsigned int delay_queue_size;
237
238   /**
239    * Respect rating for this peer on disk.
240    */
241   uint32_t disk_respect;
242
243   /**
244    * Which offset in @e last_p2p_replies will be updated next?
245    * (we go round-robin).
246    */
247   unsigned int last_p2p_replies_woff;
248
249   /**
250    * Which offset in @e last_client_replies will be updated next?
251    * (we go round-robin).
252    */
253   unsigned int last_client_replies_woff;
254
255   /**
256    * Current offset into @e last_request_times ring buffer.
257    */
258   unsigned int last_request_times_off;
259
260   /**
261    * #GNUNET_YES if we did successfully reserve 32k bandwidth,
262    * #GNUNET_NO if not.
263    */
264   int did_reserve;
265
266   /**
267    * Handle to the PEERSTORE iterate request for peer respect value
268    */
269   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
270 };
271
272
273 /**
274  * Map from peer identities to `struct GSF_ConnectPeer` entries.
275  */
276 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
277
278 /**
279  * Handle to peerstore service.
280  */
281 static struct GNUNET_PEERSTORE_Handle *peerstore;
282
283 /**
284  * Task used to flush respect values to disk.
285  */
286 static struct GNUNET_SCHEDULER_Task *fr_task;
287
288
289 /**
290  * Update the latency information kept for the given peer.
291  *
292  * @param id peer record to update
293  * @param latency current latency value
294  */
295 void
296 GSF_update_peer_latency_(const struct GNUNET_PeerIdentity *id,
297                          struct GNUNET_TIME_Relative latency)
298 {
299   struct GSF_ConnectedPeer *cp;
300
301   cp = GSF_peer_get_(id);
302   if (NULL == cp)
303     return; /* we're not yet connected at the core level, ignore */
304   GNUNET_LOAD_value_set_decline(cp->ppd.transmission_delay,
305                                 latency);
306 }
307
308
309 /**
310  * Return the performance data record for the given peer
311  *
312  * @param cp peer to query
313  * @return performance data record for the peer
314  */
315 struct GSF_PeerPerformanceData *
316 GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
317 {
318   return &cp->ppd;
319 }
320
321
322 /**
323  * Core is ready to transmit to a peer, get the message.
324  *
325  * @param cp which peer to send a message to
326  */
327 static void
328 peer_transmit(struct GSF_ConnectedPeer *cp);
329
330
331 /**
332  * Function called by core upon success or failure of our bandwidth reservation request.
333  *
334  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
335  * @param peer identifies the peer
336  * @param amount set to the amount that was actually reserved or unreserved;
337  *               either the full requested amount or zero (no partial reservations)
338  * @param res_delay if the reservation could not be satisfied (amount was 0), how
339  *        long should the client wait until re-trying?
340  */
341 static void
342 ats_reserve_callback(void *cls,
343                      const struct GNUNET_PeerIdentity *peer,
344                      int32_t amount,
345                      struct GNUNET_TIME_Relative res_delay);
346
347
348 /**
349  * If ready (bandwidth reserved), try to schedule transmission via
350  * core for the given handle.
351  *
352  * @param pth transmission handle to schedule
353  */
354 static void
355 schedule_transmission(struct GSF_PeerTransmitHandle *pth)
356 {
357   struct GSF_ConnectedPeer *cp;
358   struct GNUNET_PeerIdentity target;
359
360   cp = pth->cp;
361   GNUNET_assert(0 != cp->ppd.pid);
362   GNUNET_PEER_resolve(cp->ppd.pid, &target);
363
364   if (0 != cp->inc_preference)
365     {
366       GNUNET_ATS_performance_change_preference(GSF_ats,
367                                                &target,
368                                                GNUNET_ATS_PREFERENCE_BANDWIDTH,
369                                                (double)cp->inc_preference,
370                                                GNUNET_ATS_PREFERENCE_END);
371       cp->inc_preference = 0;
372     }
373
374   if ((GNUNET_YES == pth->is_query) &&
375       (GNUNET_YES != pth->was_reserved))
376     {
377       /* query, need reservation */
378       if (GNUNET_YES != cp->did_reserve)
379         return;                 /* not ready */
380       cp->did_reserve = GNUNET_NO;
381       /* reservation already done! */
382       pth->was_reserved = GNUNET_YES;
383       cp->rc = GNUNET_ATS_reserve_bandwidth(GSF_ats,
384                                             &target,
385                                             DBLOCK_SIZE,
386                                             &ats_reserve_callback,
387                                             cp);
388       return;
389     }
390   peer_transmit(cp);
391 }
392
393
394 /**
395  * Core is ready to transmit to a peer, get the message.
396  *
397  * @param cp which peer to send a message to
398  */
399 static void
400 peer_transmit(struct GSF_ConnectedPeer *cp)
401 {
402   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
403   struct GSF_PeerTransmitHandle *pos;
404
405   if (NULL == pth)
406     return;
407   GNUNET_CONTAINER_DLL_remove(cp->pth_head,
408                               cp->pth_tail,
409                               pth);
410   if (GNUNET_YES == pth->is_query)
411     {
412       cp->ppd.last_request_times[(cp->last_request_times_off++) %
413                                  MAX_QUEUE_PER_PEER] =
414         GNUNET_TIME_absolute_get();
415       GNUNET_assert(0 < cp->ppd.pending_queries--);
416     }
417   else if (GNUNET_NO == pth->is_query)
418     {
419       GNUNET_assert(0 < cp->ppd.pending_replies--);
420     }
421   GNUNET_LOAD_update(cp->ppd.transmission_delay,
422                      GNUNET_TIME_absolute_get_duration
423                        (pth->transmission_request_start_time).rel_value_us);
424   GNUNET_MQ_send(cp->mq,
425                  pth->env);
426   GNUNET_free(pth);
427   if (NULL != (pos = cp->pth_head))
428     {
429       GNUNET_assert(pos != pth);
430       schedule_transmission(pos);
431     }
432 }
433
434
435 /**
436  * (re)try to reserve bandwidth from the given peer.
437  *
438  * @param cls the `struct GSF_ConnectedPeer` to reserve from
439  */
440 static void
441 retry_reservation(void *cls)
442 {
443   struct GSF_ConnectedPeer *cp = cls;
444   struct GNUNET_PeerIdentity target;
445
446   GNUNET_PEER_resolve(cp->ppd.pid, &target);
447   cp->rc_delay_task = NULL;
448   cp->rc =
449     GNUNET_ATS_reserve_bandwidth(GSF_ats,
450                                  &target,
451                                  DBLOCK_SIZE,
452                                  &ats_reserve_callback, cp);
453 }
454
455
456 /**
457  * Function called by core upon success or failure of our bandwidth reservation request.
458  *
459  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
460  * @param peer identifies the peer
461  * @param amount set to the amount that was actually reserved or unreserved;
462  *               either the full requested amount or zero (no partial reservations)
463  * @param res_delay if the reservation could not be satisfied (amount was 0), how
464  *        long should the client wait until re-trying?
465  */
466 static void
467 ats_reserve_callback(void *cls,
468                      const struct GNUNET_PeerIdentity *peer,
469                      int32_t amount,
470                      struct GNUNET_TIME_Relative res_delay)
471 {
472   struct GSF_ConnectedPeer *cp = cls;
473   struct GSF_PeerTransmitHandle *pth;
474
475   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
476              "Reserved %d bytes / need to wait %s for reservation\n",
477              (int)amount,
478              GNUNET_STRINGS_relative_time_to_string(res_delay, GNUNET_YES));
479   cp->rc = NULL;
480   if (0 == amount)
481     {
482       cp->rc_delay_task =
483         GNUNET_SCHEDULER_add_delayed(res_delay,
484                                      &retry_reservation,
485                                      cp);
486       return;
487     }
488   cp->did_reserve = GNUNET_YES;
489   pth = cp->pth_head;
490   if (NULL != pth)
491     {
492       /* reservation success, try transmission now! */
493       peer_transmit(cp);
494     }
495 }
496
497
498 /**
499  * Function called by PEERSTORE with peer respect record
500  *
501  * @param cls handle to connected peer entry
502  * @param record peerstore record information
503  * @param emsg error message, or NULL if no errors
504  */
505 static void
506 peer_respect_cb(void *cls,
507                 const struct GNUNET_PEERSTORE_Record *record,
508                 const char *emsg)
509 {
510   struct GSF_ConnectedPeer *cp = cls;
511
512   GNUNET_assert(NULL != cp->respect_iterate_req);
513   if ((NULL != record) &&
514       (sizeof(cp->disk_respect) == record->value_size))
515     {
516       cp->disk_respect = *((uint32_t *)record->value);
517       cp->ppd.respect += *((uint32_t *)record->value);
518     }
519   GSF_push_start_(cp);
520   if (NULL != record)
521     GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req);
522   cp->respect_iterate_req = NULL;
523 }
524
525
526 /**
527  * Function called for each pending request whenever a new
528  * peer connects, giving us a chance to decide about submitting
529  * the existing request to the new peer.
530  *
531  * @param cls the `struct GSF_ConnectedPeer` of the new peer
532  * @param key query for the request
533  * @param pr handle to the pending request
534  * @return #GNUNET_YES to continue to iterate
535  */
536 static int
537 consider_peer_for_forwarding(void *cls,
538                              const struct GNUNET_HashCode *key,
539                              struct GSF_PendingRequest *pr)
540 {
541   struct GSF_ConnectedPeer *cp = cls;
542   struct GNUNET_PeerIdentity pid;
543
544   if (GNUNET_YES !=
545       GSF_pending_request_test_active_(pr))
546     return GNUNET_YES; /* request is not actually active, skip! */
547   GSF_connected_peer_get_identity_(cp, &pid);
548   if (GNUNET_YES !=
549       GSF_pending_request_test_target_(pr, &pid))
550     {
551       GNUNET_STATISTICS_update(GSF_stats,
552                                gettext_noop("# Loopback routes suppressed"),
553                                1,
554                                GNUNET_NO);
555       return GNUNET_YES;
556     }
557   GSF_plan_add_(cp, pr);
558   return GNUNET_YES;
559 }
560
561
562 /**
563  * A peer connected to us.  Setup the connected peer
564  * records.
565  *
566  * @param cls NULL
567  * @param peer identity of peer that connected
568  * @param mq message queue for talking to @a peer
569  * @return our internal handle for the peer
570  */
571 void *
572 GSF_peer_connect_handler(void *cls,
573                          const struct GNUNET_PeerIdentity *peer,
574                          struct GNUNET_MQ_Handle *mq)
575 {
576   struct GSF_ConnectedPeer *cp;
577
578   if (0 ==
579       GNUNET_memcmp(&GSF_my_id,
580                     peer))
581     return NULL;
582   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
583              "Connected to peer %s\n",
584              GNUNET_i2s(peer));
585   cp = GNUNET_new(struct GSF_ConnectedPeer);
586   cp->ppd.pid = GNUNET_PEER_intern(peer);
587   cp->ppd.peer = peer;
588   cp->mq = mq;
589   cp->ppd.transmission_delay = GNUNET_LOAD_value_init(GNUNET_TIME_UNIT_ZERO);
590   cp->rc =
591     GNUNET_ATS_reserve_bandwidth(GSF_ats,
592                                  peer,
593                                  DBLOCK_SIZE,
594                                  &ats_reserve_callback, cp);
595   cp->request_map = GNUNET_CONTAINER_multihashmap_create(128,
596                                                          GNUNET_YES);
597   GNUNET_break(GNUNET_OK ==
598                GNUNET_CONTAINER_multipeermap_put(cp_map,
599                                                  GSF_connected_peer_get_identity2_(cp),
600                                                  cp,
601                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
602   GNUNET_STATISTICS_set(GSF_stats,
603                         gettext_noop("# peers connected"),
604                         GNUNET_CONTAINER_multipeermap_size(cp_map),
605                         GNUNET_NO);
606   cp->respect_iterate_req
607     = GNUNET_PEERSTORE_iterate(peerstore,
608                                "fs",
609                                peer,
610                                "respect",
611                                &peer_respect_cb,
612                                cp);
613   GSF_iterate_pending_requests_(&consider_peer_for_forwarding,
614                                 cp);
615   return cp;
616 }
617
618
619 /**
620  * It may be time to re-start migrating content to this
621  * peer.  Check, and if so, restart migration.
622  *
623  * @param cls the `struct GSF_ConnectedPeer`
624  */
625 static void
626 revive_migration(void *cls)
627 {
628   struct GSF_ConnectedPeer *cp = cls;
629   struct GNUNET_TIME_Relative bt;
630
631   cp->mig_revive_task = NULL;
632   bt = GNUNET_TIME_absolute_get_remaining(cp->ppd.migration_blocked_until);
633   if (0 != bt.rel_value_us)
634     {
635       /* still time left... */
636       cp->mig_revive_task =
637         GNUNET_SCHEDULER_add_delayed(bt, &revive_migration, cp);
638       return;
639     }
640   GSF_push_start_(cp);
641 }
642
643
644 /**
645  * Get a handle for a connected peer.
646  *
647  * @param peer peer's identity
648  * @return NULL if the peer is not currently connected
649  */
650 struct GSF_ConnectedPeer *
651 GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
652 {
653   if (NULL == cp_map)
654     return NULL;
655   return GNUNET_CONTAINER_multipeermap_get(cp_map, peer);
656 }
657
658
659 /**
660  * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
661  *
662  * @param cls closure, the `struct GSF_ConnectedPeer`
663  * @param msm the actual message
664  */
665 void
666 handle_p2p_migration_stop(void *cls,
667                           const struct MigrationStopMessage *msm)
668 {
669   struct GSF_ConnectedPeer *cp = cls;
670   struct GNUNET_TIME_Relative bt;
671
672   GNUNET_STATISTICS_update(GSF_stats,
673                            gettext_noop("# migration stop messages received"),
674                            1, GNUNET_NO);
675   bt = GNUNET_TIME_relative_ntoh(msm->duration);
676   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
677              _("Migration of content to peer `%s' blocked for %s\n"),
678              GNUNET_i2s(cp->ppd.peer),
679              GNUNET_STRINGS_relative_time_to_string(bt, GNUNET_YES));
680   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute(bt);
681   if ((NULL == cp->mig_revive_task) &&
682       (NULL == cp->respect_iterate_req))
683     {
684       GSF_push_stop_(cp);
685       cp->mig_revive_task =
686         GNUNET_SCHEDULER_add_delayed(bt,
687                                      &revive_migration, cp);
688     }
689 }
690
691
692 /**
693  * Free resources associated with the given peer request.
694  *
695  * @param peerreq request to free
696  */
697 static void
698 free_pending_request(struct PeerRequest *peerreq)
699 {
700   struct GSF_ConnectedPeer *cp = peerreq->cp;
701   struct GSF_PendingRequestData *prd;
702
703   prd = GSF_pending_request_get_data_(peerreq->pr);
704   if (NULL != peerreq->kill_task)
705     {
706       GNUNET_SCHEDULER_cancel(peerreq->kill_task);
707       peerreq->kill_task = NULL;
708     }
709   GNUNET_STATISTICS_update(GSF_stats,
710                            gettext_noop("# P2P searches active"),
711                            -1,
712                            GNUNET_NO);
713   GNUNET_break(GNUNET_YES ==
714                GNUNET_CONTAINER_multihashmap_remove(cp->request_map,
715                                                     &prd->query,
716                                                     peerreq));
717   GNUNET_free(peerreq);
718 }
719
720
721 /**
722  * Cancel all requests associated with the peer.
723  *
724  * @param cls unused
725  * @param query hash code of the request
726  * @param value the `struct GSF_PendingRequest`
727  * @return #GNUNET_YES (continue to iterate)
728  */
729 static int
730 cancel_pending_request(void *cls,
731                        const struct GNUNET_HashCode *query,
732                        void *value)
733 {
734   struct PeerRequest *peerreq = value;
735   struct GSF_PendingRequest *pr = peerreq->pr;
736
737   free_pending_request(peerreq);
738   GSF_pending_request_cancel_(pr,
739                               GNUNET_NO);
740   return GNUNET_OK;
741 }
742
743
744 /**
745  * Free the given request.
746  *
747  * @param cls the request to free
748  */
749 static void
750 peer_request_destroy(void *cls)
751 {
752   struct PeerRequest *peerreq = cls;
753   struct GSF_PendingRequest *pr = peerreq->pr;
754   struct GSF_PendingRequestData *prd;
755
756   peerreq->kill_task = NULL;
757   prd = GSF_pending_request_get_data_(pr);
758   cancel_pending_request(NULL,
759                          &prd->query,
760                          peerreq);
761 }
762
763
764 /**
765  * The artificial delay is over, transmit the message now.
766  *
767  * @param cls the `struct GSF_DelayedHandle` with the message
768  */
769 static void
770 transmit_delayed_now(void *cls)
771 {
772   struct GSF_DelayedHandle *dh = cls;
773   struct GSF_ConnectedPeer *cp = dh->cp;
774
775   GNUNET_CONTAINER_DLL_remove(cp->delayed_head,
776                               cp->delayed_tail,
777                               dh);
778   cp->delay_queue_size--;
779   GSF_peer_transmit_(cp,
780                      GNUNET_NO,
781                      UINT32_MAX,
782                      dh->env);
783   GNUNET_free(dh);
784 }
785
786
787 /**
788  * Get the randomized delay a response should be subjected to.
789  *
790  * @return desired delay
791  */
792 static struct GNUNET_TIME_Relative
793 get_randomized_delay()
794 {
795   struct GNUNET_TIME_Relative ret;
796
797   ret =
798     GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS,
799                                   GNUNET_CRYPTO_random_u32
800                                     (GNUNET_CRYPTO_QUALITY_WEAK,
801                                     2 * GSF_avg_latency.rel_value_us + 1));
802 #if INSANE_STATISTICS
803   GNUNET_STATISTICS_update(GSF_stats,
804                            gettext_noop
805                              ("# artificial delays introduced (ms)"),
806                            ret.rel_value_us / 1000LL, GNUNET_NO);
807 #endif
808   return ret;
809 }
810
811
812 /**
813  * Handle a reply to a pending request.  Also called if a request
814  * expires (then with data == NULL).  The handler may be called
815  * many times (depending on the request type), but will not be
816  * called during or after a call to GSF_pending_request_cancel
817  * and will also not be called anymore after a call signalling
818  * expiration.
819  *
820  * @param cls `struct PeerRequest` this is an answer for
821  * @param eval evaluation of the result
822  * @param pr handle to the original pending request
823  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
824  * @param expiration when does @a data expire?
825  * @param last_transmission when did we last transmit a request for this block
826  * @param type type of the block
827  * @param data response data, NULL on request expiration
828  * @param data_len number of bytes in @a data
829  */
830 static void
831 handle_p2p_reply(void *cls,
832                  enum GNUNET_BLOCK_EvaluationResult eval,
833                  struct GSF_PendingRequest *pr,
834                  uint32_t reply_anonymity_level,
835                  struct GNUNET_TIME_Absolute expiration,
836                  struct GNUNET_TIME_Absolute last_transmission,
837                  enum GNUNET_BLOCK_Type type,
838                  const void *data,
839                  size_t data_len)
840 {
841   struct PeerRequest *peerreq = cls;
842   struct GSF_ConnectedPeer *cp = peerreq->cp;
843   struct GSF_PendingRequestData *prd;
844   struct GNUNET_MQ_Envelope *env;
845   struct PutMessage *pm;
846   size_t msize;
847
848   GNUNET_assert(data_len + sizeof(struct PutMessage) <
849                 GNUNET_MAX_MESSAGE_SIZE);
850   GNUNET_assert(peerreq->pr == pr);
851   prd = GSF_pending_request_get_data_(pr);
852   if (NULL == data)
853     {
854       free_pending_request(peerreq);
855       return;
856     }
857   GNUNET_break(GNUNET_BLOCK_TYPE_ANY != type);
858   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
859     {
860       GNUNET_STATISTICS_update(GSF_stats,
861                                gettext_noop
862                                  ("# replies dropped due to type mismatch"),
863                                1, GNUNET_NO);
864       return;
865     }
866   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
867              "Transmitting result for query `%s' to peer\n",
868              GNUNET_h2s(&prd->query));
869   GNUNET_STATISTICS_update(GSF_stats,
870                            gettext_noop("# replies received for other peers"),
871                            1, GNUNET_NO);
872   msize = sizeof(struct PutMessage) + data_len;
873   if (msize >= GNUNET_MAX_MESSAGE_SIZE)
874     {
875       GNUNET_break(0);
876       return;
877     }
878   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
879     {
880       if (reply_anonymity_level - 1 > GSF_cover_content_count)
881         {
882           GNUNET_STATISTICS_update(GSF_stats,
883                                    gettext_noop
884                                      ("# replies dropped due to insufficient cover traffic"),
885                                    1, GNUNET_NO);
886           return;
887         }
888       GSF_cover_content_count -= (reply_anonymity_level - 1);
889     }
890
891   env = GNUNET_MQ_msg_extra(pm,
892                             data_len,
893                             GNUNET_MESSAGE_TYPE_FS_PUT);
894   pm->type = htonl(type);
895   pm->expiration = GNUNET_TIME_absolute_hton(expiration);
896   GNUNET_memcpy(&pm[1],
897                 data,
898                 data_len);
899   if ((UINT32_MAX != reply_anonymity_level) &&
900       (0 != reply_anonymity_level) &&
901       (GNUNET_YES == GSF_enable_randomized_delays))
902     {
903       struct GSF_DelayedHandle *dh;
904
905       dh = GNUNET_new(struct GSF_DelayedHandle);
906       dh->cp = cp;
907       dh->env = env;
908       dh->msize = msize;
909       GNUNET_CONTAINER_DLL_insert(cp->delayed_head,
910                                   cp->delayed_tail,
911                                   dh);
912       cp->delay_queue_size++;
913       dh->delay_task =
914         GNUNET_SCHEDULER_add_delayed(get_randomized_delay(),
915                                      &transmit_delayed_now,
916                                      dh);
917     }
918   else
919     {
920       GSF_peer_transmit_(cp,
921                          GNUNET_NO,
922                          UINT32_MAX,
923                          env);
924     }
925   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
926     return;
927   if (NULL == peerreq->kill_task)
928     {
929       GNUNET_STATISTICS_update(GSF_stats,
930                                gettext_noop
931                                  ("# P2P searches destroyed due to ultimate reply"),
932                                1,
933                                GNUNET_NO);
934       peerreq->kill_task =
935         GNUNET_SCHEDULER_add_now(&peer_request_destroy,
936                                  peerreq);
937     }
938 }
939
940
941 /**
942  * Increase the peer's respect by a value.
943  *
944  * @param cp which peer to change the respect value on
945  * @param value is the int value by which the
946  *  peer's credit is to be increased or decreased
947  * @returns the actual change in respect (positive or negative)
948  */
949 static int
950 change_peer_respect(struct GSF_ConnectedPeer *cp, int value)
951 {
952   if (0 == value)
953     return 0;
954   GNUNET_assert(NULL != cp);
955   if (value > 0)
956     {
957       if (cp->ppd.respect + value < cp->ppd.respect)
958         {
959           value = UINT32_MAX - cp->ppd.respect;
960           cp->ppd.respect = UINT32_MAX;
961         }
962       else
963         cp->ppd.respect += value;
964     }
965   else
966     {
967       if (cp->ppd.respect < -value)
968         {
969           value = -cp->ppd.respect;
970           cp->ppd.respect = 0;
971         }
972       else
973         cp->ppd.respect += value;
974     }
975   return value;
976 }
977
978
979 /**
980  * We've received a request with the specified priority.  Bound it
981  * according to how much we respect the given peer.
982  *
983  * @param prio_in requested priority
984  * @param cp the peer making the request
985  * @return effective priority
986  */
987 static int32_t
988 bound_priority(uint32_t prio_in,
989                struct GSF_ConnectedPeer *cp)
990 {
991 #define N ((double)128.0)
992   uint32_t ret;
993   double rret;
994   int ld;
995
996   ld = GSF_test_get_load_too_high_(0);
997   if (GNUNET_SYSERR == ld)
998     {
999 #if INSANE_STATISTICS
1000       GNUNET_STATISTICS_update(GSF_stats,
1001                                gettext_noop
1002                                  ("# requests done for free (low load)"), 1,
1003                                GNUNET_NO);
1004 #endif
1005       return 0;                 /* excess resources */
1006     }
1007   if (prio_in > INT32_MAX)
1008     prio_in = INT32_MAX;
1009   ret = -change_peer_respect(cp, -(int)prio_in);
1010   if (ret > 0)
1011     {
1012       if (ret > GSF_current_priorities + N)
1013         rret = GSF_current_priorities + N;
1014       else
1015         rret = ret;
1016       GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1017     }
1018   if ((GNUNET_YES == ld) && (ret > 0))
1019     {
1020       /* try with charging */
1021       ld = GSF_test_get_load_too_high_(ret);
1022     }
1023   if (GNUNET_YES == ld)
1024     {
1025       GNUNET_STATISTICS_update(GSF_stats,
1026                                gettext_noop
1027                                  ("# request dropped, priority insufficient"), 1,
1028                                GNUNET_NO);
1029       /* undo charge */
1030       change_peer_respect(cp, (int)ret);
1031       return -1;                /* not enough resources */
1032     }
1033   else
1034     {
1035       GNUNET_STATISTICS_update(GSF_stats,
1036                                gettext_noop
1037                                  ("# requests done for a price (normal load)"), 1,
1038                                GNUNET_NO);
1039     }
1040 #undef N
1041   return ret;
1042 }
1043
1044
1045 /**
1046  * The priority level imposes a bound on the maximum
1047  * value for the ttl that can be requested.
1048  *
1049  * @param ttl_in requested ttl
1050  * @param prio given priority
1051  * @return @a ttl_in if @a ttl_in is below the limit,
1052  *         otherwise the ttl-limit for the given @a prio
1053  */
1054 static int32_t
1055 bound_ttl(int32_t ttl_in,
1056           uint32_t prio)
1057 {
1058   unsigned long long allowed;
1059
1060   if (ttl_in <= 0)
1061     return ttl_in;
1062   allowed = ((unsigned long long)prio) * TTL_DECREMENT / 1000;
1063   if (ttl_in > allowed)
1064     {
1065       if (allowed >= (1 << 30))
1066         return 1 << 30;
1067       return allowed;
1068     }
1069   return ttl_in;
1070 }
1071
1072
1073 /**
1074  * Closure for #test_exist_cb().
1075  */
1076 struct TestExistClosure {
1077   /**
1078    * Priority of the incoming request.
1079    */
1080   int32_t priority;
1081
1082   /**
1083    * Relative TTL of the incoming request.
1084    */
1085   int32_t ttl;
1086
1087   /**
1088    * Type of the incoming request.
1089    */
1090   enum GNUNET_BLOCK_Type type;
1091
1092   /**
1093    * Set to #GNUNET_YES if we are done handling the query.
1094    */
1095   int finished;
1096 };
1097
1098
1099 /**
1100  * Test if the query already exists.  If so, merge it, otherwise
1101  * keep `finished` at #GNUNET_NO.
1102  *
1103  * @param cls our `struct TestExistClosure`
1104  * @param hc the key of the query
1105  * @param value the existing `struct PeerRequest`.
1106  * @return #GNUNET_YES to continue to iterate,
1107  *         #GNUNET_NO if we successfully merged
1108  */
1109 static int
1110 test_exist_cb(void *cls,
1111               const struct GNUNET_HashCode *hc,
1112               void *value)
1113 {
1114   struct TestExistClosure *tec = cls;
1115   struct PeerRequest *peerreq = value;
1116   struct GSF_PendingRequest *pr;
1117   struct GSF_PendingRequestData *prd;
1118
1119   pr = peerreq->pr;
1120   prd = GSF_pending_request_get_data_(pr);
1121   if (prd->type != tec->type)
1122     return GNUNET_YES;
1123   if (prd->ttl.abs_value_us >=
1124       GNUNET_TIME_absolute_get().abs_value_us + tec->ttl * 1000LL)
1125     {
1126       /* existing request has higher TTL, drop new one! */
1127       prd->priority += tec->priority;
1128       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1129                  "Have existing request with higher TTL, dropping new request.\n");
1130       GNUNET_STATISTICS_update(GSF_stats,
1131                                gettext_noop
1132                                  ("# requests dropped due to higher-TTL request"),
1133                                1, GNUNET_NO);
1134       tec->finished = GNUNET_YES;
1135       return GNUNET_NO;
1136     }
1137   /* existing request has lower TTL, drop old one! */
1138   tec->priority += prd->priority;
1139   free_pending_request(peerreq);
1140   GSF_pending_request_cancel_(pr,
1141                               GNUNET_YES);
1142   return GNUNET_NO;
1143 }
1144
1145
1146 /**
1147  * Handle P2P "QUERY" message.  Creates the pending request entry
1148  * and sets up all of the data structures to that we will
1149  * process replies properly.  Does not initiate forwarding or
1150  * local database lookups.
1151  *
1152  * @param cls the other peer involved (sender of the message)
1153  * @param gm the GET message
1154  */
1155 void
1156 handle_p2p_get(void *cls,
1157                const struct GetMessage *gm)
1158 {
1159   struct GSF_ConnectedPeer *cps = cls;
1160   struct PeerRequest *peerreq;
1161   struct GSF_PendingRequest *pr;
1162   struct GSF_ConnectedPeer *cp;
1163   const struct GNUNET_PeerIdentity *target;
1164   enum GSF_PendingRequestOptions options;
1165   uint16_t msize;
1166   unsigned int bits;
1167   const struct GNUNET_PeerIdentity *opt;
1168   uint32_t bm;
1169   size_t bfsize;
1170   uint32_t ttl_decrement;
1171   struct TestExistClosure tec;
1172   GNUNET_PEER_Id spid;
1173   const struct GSF_PendingRequestData *prd;
1174
1175   msize = ntohs(gm->header.size);
1176   tec.type = ntohl(gm->type);
1177   bm = ntohl(gm->hash_bitmap);
1178   bits = 0;
1179   while (bm > 0)
1180     {
1181       if (1 == (bm & 1))
1182         bits++;
1183       bm >>= 1;
1184     }
1185   opt = (const struct GNUNET_PeerIdentity *)&gm[1];
1186   bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct GNUNET_PeerIdentity);
1187   GNUNET_STATISTICS_update(GSF_stats,
1188                            gettext_noop
1189                              ("# GET requests received (from other peers)"),
1190                            1,
1191                            GNUNET_NO);
1192   GSF_cover_query_count++;
1193   bm = ntohl(gm->hash_bitmap);
1194   bits = 0;
1195   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1196     cp = GSF_peer_get_(&opt[bits++]);
1197   else
1198     cp = cps;
1199   if (NULL == cp)
1200     {
1201       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1202         GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1203                    "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1204                    GNUNET_i2s(&opt[bits - 1]));
1205
1206       else
1207         GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1208                    "Failed to find peer `%s' in connection set. Dropping query.\n",
1209                    GNUNET_i2s(cps->ppd.peer));
1210       GNUNET_STATISTICS_update(GSF_stats,
1211                                gettext_noop
1212                                  ("# requests dropped due to missing reverse route"),
1213                                1,
1214                                GNUNET_NO);
1215       return;
1216     }
1217   unsigned int queue_size = GNUNET_MQ_get_length(cp->mq);
1218   queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1219   if (queue_size > MAX_QUEUE_PER_PEER)
1220     {
1221       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1222                  "Peer `%s' has too many replies queued already. Dropping query.\n",
1223                  GNUNET_i2s(cps->ppd.peer));
1224       GNUNET_STATISTICS_update(GSF_stats,
1225                                gettext_noop("# requests dropped due to full reply queue"),
1226                                1,
1227                                GNUNET_NO);
1228       return;
1229     }
1230   /* note that we can really only check load here since otherwise
1231    * peers could find out that we are overloaded by not being
1232    * disconnected after sending us a malformed query... */
1233   tec.priority = bound_priority(ntohl(gm->priority),
1234                                 cps);
1235   if (tec.priority < 0)
1236     {
1237       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1238                  "Dropping query from `%s', this peer is too busy.\n",
1239                  GNUNET_i2s(cps->ppd.peer));
1240       return;
1241     }
1242   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1243              "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1244              GNUNET_h2s(&gm->query),
1245              (unsigned int)tec.type,
1246              GNUNET_i2s(cps->ppd.peer),
1247              (unsigned int)bm);
1248   target =
1249     (0 !=
1250      (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1251   options = GSF_PRO_DEFAULTS;
1252   spid = 0;
1253   if ((GNUNET_LOAD_get_load(cp->ppd.transmission_delay) > 3 * (1 + tec.priority))
1254       || (GNUNET_LOAD_get_average(cp->ppd.transmission_delay) >
1255           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1256           GNUNET_LOAD_get_average(GSF_rt_entry_lifetime)))
1257     {
1258       /* don't have BW to send to peer, or would likely take longer than we have for it,
1259        * so at best indirect the query */
1260       tec.priority = 0;
1261       options |= GSF_PRO_FORWARD_ONLY;
1262       spid = GNUNET_PEER_intern(cps->ppd.peer);
1263       GNUNET_assert(0 != spid);
1264     }
1265   tec.ttl = bound_ttl(ntohl(gm->ttl),
1266                       tec.priority);
1267   /* decrement ttl (always) */
1268   ttl_decrement =
1269     2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
1270                                                  TTL_DECREMENT);
1271   if ((tec.ttl < 0) &&
1272       (((int32_t)(tec.ttl - ttl_decrement)) > 0))
1273     {
1274       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1275                  "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1276                  GNUNET_i2s(cps->ppd.peer),
1277                  tec.ttl,
1278                  ttl_decrement);
1279       GNUNET_STATISTICS_update(GSF_stats,
1280                                gettext_noop
1281                                  ("# requests dropped due TTL underflow"), 1,
1282                                GNUNET_NO);
1283       /* integer underflow => drop (should be very rare)! */
1284       return;
1285     }
1286   tec.ttl -= ttl_decrement;
1287
1288   /* test if the request already exists */
1289   tec.finished = GNUNET_NO;
1290   GNUNET_CONTAINER_multihashmap_get_multiple(cp->request_map,
1291                                              &gm->query,
1292                                              &test_exist_cb,
1293                                              &tec);
1294   if (GNUNET_YES == tec.finished)
1295     return; /* merged into existing request, we're done */
1296
1297   peerreq = GNUNET_new(struct PeerRequest);
1298   peerreq->cp = cp;
1299   pr = GSF_pending_request_create_(options,
1300                                    tec.type,
1301                                    &gm->query,
1302                                    target,
1303                                    (bfsize > 0)
1304                                    ? (const char *)&opt[bits]
1305                                    : NULL,
1306                                    bfsize,
1307                                    ntohl(gm->filter_mutator),
1308                                    1 /* anonymity */,
1309                                    (uint32_t)tec.priority,
1310                                    tec.ttl,
1311                                    spid,
1312                                    GNUNET_PEER_intern(cps->ppd.peer),
1313                                    NULL, 0,         /* replies_seen */
1314                                    &handle_p2p_reply,
1315                                    peerreq);
1316   GNUNET_assert(NULL != pr);
1317   prd = GSF_pending_request_get_data_(pr);
1318   peerreq->pr = pr;
1319   GNUNET_break(GNUNET_OK ==
1320                GNUNET_CONTAINER_multihashmap_put(cp->request_map,
1321                                                  &prd->query,
1322                                                  peerreq,
1323                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1324   GNUNET_STATISTICS_update(GSF_stats,
1325                            gettext_noop("# P2P query messages received and processed"),
1326                            1,
1327                            GNUNET_NO);
1328   GNUNET_STATISTICS_update(GSF_stats,
1329                            gettext_noop("# P2P searches active"),
1330                            1,
1331                            GNUNET_NO);
1332   GSF_pending_request_get_data_(pr)->has_started = GNUNET_YES;
1333   GSF_local_lookup_(pr,
1334                     &GSF_consider_forwarding,
1335                     NULL);
1336 }
1337
1338
1339 /**
1340  * Transmit a message to the given peer as soon as possible.
1341  * If the peer disconnects before the transmission can happen,
1342  * the callback is invoked with a `NULL` @a buffer.
1343  *
1344  * @param cp target peer
1345  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1346  * @param priority how important is this request?
1347  * @param timeout when does this request timeout
1348  * @param size number of bytes we would like to send to the peer
1349  * @param env message to send
1350  */
1351 void
1352 GSF_peer_transmit_(struct GSF_ConnectedPeer *cp,
1353                    int is_query,
1354                    uint32_t priority,
1355                    struct GNUNET_MQ_Envelope *env)
1356 {
1357   struct GSF_PeerTransmitHandle *pth;
1358   struct GSF_PeerTransmitHandle *pos;
1359   struct GSF_PeerTransmitHandle *prev;
1360
1361   pth = GNUNET_new(struct GSF_PeerTransmitHandle);
1362   pth->transmission_request_start_time = GNUNET_TIME_absolute_get();
1363   pth->env = env;
1364   pth->is_query = is_query;
1365   pth->priority = priority;
1366   pth->cp = cp;
1367   /* insertion sort (by priority, descending) */
1368   prev = NULL;
1369   pos = cp->pth_head;
1370   while ((NULL != pos) && (pos->priority > priority))
1371     {
1372       prev = pos;
1373       pos = pos->next;
1374     }
1375   GNUNET_CONTAINER_DLL_insert_after(cp->pth_head,
1376                                     cp->pth_tail,
1377                                     prev,
1378                                     pth);
1379   if (GNUNET_YES == is_query)
1380     cp->ppd.pending_queries++;
1381   else if (GNUNET_NO == is_query)
1382     cp->ppd.pending_replies++;
1383   schedule_transmission(pth);
1384 }
1385
1386
1387 /**
1388  * Report on receiving a reply; update the performance record of the given peer.
1389  *
1390  * @param cp responding peer (will be updated)
1391  * @param request_time time at which the original query was transmitted
1392  * @param request_priority priority of the original request
1393  */
1394 void
1395 GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp,
1396                              struct GNUNET_TIME_Absolute request_time,
1397                              uint32_t request_priority)
1398 {
1399   struct GNUNET_TIME_Relative delay;
1400
1401   delay = GNUNET_TIME_absolute_get_duration(request_time);
1402   cp->ppd.avg_reply_delay.rel_value_us =
1403     (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1404      delay.rel_value_us) / RUNAVG_DELAY_N;
1405   cp->ppd.avg_priority =
1406     (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1407      request_priority) / RUNAVG_DELAY_N;
1408 }
1409
1410
1411 /**
1412  * Report on receiving a reply in response to an initiating client.
1413  * Remember that this peer is good for this client.
1414  *
1415  * @param cp responding peer (will be updated)
1416  * @param initiator_client local client on responsible for query
1417  */
1418 void
1419 GSF_peer_update_responder_client_(struct GSF_ConnectedPeer *cp,
1420                                   struct GSF_LocalClient *initiator_client)
1421 {
1422   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1423                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1424 }
1425
1426
1427 /**
1428  * Report on receiving a reply in response to an initiating peer.
1429  * Remember that this peer is good for this initiating peer.
1430  *
1431  * @param cp responding peer (will be updated)
1432  * @param initiator_peer other peer responsible for query
1433  */
1434 void
1435 GSF_peer_update_responder_peer_(struct GSF_ConnectedPeer *cp,
1436                                 const struct GSF_ConnectedPeer *initiator_peer)
1437 {
1438   unsigned int woff;
1439
1440   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1441   GNUNET_PEER_change_rc(cp->ppd.last_p2p_replies[woff], -1);
1442   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1443   GNUNET_PEER_change_rc(initiator_peer->ppd.pid, 1);
1444   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1445 }
1446
1447
1448 /**
1449  * Write peer-respect information to a file - flush the buffer entry!
1450  *
1451  * @param cls unused
1452  * @param key peer identity
1453  * @param value the `struct GSF_ConnectedPeer` to flush
1454  * @return #GNUNET_OK to continue iteration
1455  */
1456 static int
1457 flush_respect(void *cls,
1458               const struct GNUNET_PeerIdentity *key,
1459               void *value)
1460 {
1461   struct GSF_ConnectedPeer *cp = value;
1462   struct GNUNET_PeerIdentity pid;
1463
1464   if (cp->ppd.respect == cp->disk_respect)
1465     return GNUNET_OK;           /* unchanged */
1466   GNUNET_assert(0 != cp->ppd.pid);
1467   GNUNET_PEER_resolve(cp->ppd.pid, &pid);
1468   GNUNET_PEERSTORE_store(peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1469                          sizeof(cp->ppd.respect),
1470                          GNUNET_TIME_UNIT_FOREVER_ABS,
1471                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1472                          NULL,
1473                          NULL);
1474   return GNUNET_OK;
1475 }
1476
1477
1478 /**
1479  * A peer disconnected from us.  Tear down the connected peer
1480  * record.
1481  *
1482  * @param cls unused
1483  * @param peer identity of peer that disconnected
1484  * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1485  */
1486 void
1487 GSF_peer_disconnect_handler(void *cls,
1488                             const struct GNUNET_PeerIdentity *peer,
1489                             void *internal_cls)
1490 {
1491   struct GSF_ConnectedPeer *cp = internal_cls;
1492   struct GSF_PeerTransmitHandle *pth;
1493   struct GSF_DelayedHandle *dh;
1494
1495   if (NULL == cp)
1496     return;  /* must have been disconnect from core with
1497               * 'peer' == my_id, ignore */
1498   flush_respect(NULL,
1499                 peer,
1500                 cp);
1501   GNUNET_assert(GNUNET_YES ==
1502                 GNUNET_CONTAINER_multipeermap_remove(cp_map,
1503                                                      peer,
1504                                                      cp));
1505   GNUNET_STATISTICS_set(GSF_stats,
1506                         gettext_noop("# peers connected"),
1507                         GNUNET_CONTAINER_multipeermap_size(cp_map),
1508                         GNUNET_NO);
1509   if (NULL != cp->respect_iterate_req)
1510     {
1511       GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req);
1512       cp->respect_iterate_req = NULL;
1513     }
1514   if (NULL != cp->rc)
1515     {
1516       GNUNET_ATS_reserve_bandwidth_cancel(cp->rc);
1517       cp->rc = NULL;
1518     }
1519   if (NULL != cp->rc_delay_task)
1520     {
1521       GNUNET_SCHEDULER_cancel(cp->rc_delay_task);
1522       cp->rc_delay_task = NULL;
1523     }
1524   GNUNET_CONTAINER_multihashmap_iterate(cp->request_map,
1525                                         &cancel_pending_request,
1526                                         cp);
1527   GNUNET_CONTAINER_multihashmap_destroy(cp->request_map);
1528   cp->request_map = NULL;
1529   GSF_plan_notify_peer_disconnect_(cp);
1530   GNUNET_LOAD_value_free(cp->ppd.transmission_delay);
1531   GNUNET_PEER_decrement_rcs(cp->ppd.last_p2p_replies,
1532                             P2P_SUCCESS_LIST_SIZE);
1533   memset(cp->ppd.last_p2p_replies,
1534          0,
1535          sizeof(cp->ppd.last_p2p_replies));
1536   GSF_push_stop_(cp);
1537   while (NULL != (pth = cp->pth_head))
1538     {
1539       GNUNET_CONTAINER_DLL_remove(cp->pth_head,
1540                                   cp->pth_tail,
1541                                   pth);
1542       if (GNUNET_YES == pth->is_query)
1543         GNUNET_assert(0 < cp->ppd.pending_queries--);
1544       else if (GNUNET_NO == pth->is_query)
1545         GNUNET_assert(0 < cp->ppd.pending_replies--);
1546       GNUNET_free(pth);
1547     }
1548   while (NULL != (dh = cp->delayed_head))
1549     {
1550       GNUNET_CONTAINER_DLL_remove(cp->delayed_head,
1551                                   cp->delayed_tail,
1552                                   dh);
1553       GNUNET_MQ_discard(dh->env);
1554       cp->delay_queue_size--;
1555       GNUNET_SCHEDULER_cancel(dh->delay_task);
1556       GNUNET_free(dh);
1557     }
1558   GNUNET_PEER_change_rc(cp->ppd.pid, -1);
1559   if (NULL != cp->mig_revive_task)
1560     {
1561       GNUNET_SCHEDULER_cancel(cp->mig_revive_task);
1562       cp->mig_revive_task = NULL;
1563     }
1564   GNUNET_break(0 == cp->ppd.pending_queries);
1565   GNUNET_break(0 == cp->ppd.pending_replies);
1566   GNUNET_free(cp);
1567 }
1568
1569
1570 /**
1571  * Closure for #call_iterator().
1572  */
1573 struct IterationContext {
1574   /**
1575    * Function to call on each entry.
1576    */
1577   GSF_ConnectedPeerIterator it;
1578
1579   /**
1580    * Closure for @e it.
1581    */
1582   void *it_cls;
1583 };
1584
1585
1586 /**
1587  * Function that calls the callback for each peer.
1588  *
1589  * @param cls the `struct IterationContext *`
1590  * @param key identity of the peer
1591  * @param value the `struct GSF_ConnectedPeer *`
1592  * @return #GNUNET_YES to continue iteration
1593  */
1594 static int
1595 call_iterator(void *cls,
1596               const struct GNUNET_PeerIdentity *key,
1597               void *value)
1598 {
1599   struct IterationContext *ic = cls;
1600   struct GSF_ConnectedPeer *cp = value;
1601
1602   ic->it(ic->it_cls,
1603          key, cp,
1604          &cp->ppd);
1605   return GNUNET_YES;
1606 }
1607
1608
1609 /**
1610  * Iterate over all connected peers.
1611  *
1612  * @param it function to call for each peer
1613  * @param it_cls closure for @a it
1614  */
1615 void
1616 GSF_iterate_connected_peers_(GSF_ConnectedPeerIterator it,
1617                              void *it_cls)
1618 {
1619   struct IterationContext ic;
1620
1621   ic.it = it;
1622   ic.it_cls = it_cls;
1623   GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1624                                         &call_iterator,
1625                                         &ic);
1626 }
1627
1628
1629 /**
1630  * Obtain the identity of a connected peer.
1631  *
1632  * @param cp peer to get identity of
1633  * @param id identity to set (written to)
1634  */
1635 void
1636 GSF_connected_peer_get_identity_(const struct GSF_ConnectedPeer *cp,
1637                                  struct GNUNET_PeerIdentity *id)
1638 {
1639   GNUNET_assert(0 != cp->ppd.pid);
1640   GNUNET_PEER_resolve(cp->ppd.pid, id);
1641 }
1642
1643
1644 /**
1645  * Obtain the identity of a connected peer.
1646  *
1647  * @param cp peer to get identity of
1648  * @return reference to peer identity, valid until peer disconnects (!)
1649  */
1650 const struct GNUNET_PeerIdentity *
1651 GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp)
1652 {
1653   GNUNET_assert(0 != cp->ppd.pid);
1654   return GNUNET_PEER_resolve2(cp->ppd.pid);
1655 }
1656
1657
1658 /**
1659  * Ask a peer to stop migrating data to us until the given point
1660  * in time.
1661  *
1662  * @param cp peer to ask
1663  * @param block_time until when to block
1664  */
1665 void
1666 GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp,
1667                           struct GNUNET_TIME_Absolute block_time)
1668 {
1669   struct GNUNET_MQ_Envelope *env;
1670   struct MigrationStopMessage *msm;
1671
1672   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1673     {
1674       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1675                  "Migration already blocked for another %s\n",
1676                  GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining
1677                                                           (cp->last_migration_block), GNUNET_YES));
1678       return;                   /* already blocked */
1679     }
1680   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1681              GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(block_time),
1682                                                     GNUNET_YES));
1683   cp->last_migration_block = block_time;
1684   env = GNUNET_MQ_msg(msm,
1685                       GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1686   msm->reserved = htonl(0);
1687   msm->duration
1688     = GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining
1689                                   (cp->last_migration_block));
1690   GNUNET_STATISTICS_update(GSF_stats,
1691                            gettext_noop("# migration stop messages sent"),
1692                            1,
1693                            GNUNET_NO);
1694   GSF_peer_transmit_(cp,
1695                      GNUNET_SYSERR,
1696                      UINT32_MAX,
1697                      env);
1698 }
1699
1700
1701 /**
1702  * Notify core about a preference we have for the given peer
1703  * (to allocate more resources towards it).  The change will
1704  * be communicated the next time we reserve bandwidth with
1705  * core (not instantly).
1706  *
1707  * @param cp peer to reserve bandwidth from
1708  * @param pref preference change
1709  */
1710 void
1711 GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp,
1712                                       uint64_t pref)
1713 {
1714   cp->inc_preference += pref;
1715 }
1716
1717
1718 /**
1719  * Call this method periodically to flush respect information to disk.
1720  *
1721  * @param cls closure, not used
1722  */
1723 static void
1724 cron_flush_respect(void *cls)
1725 {
1726   fr_task = NULL;
1727   GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1728                                         &flush_respect,
1729                                         NULL);
1730   fr_task = GNUNET_SCHEDULER_add_delayed_with_priority(RESPECT_FLUSH_FREQ,
1731                                                        GNUNET_SCHEDULER_PRIORITY_HIGH,
1732                                                        &cron_flush_respect, NULL);
1733 }
1734
1735
1736 /**
1737  * Initialize peer management subsystem.
1738  */
1739 void
1740 GSF_connected_peer_init_()
1741 {
1742   cp_map = GNUNET_CONTAINER_multipeermap_create(128, GNUNET_YES);
1743   peerstore = GNUNET_PEERSTORE_connect(GSF_cfg);
1744   fr_task = GNUNET_SCHEDULER_add_with_priority(GNUNET_SCHEDULER_PRIORITY_HIGH,
1745                                                &cron_flush_respect, NULL);
1746 }
1747
1748
1749 /**
1750  * Shutdown peer management subsystem.
1751  */
1752 void
1753 GSF_connected_peer_done_()
1754 {
1755   GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1756                                         &flush_respect,
1757                                         NULL);
1758   GNUNET_SCHEDULER_cancel(fr_task);
1759   fr_task = NULL;
1760   GNUNET_CONTAINER_multipeermap_destroy(cp_map);
1761   cp_map = NULL;
1762   GNUNET_PEERSTORE_disconnect(peerstore,
1763                               GNUNET_YES);
1764 }
1765
1766
1767 /**
1768  * Iterator to remove references to LC entry.
1769  *
1770  * @param cls the `struct GSF_LocalClient *` to look for
1771  * @param key current key code
1772  * @param value value in the hash map (peer entry)
1773  * @return #GNUNET_YES (we should continue to iterate)
1774  */
1775 static int
1776 clean_local_client(void *cls,
1777                    const struct GNUNET_PeerIdentity *key,
1778                    void *value)
1779 {
1780   const struct GSF_LocalClient *lc = cls;
1781   struct GSF_ConnectedPeer *cp = value;
1782   unsigned int i;
1783
1784   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1785     if (cp->ppd.last_client_replies[i] == lc)
1786       cp->ppd.last_client_replies[i] = NULL;
1787   return GNUNET_YES;
1788 }
1789
1790
1791 /**
1792  * Notification that a local client disconnected.  Clean up all of our
1793  * references to the given handle.
1794  *
1795  * @param lc handle to the local client (henceforth invalid)
1796  */
1797 void
1798 GSF_handle_local_client_disconnect_(const struct GSF_LocalClient *lc)
1799 {
1800   if (NULL == cp_map)
1801     return;                     /* already cleaned up */
1802   GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1803                                         &clean_local_client,
1804                                         (void *)lc);
1805 }
1806
1807
1808 /* end of gnunet-service-fs_cp.c */