-bringing copyright tags up to FSF standard
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34 #include "gnunet_peerstore_service.h"
35
36
37 /**
38  * Ratio for moving average delay calculation.  The previous
39  * average goes in with a factor of (n-1) into the calculation.
40  * Must be > 0.
41  */
42 #define RUNAVG_DELAY_N 16
43
44 /**
45  * How often do we flush respect values to disk?
46  */
47 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
48
49 /**
50  * After how long do we discard a reply?
51  */
52 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
53
54 /**
55  * Collect an instane number of statistics?  May cause excessive IPC.
56  */
57 #define INSANE_STATISTICS GNUNET_NO
58
59
60 /**
61  * Handle to cancel a transmission request.
62  */
63 struct GSF_PeerTransmitHandle
64 {
65
66   /**
67    * Kept in a doubly-linked list.
68    */
69   struct GSF_PeerTransmitHandle *next;
70
71   /**
72    * Kept in a doubly-linked list.
73    */
74   struct GSF_PeerTransmitHandle *prev;
75
76   /**
77    * Time when this transmission request was issued.
78    */
79   struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81   /**
82    * Timeout for this request.
83    */
84   struct GNUNET_TIME_Absolute timeout;
85
86   /**
87    * Task called on timeout, or 0 for none.
88    */
89   struct GNUNET_SCHEDULER_Task * timeout_task;
90
91   /**
92    * Function to call to get the actual message.
93    */
94   GSF_GetMessageCallback gmc;
95
96   /**
97    * Peer this request targets.
98    */
99   struct GSF_ConnectedPeer *cp;
100
101   /**
102    * Closure for @e gmc.
103    */
104   void *gmc_cls;
105
106   /**
107    * Size of the message to be transmitted.
108    */
109   size_t size;
110
111   /**
112    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
113    */
114   int is_query;
115
116   /**
117    * Did we get a reservation already?
118    */
119   int was_reserved;
120
121   /**
122    * Priority of this request.
123    */
124   uint32_t priority;
125
126 };
127
128
129 /**
130  * Handle for an entry in our delay list.
131  */
132 struct GSF_DelayedHandle
133 {
134
135   /**
136    * Kept in a doubly-linked list.
137    */
138   struct GSF_DelayedHandle *next;
139
140   /**
141    * Kept in a doubly-linked list.
142    */
143   struct GSF_DelayedHandle *prev;
144
145   /**
146    * Peer this transmission belongs to.
147    */
148   struct GSF_ConnectedPeer *cp;
149
150   /**
151    * The PUT that was delayed.
152    */
153   struct PutMessage *pm;
154
155   /**
156    * Task for the delay.
157    */
158   struct GNUNET_SCHEDULER_Task * delay_task;
159
160   /**
161    * Size of the message.
162    */
163   size_t msize;
164
165 };
166
167
168 /**
169  * Information per peer and request.
170  */
171 struct PeerRequest
172 {
173
174   /**
175    * Handle to generic request.
176    */
177   struct GSF_PendingRequest *pr;
178
179   /**
180    * Handle to specific peer.
181    */
182   struct GSF_ConnectedPeer *cp;
183
184   /**
185    * Task for asynchronous stopping of this request.
186    */
187   struct GNUNET_SCHEDULER_Task * kill_task;
188
189 };
190
191
192 /**
193  * A connected peer.
194  */
195 struct GSF_ConnectedPeer
196 {
197
198   /**
199    * Performance data for this peer.
200    */
201   struct GSF_PeerPerformanceData ppd;
202
203   /**
204    * Time until when we blocked this peer from migrating
205    * data to us.
206    */
207   struct GNUNET_TIME_Absolute last_migration_block;
208
209   /**
210    * Task scheduled to revive migration to this peer.
211    */
212   struct GNUNET_SCHEDULER_Task * mig_revive_task;
213
214   /**
215    * Messages (replies, queries, content migration) we would like to
216    * send to this peer in the near future.  Sorted by priority, head.
217    */
218   struct GSF_PeerTransmitHandle *pth_head;
219
220   /**
221    * Messages (replies, queries, content migration) we would like to
222    * send to this peer in the near future.  Sorted by priority, tail.
223    */
224   struct GSF_PeerTransmitHandle *pth_tail;
225
226   /**
227    * Messages (replies, queries, content migration) we would like to
228    * send to this peer in the near future.  Sorted by priority, head.
229    */
230   struct GSF_DelayedHandle *delayed_head;
231
232   /**
233    * Messages (replies, queries, content migration) we would like to
234    * send to this peer in the near future.  Sorted by priority, tail.
235    */
236   struct GSF_DelayedHandle *delayed_tail;
237
238   /**
239    * Migration stop message in our queue, or NULL if we have none pending.
240    */
241   struct GSF_PeerTransmitHandle *migration_pth;
242
243   /**
244    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
245    */
246   struct GNUNET_ATS_ReservationContext *rc;
247
248   /**
249    * Task scheduled if we need to retry bandwidth reservation later.
250    */
251   struct GNUNET_SCHEDULER_Task * rc_delay_task;
252
253   /**
254    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
255    */
256   struct GNUNET_CONTAINER_MultiHashMap *request_map;
257
258   /**
259    * Handle for an active request for transmission to this
260    * peer, or NULL (if core queue was full).
261    */
262   struct GNUNET_CORE_TransmitHandle *cth;
263
264   /**
265    * Increase in traffic preference still to be submitted
266    * to the core service for this peer.
267    */
268   uint64_t inc_preference;
269
270   /**
271    * Set to 1 if we're currently in the process of calling
272    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
273    * NULL, we should not call notify_transmit_ready for this
274    * handle right now).
275    */
276   unsigned int cth_in_progress;
277
278   /**
279    * Respect rating for this peer on disk.
280    */
281   uint32_t disk_respect;
282
283   /**
284    * Which offset in "last_p2p_replies" will be updated next?
285    * (we go round-robin).
286    */
287   unsigned int last_p2p_replies_woff;
288
289   /**
290    * Which offset in "last_client_replies" will be updated next?
291    * (we go round-robin).
292    */
293   unsigned int last_client_replies_woff;
294
295   /**
296    * Current offset into 'last_request_times' ring buffer.
297    */
298   unsigned int last_request_times_off;
299
300   /**
301    * GNUNET_YES if we did successfully reserve 32k bandwidth,
302    * GNUNET_NO if not.
303    */
304   int did_reserve;
305
306   /**
307    * Function called when the creation of this record is complete.
308    */
309   GSF_ConnectedPeerCreationCallback creation_cb;
310
311   /**
312    * Closure for @e creation_cb
313    */
314   void *creation_cb_cls;
315
316   /**
317    * Handle to the PEERSTORE iterate request for peer respect value
318    */
319   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
320
321 };
322
323
324 /**
325  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
326  */
327 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
328
329 /**
330  * Handle to peerstore service.
331  */
332 static struct GNUNET_PEERSTORE_Handle *peerstore;
333
334
335 /**
336  * Update the latency information kept for the given peer.
337  *
338  * @param id peer record to update
339  * @param latency current latency value
340  */
341 void
342 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
343                           struct GNUNET_TIME_Relative latency)
344 {
345   struct GSF_ConnectedPeer *cp;
346
347   cp = GSF_peer_get_ (id);
348   if (NULL == cp)
349     return; /* we're not yet connected at the core level, ignore */
350   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
351   /* LATER: merge atsi into cp's performance data (if we ever care...) */
352 }
353
354
355 /**
356  * Return the performance data record for the given peer
357  *
358  * @param cp peer to query
359  * @return performance data record for the peer
360  */
361 struct GSF_PeerPerformanceData *
362 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
363 {
364   return &cp->ppd;
365 }
366
367
368 /**
369  * Core is ready to transmit to a peer, get the message.
370  *
371  * @param cls the `struct GSF_PeerTransmitHandle` of the message
372  * @param size number of bytes core is willing to take
373  * @param buf where to copy the message
374  * @return number of bytes copied to @a buf
375  */
376 static size_t
377 peer_transmit_ready_cb (void *cls,
378                         size_t size,
379                         void *buf);
380
381
382 /**
383  * Function called by core upon success or failure of our bandwidth reservation request.
384  *
385  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
386  * @param peer identifies the peer
387  * @param amount set to the amount that was actually reserved or unreserved;
388  *               either the full requested amount or zero (no partial reservations)
389  * @param res_delay if the reservation could not be satisfied (amount was 0), how
390  *        long should the client wait until re-trying?
391  */
392 static void
393 ats_reserve_callback (void *cls,
394                       const struct GNUNET_PeerIdentity *peer,
395                       int32_t amount,
396                       struct GNUNET_TIME_Relative res_delay);
397
398
399 /**
400  * If ready (bandwidth reserved), try to schedule transmission via
401  * core for the given handle.
402  *
403  * @param pth transmission handle to schedule
404  */
405 static void
406 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
407 {
408   struct GSF_ConnectedPeer *cp;
409   struct GNUNET_PeerIdentity target;
410
411   cp = pth->cp;
412   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
413     return;                     /* already done */
414   GNUNET_assert (0 != cp->ppd.pid);
415   GNUNET_PEER_resolve (cp->ppd.pid, &target);
416
417   if (0 != cp->inc_preference)
418   {
419     GNUNET_ATS_performance_change_preference (GSF_ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
420                                   (double) cp->inc_preference,
421                                   GNUNET_ATS_PREFERENCE_END);
422     cp->inc_preference = 0;
423   }
424
425   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
426   {
427     /* query, need reservation */
428     if (GNUNET_YES != cp->did_reserve)
429       return;                   /* not ready */
430     cp->did_reserve = GNUNET_NO;
431     /* reservation already done! */
432     pth->was_reserved = GNUNET_YES;
433     cp->rc =
434         GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
435                                       &ats_reserve_callback, cp);
436     return;
437   }
438   GNUNET_assert (NULL == cp->cth);
439   cp->cth_in_progress++;
440   cp->cth =
441     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
442                                        GNUNET_CORE_PRIO_BACKGROUND,
443                                        GNUNET_TIME_absolute_get_remaining
444                                        (pth->timeout), &target, pth->size,
445                                        &peer_transmit_ready_cb, cp);
446   GNUNET_assert (NULL != cp->cth);
447   GNUNET_assert (0 < cp->cth_in_progress--);
448 }
449
450
451 /**
452  * Core is ready to transmit to a peer, get the message.
453  *
454  * @param cls the `struct GSF_PeerTransmitHandle` of the message
455  * @param size number of bytes core is willing to take
456  * @param buf where to copy the message
457  * @return number of bytes copied to @a buf
458  */
459 static size_t
460 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
461 {
462   struct GSF_ConnectedPeer *cp = cls;
463   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
464   struct GSF_PeerTransmitHandle *pos;
465   size_t ret;
466
467   cp->cth = NULL;
468   if (NULL == pth)
469     return 0;
470   if (pth->size > size)
471   {
472     schedule_transmission (pth);
473     return 0;
474   }
475   if (NULL != pth->timeout_task)
476   {
477     GNUNET_SCHEDULER_cancel (pth->timeout_task);
478     pth->timeout_task = NULL;
479   }
480   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
481   if (GNUNET_YES == pth->is_query)
482   {
483     cp->ppd.last_request_times[(cp->last_request_times_off++) %
484                                MAX_QUEUE_PER_PEER] =
485         GNUNET_TIME_absolute_get ();
486     GNUNET_assert (0 < cp->ppd.pending_queries--);
487   }
488   else if (GNUNET_NO == pth->is_query)
489   {
490     GNUNET_assert (0 < cp->ppd.pending_replies--);
491   }
492   GNUNET_LOAD_update (cp->ppd.transmission_delay,
493                       GNUNET_TIME_absolute_get_duration
494                       (pth->transmission_request_start_time).rel_value_us);
495   ret = pth->gmc (pth->gmc_cls, size, buf);
496   if (NULL != (pos = cp->pth_head))
497   {
498     GNUNET_assert (pos != pth);
499     schedule_transmission (pos);
500   }
501   GNUNET_free (pth);
502   return ret;
503 }
504
505
506 /**
507  * (re)try to reserve bandwidth from the given peer.
508  *
509  * @param cls the `struct GSF_ConnectedPeer` to reserve from
510  * @param tc scheduler context
511  */
512 static void
513 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
514 {
515   struct GSF_ConnectedPeer *cp = cls;
516   struct GNUNET_PeerIdentity target;
517
518   GNUNET_PEER_resolve (cp->ppd.pid, &target);
519   cp->rc_delay_task = NULL;
520   cp->rc =
521     GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
522                                   &ats_reserve_callback, cp);
523 }
524
525
526 /**
527  * Function called by core upon success or failure of our bandwidth reservation request.
528  *
529  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
530  * @param peer identifies the peer
531  * @param amount set to the amount that was actually reserved or unreserved;
532  *               either the full requested amount or zero (no partial reservations)
533  * @param res_delay if the reservation could not be satisfied (amount was 0), how
534  *        long should the client wait until re-trying?
535  */
536 static void
537 ats_reserve_callback (void *cls,
538                       const struct GNUNET_PeerIdentity *peer,
539                       int32_t amount,
540                       struct GNUNET_TIME_Relative res_delay)
541 {
542   struct GSF_ConnectedPeer *cp = cls;
543   struct GSF_PeerTransmitHandle *pth;
544
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546               "Reserved %d bytes / need to wait %s for reservation\n",
547               (int) amount,
548               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
549   cp->rc = NULL;
550   if (0 == amount)
551   {
552     cp->rc_delay_task =
553         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
554     return;
555   }
556   cp->did_reserve = GNUNET_YES;
557   pth = cp->pth_head;
558   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
559   {
560     /* reservation success, try transmission now! */
561     cp->cth_in_progress++;
562     cp->cth =
563         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
564                                            GNUNET_CORE_PRIO_BACKGROUND,
565                                            GNUNET_TIME_absolute_get_remaining
566                                            (pth->timeout), peer, pth->size,
567                                            &peer_transmit_ready_cb, cp);
568     GNUNET_assert (NULL != cp->cth);
569     GNUNET_assert (0 < cp->cth_in_progress--);
570   }
571 }
572
573
574 /**
575  * Function called by PEERSTORE with peer respect record
576  *
577  * @param cls handle to connected peer entry
578  * @param record peerstore record information
579  * @param emsg error message, or NULL if no errors
580  * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
581  */
582 static int
583 peer_respect_cb (void *cls,
584                  const struct GNUNET_PEERSTORE_Record *record,
585                  const char *emsg)
586 {
587   struct GSF_ConnectedPeer *cp = cls;
588
589   cp->respect_iterate_req = NULL;
590   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
591     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
592   GSF_push_start_ (cp);
593   if (NULL != cp->creation_cb)
594     cp->creation_cb (cp->creation_cb_cls, cp);
595   return GNUNET_NO;
596 }
597
598
599 /**
600  * A peer connected to us.  Setup the connected peer
601  * records.
602  *
603  * @param peer identity of peer that connected
604  * @param creation_cb callback function when the record is created.
605  * @param creation_cb_cls closure for @creation_cb
606  */
607 void
608 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
609                            GSF_ConnectedPeerCreationCallback creation_cb,
610                            void *creation_cb_cls)
611 {
612   struct GSF_ConnectedPeer *cp;
613
614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615               "Connected to peer %s\n",
616               GNUNET_i2s (peer));
617   cp = GNUNET_new (struct GSF_ConnectedPeer);
618   cp->ppd.pid = GNUNET_PEER_intern (peer);
619   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
620   cp->rc =
621       GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
622                                     &ats_reserve_callback, cp);
623   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
624   GNUNET_break (GNUNET_OK ==
625                 GNUNET_CONTAINER_multipeermap_put (cp_map,
626                GSF_connected_peer_get_identity2_ (cp),
627                                                    cp,
628                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
629   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
630                          GNUNET_CONTAINER_multipeermap_size (cp_map),
631                          GNUNET_NO);
632   cp->creation_cb = creation_cb;
633   cp->creation_cb_cls = creation_cb_cls;
634   cp->respect_iterate_req =
635       GNUNET_PEERSTORE_iterate (peerstore, "fs", peer, "respect",
636                                 GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb,
637                                 cp);
638 }
639
640
641 /**
642  * It may be time to re-start migrating content to this
643  * peer.  Check, and if so, restart migration.
644  *
645  * @param cls the `struct GSF_ConnectedPeer`
646  * @param tc scheduler context
647  */
648 static void
649 revive_migration (void *cls,
650                   const struct GNUNET_SCHEDULER_TaskContext *tc)
651 {
652   struct GSF_ConnectedPeer *cp = cls;
653   struct GNUNET_TIME_Relative bt;
654
655   cp->mig_revive_task = NULL;
656   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
657   if (0 != bt.rel_value_us)
658   {
659     /* still time left... */
660     cp->mig_revive_task =
661         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
662     return;
663   }
664   GSF_push_start_ (cp);
665 }
666
667
668 /**
669  * Get a handle for a connected peer.
670  *
671  * @param peer peer's identity
672  * @return NULL if the peer is not currently connected
673  */
674 struct GSF_ConnectedPeer *
675 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
676 {
677   if (NULL == cp_map)
678     return NULL;
679   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
680 }
681
682
683 /**
684  * Handle P2P "MIGRATION_STOP" message.
685  *
686  * @param cls closure, always NULL
687  * @param other the other peer involved (sender or receiver, NULL
688  *        for loopback messages where we are both sender and receiver)
689  * @param message the actual message
690  * @return #GNUNET_OK to keep the connection open,
691  *         #GNUNET_SYSERR to close it (signal serious error)
692  */
693 int
694 GSF_handle_p2p_migration_stop_ (void *cls,
695                                 const struct GNUNET_PeerIdentity *other,
696                                 const struct GNUNET_MessageHeader *message)
697 {
698   struct GSF_ConnectedPeer *cp;
699   const struct MigrationStopMessage *msm;
700   struct GNUNET_TIME_Relative bt;
701
702   msm = (const struct MigrationStopMessage *) message;
703   cp = GSF_peer_get_ (other);
704   if (NULL == cp)
705   {
706     GNUNET_break (0);
707     return GNUNET_OK;
708   }
709   GNUNET_STATISTICS_update (GSF_stats,
710                             gettext_noop ("# migration stop messages received"),
711                             1, GNUNET_NO);
712   bt = GNUNET_TIME_relative_ntoh (msm->duration);
713   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
714               _("Migration of content to peer `%s' blocked for %s\n"),
715               GNUNET_i2s (other),
716               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
717   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
718   if (NULL == cp->mig_revive_task)
719   {
720     GSF_push_stop_ (cp);
721     cp->mig_revive_task =
722         GNUNET_SCHEDULER_add_delayed (bt,
723                                       &revive_migration, cp);
724   }
725   return GNUNET_OK;
726 }
727
728
729 /**
730  * Copy reply and free put message.
731  *
732  * @param cls the `struct PutMessage`
733  * @param buf_size number of bytes available in @a buf
734  * @param buf where to copy the message, NULL on error (peer disconnect)
735  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
736  */
737 static size_t
738 copy_reply (void *cls, size_t buf_size, void *buf)
739 {
740   struct PutMessage *pm = cls;
741   size_t size;
742
743   if (NULL != buf)
744   {
745     GNUNET_assert (buf_size >= ntohs (pm->header.size));
746     size = ntohs (pm->header.size);
747     memcpy (buf, pm, size);
748     GNUNET_STATISTICS_update (GSF_stats,
749                               gettext_noop
750                               ("# replies transmitted to other peers"), 1,
751                               GNUNET_NO);
752   }
753   else
754   {
755     size = 0;
756     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
757                               GNUNET_NO);
758   }
759   GNUNET_free (pm);
760   return size;
761 }
762
763
764 /**
765  * Free resources associated with the given peer request.
766  *
767  * @param peerreq request to free
768  * @param query associated key for the request
769  */
770 static void
771 free_pending_request (struct PeerRequest *peerreq,
772                       const struct GNUNET_HashCode *query)
773 {
774   struct GSF_ConnectedPeer *cp = peerreq->cp;
775
776   if (NULL != peerreq->kill_task)
777   {
778     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
779     peerreq->kill_task = NULL;
780   }
781   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
782                             -1, GNUNET_NO);
783   GNUNET_break (GNUNET_YES ==
784                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
785                                                       query, peerreq));
786   GNUNET_free (peerreq);
787 }
788
789
790 /**
791  * Cancel all requests associated with the peer.
792  *
793  * @param cls unused
794  * @param query hash code of the request
795  * @param value the `struct GSF_PendingRequest`
796  * @return #GNUNET_YES (continue to iterate)
797  */
798 static int
799 cancel_pending_request (void *cls,
800                         const struct GNUNET_HashCode *query,
801                         void *value)
802 {
803   struct PeerRequest *peerreq = value;
804   struct GSF_PendingRequest *pr = peerreq->pr;
805   struct GSF_PendingRequestData *prd;
806
807   prd = GSF_pending_request_get_data_ (pr);
808   GSF_pending_request_cancel_ (pr, GNUNET_NO);
809   free_pending_request (peerreq, &prd->query);
810   return GNUNET_OK;
811 }
812
813
814 /**
815  * Free the given request.
816  *
817  * @param cls the request to free
818  * @param tc task context
819  */
820 static void
821 peer_request_destroy (void *cls,
822                       const struct GNUNET_SCHEDULER_TaskContext *tc)
823 {
824   struct PeerRequest *peerreq = cls;
825   struct GSF_PendingRequest *pr = peerreq->pr;
826   struct GSF_PendingRequestData *prd;
827
828   peerreq->kill_task = NULL;
829   prd = GSF_pending_request_get_data_ (pr);
830   cancel_pending_request (NULL, &prd->query, peerreq);
831 }
832
833
834 /**
835  * The artificial delay is over, transmit the message now.
836  *
837  * @param cls the `struct GSF_DelayedHandle` with the message
838  * @param tc scheduler context
839  */
840 static void
841 transmit_delayed_now (void *cls,
842                       const struct GNUNET_SCHEDULER_TaskContext *tc)
843 {
844   struct GSF_DelayedHandle *dh = cls;
845   struct GSF_ConnectedPeer *cp = dh->cp;
846
847   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
848   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
849   {
850     GNUNET_free (dh->pm);
851     GNUNET_free (dh);
852     return;
853   }
854   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
855                              dh->msize, &copy_reply, dh->pm);
856   GNUNET_free (dh);
857 }
858
859
860 /**
861  * Get the randomized delay a response should be subjected to.
862  *
863  * @return desired delay
864  */
865 static struct GNUNET_TIME_Relative
866 get_randomized_delay ()
867 {
868   struct GNUNET_TIME_Relative ret;
869
870   ret =
871       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
872                                      GNUNET_CRYPTO_random_u32
873                                      (GNUNET_CRYPTO_QUALITY_WEAK,
874                                       2 * GSF_avg_latency.rel_value_us + 1));
875 #if INSANE_STATISTICS
876   GNUNET_STATISTICS_update (GSF_stats,
877                             gettext_noop
878                             ("# artificial delays introduced (ms)"),
879                             ret.rel_value_us / 1000LL, GNUNET_NO);
880 #endif
881   return ret;
882 }
883
884
885 /**
886  * Handle a reply to a pending request.  Also called if a request
887  * expires (then with data == NULL).  The handler may be called
888  * many times (depending on the request type), but will not be
889  * called during or after a call to GSF_pending_request_cancel
890  * and will also not be called anymore after a call signalling
891  * expiration.
892  *
893  * @param cls `struct PeerRequest` this is an answer for
894  * @param eval evaluation of the result
895  * @param pr handle to the original pending request
896  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
897  * @param expiration when does @a data expire?
898  * @param last_transmission when did we last transmit a request for this block
899  * @param type type of the block
900  * @param data response data, NULL on request expiration
901  * @param data_len number of bytes in @a data
902  */
903 static void
904 handle_p2p_reply (void *cls,
905                   enum GNUNET_BLOCK_EvaluationResult eval,
906                   struct GSF_PendingRequest *pr,
907                   uint32_t reply_anonymity_level,
908                   struct GNUNET_TIME_Absolute expiration,
909                   struct GNUNET_TIME_Absolute last_transmission,
910                   enum GNUNET_BLOCK_Type type,
911                   const void *data,
912                   size_t data_len)
913 {
914   struct PeerRequest *peerreq = cls;
915   struct GSF_ConnectedPeer *cp = peerreq->cp;
916   struct GSF_PendingRequestData *prd;
917   struct PutMessage *pm;
918   size_t msize;
919
920   GNUNET_assert (data_len + sizeof (struct PutMessage) <
921                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
922   GNUNET_assert (peerreq->pr == pr);
923   prd = GSF_pending_request_get_data_ (pr);
924   if (NULL == data)
925   {
926     free_pending_request (peerreq, &prd->query);
927     return;
928   }
929   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
930   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
931   {
932     GNUNET_STATISTICS_update (GSF_stats,
933                               gettext_noop
934                               ("# replies dropped due to type mismatch"),
935                                 1, GNUNET_NO);
936     return;
937   }
938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939               "Transmitting result for query `%s' to peer\n",
940               GNUNET_h2s (&prd->query));
941   GNUNET_STATISTICS_update (GSF_stats,
942                             gettext_noop ("# replies received for other peers"),
943                             1, GNUNET_NO);
944   msize = sizeof (struct PutMessage) + data_len;
945   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
946   {
947     GNUNET_break (0);
948     return;
949   }
950   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
951   {
952     if (reply_anonymity_level - 1 > GSF_cover_content_count)
953     {
954       GNUNET_STATISTICS_update (GSF_stats,
955                                 gettext_noop
956                                 ("# replies dropped due to insufficient cover traffic"),
957                                 1, GNUNET_NO);
958       return;
959     }
960     GSF_cover_content_count -= (reply_anonymity_level - 1);
961   }
962
963   pm = GNUNET_malloc (msize);
964   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
965   pm->header.size = htons (msize);
966   pm->type = htonl (type);
967   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
968   memcpy (&pm[1], data, data_len);
969   if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
970       (GNUNET_YES == GSF_enable_randomized_delays))
971   {
972     struct GSF_DelayedHandle *dh;
973
974     dh = GNUNET_new (struct GSF_DelayedHandle);
975     dh->cp = cp;
976     dh->pm = pm;
977     dh->msize = msize;
978     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
979     dh->delay_task =
980         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
981                                       &transmit_delayed_now, dh);
982   }
983   else
984   {
985     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
986                                &copy_reply, pm);
987   }
988   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
989     return;
990   if (NULL == peerreq->kill_task)
991   {
992     GNUNET_STATISTICS_update (GSF_stats,
993                               gettext_noop
994                               ("# P2P searches destroyed due to ultimate reply"),
995                               1, GNUNET_NO);
996     peerreq->kill_task =
997         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
998   }
999 }
1000
1001
1002 /**
1003  * Increase the peer's respect by a value.
1004  *
1005  * @param cp which peer to change the respect value on
1006  * @param value is the int value by which the
1007  *  peer's credit is to be increased or decreased
1008  * @returns the actual change in respect (positive or negative)
1009  */
1010 static int
1011 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1012 {
1013   if (0 == value)
1014     return 0;
1015   GNUNET_assert (NULL != cp);
1016   if (value > 0)
1017   {
1018     if (cp->ppd.respect + value < cp->ppd.respect)
1019     {
1020       value = UINT32_MAX - cp->ppd.respect;
1021       cp->ppd.respect = UINT32_MAX;
1022     }
1023     else
1024       cp->ppd.respect += value;
1025   }
1026   else
1027   {
1028     if (cp->ppd.respect < -value)
1029     {
1030       value = -cp->ppd.respect;
1031       cp->ppd.respect = 0;
1032     }
1033     else
1034       cp->ppd.respect += value;
1035   }
1036   return value;
1037 }
1038
1039
1040 /**
1041  * We've received a request with the specified priority.  Bound it
1042  * according to how much we respect the given peer.
1043  *
1044  * @param prio_in requested priority
1045  * @param cp the peer making the request
1046  * @return effective priority
1047  */
1048 static int32_t
1049 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1050 {
1051 #define N ((double)128.0)
1052   uint32_t ret;
1053   double rret;
1054   int ld;
1055
1056   ld = GSF_test_get_load_too_high_ (0);
1057   if (GNUNET_SYSERR == ld)
1058   {
1059 #if INSANE_STATISTICS
1060     GNUNET_STATISTICS_update (GSF_stats,
1061                               gettext_noop
1062                               ("# requests done for free (low load)"), 1,
1063                               GNUNET_NO);
1064 #endif
1065     return 0;                   /* excess resources */
1066   }
1067   if (prio_in > INT32_MAX)
1068     prio_in = INT32_MAX;
1069   ret = -change_peer_respect (cp, -(int) prio_in);
1070   if (ret > 0)
1071   {
1072     if (ret > GSF_current_priorities + N)
1073       rret = GSF_current_priorities + N;
1074     else
1075       rret = ret;
1076     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1077   }
1078   if ((GNUNET_YES == ld) && (ret > 0))
1079   {
1080     /* try with charging */
1081     ld = GSF_test_get_load_too_high_ (ret);
1082   }
1083   if (GNUNET_YES == ld)
1084   {
1085     GNUNET_STATISTICS_update (GSF_stats,
1086                               gettext_noop
1087                               ("# request dropped, priority insufficient"), 1,
1088                               GNUNET_NO);
1089     /* undo charge */
1090     change_peer_respect (cp, (int) ret);
1091     return -1;                  /* not enough resources */
1092   }
1093   else
1094   {
1095     GNUNET_STATISTICS_update (GSF_stats,
1096                               gettext_noop
1097                               ("# requests done for a price (normal load)"), 1,
1098                               GNUNET_NO);
1099   }
1100 #undef N
1101   return ret;
1102 }
1103
1104
1105 /**
1106  * The priority level imposes a bound on the maximum
1107  * value for the ttl that can be requested.
1108  *
1109  * @param ttl_in requested ttl
1110  * @param prio given priority
1111  * @return ttl_in if ttl_in is below the limit,
1112  *         otherwise the ttl-limit for the given priority
1113  */
1114 static int32_t
1115 bound_ttl (int32_t ttl_in, uint32_t prio)
1116 {
1117   unsigned long long allowed;
1118
1119   if (ttl_in <= 0)
1120     return ttl_in;
1121   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1122   if (ttl_in > allowed)
1123   {
1124     if (allowed >= (1 << 30))
1125       return 1 << 30;
1126     return allowed;
1127   }
1128   return ttl_in;
1129 }
1130
1131
1132 /**
1133  * Handle P2P "QUERY" message.  Creates the pending request entry
1134  * and sets up all of the data structures to that we will
1135  * process replies properly.  Does not initiate forwarding or
1136  * local database lookups.
1137  *
1138  * @param other the other peer involved (sender or receiver, NULL
1139  *        for loopback messages where we are both sender and receiver)
1140  * @param message the actual message
1141  * @return pending request handle, NULL on error
1142  */
1143 struct GSF_PendingRequest *
1144 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1145                        const struct GNUNET_MessageHeader *message)
1146 {
1147   struct PeerRequest *peerreq;
1148   struct GSF_PendingRequest *pr;
1149   struct GSF_PendingRequestData *prd;
1150   struct GSF_ConnectedPeer *cp;
1151   struct GSF_ConnectedPeer *cps;
1152   const struct GNUNET_PeerIdentity *target;
1153   enum GSF_PendingRequestOptions options;
1154   uint16_t msize;
1155   const struct GetMessage *gm;
1156   unsigned int bits;
1157   const struct GNUNET_PeerIdentity *opt;
1158   uint32_t bm;
1159   size_t bfsize;
1160   uint32_t ttl_decrement;
1161   int32_t priority;
1162   int32_t ttl;
1163   enum GNUNET_BLOCK_Type type;
1164   GNUNET_PEER_Id spid;
1165
1166   GNUNET_assert (other != NULL);
1167   msize = ntohs (message->size);
1168   if (msize < sizeof (struct GetMessage))
1169   {
1170     GNUNET_break_op (0);
1171     return NULL;
1172   }
1173   GNUNET_STATISTICS_update (GSF_stats,
1174                             gettext_noop
1175                             ("# GET requests received (from other peers)"), 1,
1176                             GNUNET_NO);
1177   gm = (const struct GetMessage *) message;
1178   type = ntohl (gm->type);
1179   bm = ntohl (gm->hash_bitmap);
1180   bits = 0;
1181   while (bm > 0)
1182   {
1183     if (1 == (bm & 1))
1184       bits++;
1185     bm >>= 1;
1186   }
1187   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1188   {
1189     GNUNET_break_op (0);
1190     return NULL;
1191   }
1192   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1193   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1194   /* bfsize must be power of 2, check! */
1195   if (0 != ((bfsize - 1) & bfsize))
1196   {
1197     GNUNET_break_op (0);
1198     return NULL;
1199   }
1200   GSF_cover_query_count++;
1201   bm = ntohl (gm->hash_bitmap);
1202   bits = 0;
1203   cps = GSF_peer_get_ (other);
1204   if (NULL == cps)
1205   {
1206     /* peer must have just disconnected */
1207     GNUNET_STATISTICS_update (GSF_stats,
1208                               gettext_noop
1209                               ("# requests dropped due to initiator not being connected"),
1210                               1, GNUNET_NO);
1211     return NULL;
1212   }
1213   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1214     cp = GSF_peer_get_ (&opt[bits++]);
1215   else
1216     cp = cps;
1217   if (NULL == cp)
1218   {
1219     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1220       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1222                   GNUNET_i2s (&opt[bits - 1]));
1223
1224     else
1225       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1226                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1227                   GNUNET_i2s (other));
1228 #if INSANE_STATISTICS
1229     GNUNET_STATISTICS_update (GSF_stats,
1230                               gettext_noop
1231                               ("# requests dropped due to missing reverse route"),
1232                               1, GNUNET_NO);
1233 #endif
1234     return NULL;
1235   }
1236   /* note that we can really only check load here since otherwise
1237    * peers could find out that we are overloaded by not being
1238    * disconnected after sending us a malformed query... */
1239   priority = bound_priority (ntohl (gm->priority), cps);
1240   if (priority < 0)
1241   {
1242     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243                 "Dropping query from `%s', this peer is too busy.\n",
1244                 GNUNET_i2s (other));
1245     return NULL;
1246   }
1247   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1248               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1249               GNUNET_h2s (&gm->query),
1250               (unsigned int) type,
1251               GNUNET_i2s (other),
1252               (unsigned int) bm);
1253   target =
1254       (0 !=
1255        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1256   options = GSF_PRO_DEFAULTS;
1257   spid = 0;
1258   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1259       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1260           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1261           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1262   {
1263     /* don't have BW to send to peer, or would likely take longer than we have for it,
1264      * so at best indirect the query */
1265     priority = 0;
1266     options |= GSF_PRO_FORWARD_ONLY;
1267     spid = GNUNET_PEER_intern (other);
1268     GNUNET_assert (0 != spid);
1269   }
1270   ttl = bound_ttl (ntohl (gm->ttl), priority);
1271   /* decrement ttl (always) */
1272   ttl_decrement =
1273       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1274                                                     TTL_DECREMENT);
1275   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1276   {
1277     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1279                 GNUNET_i2s (other), ttl, ttl_decrement);
1280     GNUNET_STATISTICS_update (GSF_stats,
1281                               gettext_noop
1282                               ("# requests dropped due TTL underflow"), 1,
1283                               GNUNET_NO);
1284     /* integer underflow => drop (should be very rare)! */
1285     return NULL;
1286   }
1287   ttl -= ttl_decrement;
1288
1289   /* test if the request already exists */
1290   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1291   if (peerreq != NULL)
1292   {
1293     pr = peerreq->pr;
1294     prd = GSF_pending_request_get_data_ (pr);
1295     if (prd->type == type)
1296     {
1297       if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
1298       {
1299         /* existing request has higher TTL, drop new one! */
1300         prd->priority += priority;
1301         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1302                     "Have existing request with higher TTL, dropping new request.\n",
1303                     GNUNET_i2s (other));
1304         GNUNET_STATISTICS_update (GSF_stats,
1305                                   gettext_noop
1306                                   ("# requests dropped due to higher-TTL request"),
1307                                   1, GNUNET_NO);
1308         return NULL;
1309       }
1310       /* existing request has lower TTL, drop old one! */
1311       priority += prd->priority;
1312       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1313       free_pending_request (peerreq, &gm->query);
1314     }
1315   }
1316
1317   peerreq = GNUNET_new (struct PeerRequest);
1318   peerreq->cp = cp;
1319   pr = GSF_pending_request_create_ (options, type, &gm->query,
1320                                     target,
1321                                     (bfsize >
1322                                      0) ? (const char *) &opt[bits] : NULL,
1323                                     bfsize, ntohl (gm->filter_mutator),
1324                                     1 /* anonymity */ ,
1325                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1326                                     &handle_p2p_reply, peerreq);
1327   GNUNET_assert (NULL != pr);
1328   peerreq->pr = pr;
1329   GNUNET_break (GNUNET_OK ==
1330                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1331                                                    peerreq,
1332                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1333   GNUNET_STATISTICS_update (GSF_stats,
1334                             gettext_noop
1335                             ("# P2P query messages received and processed"), 1,
1336                             GNUNET_NO);
1337   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1338                             1, GNUNET_NO);
1339   return pr;
1340 }
1341
1342
1343 /**
1344  * Function called if there has been a timeout trying to satisfy
1345  * a transmission request.
1346  *
1347  * @param cls the `struct GSF_PeerTransmitHandle` of the request
1348  * @param tc scheduler context
1349  */
1350 static void
1351 peer_transmit_timeout (void *cls,
1352                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1353 {
1354   struct GSF_PeerTransmitHandle *pth = cls;
1355   struct GSF_ConnectedPeer *cp;
1356
1357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1358               "Timeout trying to transmit to other peer\n");
1359   pth->timeout_task = NULL;
1360   cp = pth->cp;
1361   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1362   if (GNUNET_YES == pth->is_query)
1363     GNUNET_assert (0 < cp->ppd.pending_queries--);
1364   else if (GNUNET_NO == pth->is_query)
1365     GNUNET_assert (0 < cp->ppd.pending_replies--);
1366   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1367   if (NULL != cp->cth)
1368   {
1369     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1370     cp->cth = NULL;
1371   }
1372   pth->gmc (pth->gmc_cls, 0, NULL);
1373   GNUNET_assert (0 == cp->cth_in_progress);
1374   GNUNET_free (pth);
1375 }
1376
1377
1378 /**
1379  * Transmit a message to the given peer as soon as possible.
1380  * If the peer disconnects before the transmission can happen,
1381  * the callback is invoked with a `NULL` @a buffer.
1382  *
1383  * @param cp target peer
1384  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1385  * @param priority how important is this request?
1386  * @param timeout when does this request timeout (call gmc with error)
1387  * @param size number of bytes we would like to send to the peer
1388  * @param gmc function to call to get the message
1389  * @param gmc_cls closure for @a gmc
1390  * @return handle to cancel request
1391  */
1392 struct GSF_PeerTransmitHandle *
1393 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1394                     int is_query,
1395                     uint32_t priority,
1396                     struct GNUNET_TIME_Relative timeout,
1397                     size_t size,
1398                     GSF_GetMessageCallback gmc, void *gmc_cls)
1399 {
1400   struct GSF_PeerTransmitHandle *pth;
1401   struct GSF_PeerTransmitHandle *pos;
1402   struct GSF_PeerTransmitHandle *prev;
1403
1404   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1405   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1406   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1407   pth->gmc = gmc;
1408   pth->gmc_cls = gmc_cls;
1409   pth->size = size;
1410   pth->is_query = is_query;
1411   pth->priority = priority;
1412   pth->cp = cp;
1413   /* insertion sort (by priority, descending) */
1414   prev = NULL;
1415   pos = cp->pth_head;
1416   while ((NULL != pos) && (pos->priority > priority))
1417   {
1418     prev = pos;
1419     pos = pos->next;
1420   }
1421   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1422   if (GNUNET_YES == is_query)
1423     cp->ppd.pending_queries++;
1424   else if (GNUNET_NO == is_query)
1425     cp->ppd.pending_replies++;
1426   pth->timeout_task =
1427       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1428   schedule_transmission (pth);
1429   return pth;
1430 }
1431
1432
1433 /**
1434  * Cancel an earlier request for transmission.
1435  *
1436  * @param pth request to cancel
1437  */
1438 void
1439 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1440 {
1441   struct GSF_ConnectedPeer *cp;
1442
1443   if (NULL != pth->timeout_task)
1444   {
1445     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1446     pth->timeout_task = NULL;
1447   }
1448   cp = pth->cp;
1449   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1450   if (GNUNET_YES == pth->is_query)
1451     GNUNET_assert (0 < cp->ppd.pending_queries--);
1452   else if (GNUNET_NO == pth->is_query)
1453     GNUNET_assert (0 < cp->ppd.pending_replies--);
1454   GNUNET_free (pth);
1455 }
1456
1457
1458 /**
1459  * Report on receiving a reply; update the performance record of the given peer.
1460  *
1461  * @param cp responding peer (will be updated)
1462  * @param request_time time at which the original query was transmitted
1463  * @param request_priority priority of the original request
1464  */
1465 void
1466 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1467                               struct GNUNET_TIME_Absolute request_time,
1468                               uint32_t request_priority)
1469 {
1470   struct GNUNET_TIME_Relative delay;
1471
1472   delay = GNUNET_TIME_absolute_get_duration (request_time);
1473   cp->ppd.avg_reply_delay.rel_value_us =
1474       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1475        delay.rel_value_us) / RUNAVG_DELAY_N;
1476   cp->ppd.avg_priority =
1477       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1478        request_priority) / RUNAVG_DELAY_N;
1479 }
1480
1481
1482 /**
1483  * Report on receiving a reply in response to an initiating client.
1484  * Remember that this peer is good for this client.
1485  *
1486  * @param cp responding peer (will be updated)
1487  * @param initiator_client local client on responsible for query
1488  */
1489 void
1490 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1491                                    struct GSF_LocalClient *initiator_client)
1492 {
1493   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1494                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1495 }
1496
1497
1498 /**
1499  * Report on receiving a reply in response to an initiating peer.
1500  * Remember that this peer is good for this initiating peer.
1501  *
1502  * @param cp responding peer (will be updated)
1503  * @param initiator_peer other peer responsible for query
1504  */
1505 void
1506 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1507                                  const struct GSF_ConnectedPeer *initiator_peer)
1508 {
1509   unsigned int woff;
1510
1511   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1512   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1513   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1514   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1515   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1516 }
1517
1518
1519 /**
1520  * Write peer-respect information to a file - flush the buffer entry!
1521  *
1522  * @param cls unused
1523  * @param key peer identity
1524  * @param value the `struct GSF_ConnectedPeer` to flush
1525  * @return #GNUNET_OK to continue iteration
1526  */
1527 static int
1528 flush_respect (void *cls,
1529                const struct GNUNET_PeerIdentity *key,
1530                void *value)
1531 {
1532   struct GSF_ConnectedPeer *cp = value;
1533   struct GNUNET_PeerIdentity pid;
1534
1535   if (cp->ppd.respect == cp->disk_respect)
1536     return GNUNET_OK;           /* unchanged */
1537   GNUNET_assert (0 != cp->ppd.pid);
1538   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1539   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1540                           sizeof (cp->ppd.respect),
1541                           GNUNET_TIME_UNIT_FOREVER_ABS,
1542                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1543   return GNUNET_OK;
1544 }
1545
1546
1547 /**
1548  * A peer disconnected from us.  Tear down the connected peer
1549  * record.
1550  *
1551  * @param cls unused
1552  * @param peer identity of peer that connected
1553  */
1554 void
1555 GSF_peer_disconnect_handler_ (void *cls,
1556                               const struct GNUNET_PeerIdentity *peer)
1557 {
1558   struct GSF_ConnectedPeer *cp;
1559   struct GSF_PeerTransmitHandle *pth;
1560   struct GSF_DelayedHandle *dh;
1561
1562   cp = GSF_peer_get_ (peer);
1563   if (NULL == cp)
1564     return;                     /* must have been disconnect from core with
1565                                  * 'peer' == my_id, ignore */
1566   flush_respect (NULL, peer, cp);
1567   GNUNET_assert (GNUNET_YES ==
1568                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1569                                                        peer,
1570                                                        cp));
1571   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1572                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1573                          GNUNET_NO);
1574   if (NULL != cp->respect_iterate_req)
1575   {
1576     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1577     cp->respect_iterate_req = NULL;
1578   }
1579   if (NULL != cp->migration_pth)
1580   {
1581     GSF_peer_transmit_cancel_ (cp->migration_pth);
1582     cp->migration_pth = NULL;
1583   }
1584   if (NULL != cp->rc)
1585   {
1586     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1587     cp->rc = NULL;
1588   }
1589   if (NULL != cp->rc_delay_task)
1590   {
1591     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1592     cp->rc_delay_task = NULL;
1593   }
1594   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1595                                          &cancel_pending_request, cp);
1596   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1597   cp->request_map = NULL;
1598   GSF_plan_notify_peer_disconnect_ (cp);
1599   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1600   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1601   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1602   GSF_push_stop_ (cp);
1603   if (NULL != cp->cth)
1604   {
1605     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1606     cp->cth = NULL;
1607   }
1608   GNUNET_assert (0 == cp->cth_in_progress);
1609   while (NULL != (pth = cp->pth_head))
1610   {
1611     if (pth->timeout_task != NULL)
1612     {
1613       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1614       pth->timeout_task = NULL;
1615     }
1616     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1617     pth->gmc (pth->gmc_cls, 0, NULL);
1618     GNUNET_free (pth);
1619   }
1620   while (NULL != (dh = cp->delayed_head))
1621   {
1622     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1623     GNUNET_SCHEDULER_cancel (dh->delay_task);
1624     GNUNET_free (dh->pm);
1625     GNUNET_free (dh);
1626   }
1627   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1628   if (NULL != cp->mig_revive_task)
1629   {
1630     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1631     cp->mig_revive_task = NULL;
1632   }
1633   GNUNET_free (cp);
1634 }
1635
1636
1637 /**
1638  * Closure for #call_iterator().
1639  */
1640 struct IterationContext
1641 {
1642   /**
1643    * Function to call on each entry.
1644    */
1645   GSF_ConnectedPeerIterator it;
1646
1647   /**
1648    * Closure for @e it.
1649    */
1650   void *it_cls;
1651 };
1652
1653
1654 /**
1655  * Function that calls the callback for each peer.
1656  *
1657  * @param cls the `struct IterationContext *`
1658  * @param key identity of the peer
1659  * @param value the `struct GSF_ConnectedPeer *`
1660  * @return #GNUNET_YES to continue iteration
1661  */
1662 static int
1663 call_iterator (void *cls,
1664                const struct GNUNET_PeerIdentity *key,
1665                void *value)
1666 {
1667   struct IterationContext *ic = cls;
1668   struct GSF_ConnectedPeer *cp = value;
1669
1670   ic->it (ic->it_cls,
1671           key, cp,
1672           &cp->ppd);
1673   return GNUNET_YES;
1674 }
1675
1676
1677 /**
1678  * Iterate over all connected peers.
1679  *
1680  * @param it function to call for each peer
1681  * @param it_cls closure for @a it
1682  */
1683 void
1684 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1685                               void *it_cls)
1686 {
1687   struct IterationContext ic;
1688
1689   ic.it = it;
1690   ic.it_cls = it_cls;
1691   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1692                                          &call_iterator,
1693                                          &ic);
1694 }
1695
1696
1697 /**
1698  * Obtain the identity of a connected peer.
1699  *
1700  * @param cp peer to get identity of
1701  * @param id identity to set (written to)
1702  */
1703 void
1704 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1705                                   struct GNUNET_PeerIdentity *id)
1706 {
1707   GNUNET_assert (0 != cp->ppd.pid);
1708   GNUNET_PEER_resolve (cp->ppd.pid, id);
1709 }
1710
1711
1712 /**
1713  * Obtain the identity of a connected peer.
1714  *
1715  * @param cp peer to get identity of
1716  * @return reference to peer identity, valid until peer disconnects (!)
1717  */
1718 const struct GNUNET_PeerIdentity *
1719 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1720 {
1721   GNUNET_assert (0 != cp->ppd.pid);
1722   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1723 }
1724
1725
1726 /**
1727  * Assemble a migration stop message for transmission.
1728  *
1729  * @param cls the `struct GSF_ConnectedPeer` to use
1730  * @param size number of bytes we're allowed to write to @a buf
1731  * @param buf where to copy the message
1732  * @return number of bytes copied to @a buf
1733  */
1734 static size_t
1735 create_migration_stop_message (void *cls,
1736                                size_t size,
1737                                void *buf)
1738 {
1739   struct GSF_ConnectedPeer *cp = cls;
1740   struct MigrationStopMessage msm;
1741
1742   cp->migration_pth = NULL;
1743   if (NULL == buf)
1744     return 0;
1745   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1746   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1747   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1748   msm.reserved = htonl (0);
1749   msm.duration =
1750       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1751                                  (cp->last_migration_block));
1752   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1753   GNUNET_STATISTICS_update (GSF_stats,
1754                             gettext_noop ("# migration stop messages sent"),
1755                             1, GNUNET_NO);
1756   return sizeof (struct MigrationStopMessage);
1757 }
1758
1759
1760 /**
1761  * Ask a peer to stop migrating data to us until the given point
1762  * in time.
1763  *
1764  * @param cp peer to ask
1765  * @param block_time until when to block
1766  */
1767 void
1768 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1769                            struct GNUNET_TIME_Absolute block_time)
1770 {
1771   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1772   {
1773     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774                 "Migration already blocked for another %s\n",
1775                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1776                                                         (cp->last_migration_block), GNUNET_YES));
1777     return;                     /* already blocked */
1778   }
1779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1780               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1781                                                       GNUNET_YES));
1782   cp->last_migration_block = block_time;
1783   if (NULL != cp->migration_pth)
1784     GSF_peer_transmit_cancel_ (cp->migration_pth);
1785   cp->migration_pth =
1786       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1787                           GNUNET_TIME_UNIT_FOREVER_REL,
1788                           sizeof (struct MigrationStopMessage),
1789                           &create_migration_stop_message, cp);
1790 }
1791
1792
1793 /**
1794  * Notify core about a preference we have for the given peer
1795  * (to allocate more resources towards it).  The change will
1796  * be communicated the next time we reserve bandwidth with
1797  * core (not instantly).
1798  *
1799  * @param cp peer to reserve bandwidth from
1800  * @param pref preference change
1801  */
1802 void
1803 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1804                                        uint64_t pref)
1805 {
1806   cp->inc_preference += pref;
1807 }
1808
1809
1810 /**
1811  * Call this method periodically to flush respect information to disk.
1812  *
1813  * @param cls closure, not used
1814  * @param tc task context, not used
1815  */
1816 static void
1817 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1818 {
1819
1820   if (NULL == cp_map)
1821     return;
1822   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL);
1823   if (NULL == tc)
1824     return;
1825   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1826     return;
1827   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1828                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1829                                               &cron_flush_respect, NULL);
1830 }
1831
1832
1833 /**
1834  * Initialize peer management subsystem.
1835  */
1836 void
1837 GSF_connected_peer_init_ ()
1838 {
1839   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1840   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1841   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1842                                       &cron_flush_respect, NULL);
1843 }
1844
1845
1846 /**
1847  * Iterator to free peer entries.
1848  *
1849  * @param cls closure, unused
1850  * @param key current key code
1851  * @param value value in the hash map (peer entry)
1852  * @return #GNUNET_YES (we should continue to iterate)
1853  */
1854 static int
1855 clean_peer (void *cls,
1856             const struct GNUNET_PeerIdentity *key,
1857             void *value)
1858 {
1859   GSF_peer_disconnect_handler_ (NULL, key);
1860   return GNUNET_YES;
1861 }
1862
1863
1864 /**
1865  * Shutdown peer management subsystem.
1866  */
1867 void
1868 GSF_connected_peer_done_ ()
1869 {
1870   cron_flush_respect (NULL, NULL);
1871   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_peer, NULL);
1872   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1873   cp_map = NULL;
1874   GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
1875 }
1876
1877
1878 /**
1879  * Iterator to remove references to LC entry.
1880  *
1881  * @param cls the `struct GSF_LocalClient *` to look for
1882  * @param key current key code
1883  * @param value value in the hash map (peer entry)
1884  * @return #GNUNET_YES (we should continue to iterate)
1885  */
1886 static int
1887 clean_local_client (void *cls,
1888                     const struct GNUNET_PeerIdentity *key,
1889                     void *value)
1890 {
1891   const struct GSF_LocalClient *lc = cls;
1892   struct GSF_ConnectedPeer *cp = value;
1893   unsigned int i;
1894
1895   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1896     if (cp->ppd.last_client_replies[i] == lc)
1897       cp->ppd.last_client_replies[i] = NULL;
1898   return GNUNET_YES;
1899 }
1900
1901
1902 /**
1903  * Notification that a local client disconnected.  Clean up all of our
1904  * references to the given handle.
1905  *
1906  * @param lc handle to the local client (henceforth invalid)
1907  */
1908 void
1909 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1910 {
1911   if (NULL == cp_map)
1912     return;                     /* already cleaned up */
1913   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
1914                                          (void *) lc);
1915 }
1916
1917
1918 /* end of gnunet-service-fs_cp.c */