-remove dead api
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34 #include "gnunet_peerstore_service.h"
35
36
37 /**
38  * Ratio for moving average delay calculation.  The previous
39  * average goes in with a factor of (n-1) into the calculation.
40  * Must be > 0.
41  */
42 #define RUNAVG_DELAY_N 16
43
44 /**
45  * How often do we flush respect values to disk?
46  */
47 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
48
49 /**
50  * After how long do we discard a reply?
51  */
52 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
53
54 /**
55  * Collect an instane number of statistics?  May cause excessive IPC.
56  */
57 #define INSANE_STATISTICS GNUNET_NO
58
59
60 /**
61  * Handle to cancel a transmission request.
62  */
63 struct GSF_PeerTransmitHandle
64 {
65
66   /**
67    * Kept in a doubly-linked list.
68    */
69   struct GSF_PeerTransmitHandle *next;
70
71   /**
72    * Kept in a doubly-linked list.
73    */
74   struct GSF_PeerTransmitHandle *prev;
75
76   /**
77    * Time when this transmission request was issued.
78    */
79   struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81   /**
82    * Timeout for this request.
83    */
84   struct GNUNET_TIME_Absolute timeout;
85
86   /**
87    * Task called on timeout, or 0 for none.
88    */
89   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
90
91   /**
92    * Function to call to get the actual message.
93    */
94   GSF_GetMessageCallback gmc;
95
96   /**
97    * Peer this request targets.
98    */
99   struct GSF_ConnectedPeer *cp;
100
101   /**
102    * Closure for @e gmc.
103    */
104   void *gmc_cls;
105
106   /**
107    * Size of the message to be transmitted.
108    */
109   size_t size;
110
111   /**
112    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
113    */
114   int is_query;
115
116   /**
117    * Did we get a reservation already?
118    */
119   int was_reserved;
120
121   /**
122    * Priority of this request.
123    */
124   uint32_t priority;
125
126 };
127
128
129 /**
130  * Handle for an entry in our delay list.
131  */
132 struct GSF_DelayedHandle
133 {
134
135   /**
136    * Kept in a doubly-linked list.
137    */
138   struct GSF_DelayedHandle *next;
139
140   /**
141    * Kept in a doubly-linked list.
142    */
143   struct GSF_DelayedHandle *prev;
144
145   /**
146    * Peer this transmission belongs to.
147    */
148   struct GSF_ConnectedPeer *cp;
149
150   /**
151    * The PUT that was delayed.
152    */
153   struct PutMessage *pm;
154
155   /**
156    * Task for the delay.
157    */
158   GNUNET_SCHEDULER_TaskIdentifier delay_task;
159
160   /**
161    * Size of the message.
162    */
163   size_t msize;
164
165 };
166
167
168 /**
169  * Information per peer and request.
170  */
171 struct PeerRequest
172 {
173
174   /**
175    * Handle to generic request.
176    */
177   struct GSF_PendingRequest *pr;
178
179   /**
180    * Handle to specific peer.
181    */
182   struct GSF_ConnectedPeer *cp;
183
184   /**
185    * Task for asynchronous stopping of this request.
186    */
187   GNUNET_SCHEDULER_TaskIdentifier kill_task;
188
189 };
190
191
192 /**
193  * A connected peer.
194  */
195 struct GSF_ConnectedPeer
196 {
197
198   /**
199    * Performance data for this peer.
200    */
201   struct GSF_PeerPerformanceData ppd;
202
203   /**
204    * Time until when we blocked this peer from migrating
205    * data to us.
206    */
207   struct GNUNET_TIME_Absolute last_migration_block;
208
209   /**
210    * Task scheduled to revive migration to this peer.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
213
214   /**
215    * Messages (replies, queries, content migration) we would like to
216    * send to this peer in the near future.  Sorted by priority, head.
217    */
218   struct GSF_PeerTransmitHandle *pth_head;
219
220   /**
221    * Messages (replies, queries, content migration) we would like to
222    * send to this peer in the near future.  Sorted by priority, tail.
223    */
224   struct GSF_PeerTransmitHandle *pth_tail;
225
226   /**
227    * Messages (replies, queries, content migration) we would like to
228    * send to this peer in the near future.  Sorted by priority, head.
229    */
230   struct GSF_DelayedHandle *delayed_head;
231
232   /**
233    * Messages (replies, queries, content migration) we would like to
234    * send to this peer in the near future.  Sorted by priority, tail.
235    */
236   struct GSF_DelayedHandle *delayed_tail;
237
238   /**
239    * Migration stop message in our queue, or NULL if we have none pending.
240    */
241   struct GSF_PeerTransmitHandle *migration_pth;
242
243   /**
244    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
245    */
246   struct GNUNET_ATS_ReservationContext *rc;
247
248   /**
249    * Task scheduled if we need to retry bandwidth reservation later.
250    */
251   GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
252
253   /**
254    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
255    */
256   struct GNUNET_CONTAINER_MultiHashMap *request_map;
257
258   /**
259    * Handle for an active request for transmission to this
260    * peer, or NULL (if core queue was full).
261    */
262   struct GNUNET_CORE_TransmitHandle *cth;
263
264   /**
265    * Increase in traffic preference still to be submitted
266    * to the core service for this peer.
267    */
268   uint64_t inc_preference;
269
270   /**
271    * Set to 1 if we're currently in the process of calling
272    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
273    * NULL, we should not call notify_transmit_ready for this
274    * handle right now).
275    */
276   unsigned int cth_in_progress;
277
278   /**
279    * Respect rating for this peer on disk.
280    */
281   uint32_t disk_respect;
282
283   /**
284    * Which offset in "last_p2p_replies" will be updated next?
285    * (we go round-robin).
286    */
287   unsigned int last_p2p_replies_woff;
288
289   /**
290    * Which offset in "last_client_replies" will be updated next?
291    * (we go round-robin).
292    */
293   unsigned int last_client_replies_woff;
294
295   /**
296    * Current offset into 'last_request_times' ring buffer.
297    */
298   unsigned int last_request_times_off;
299
300   /**
301    * GNUNET_YES if we did successfully reserve 32k bandwidth,
302    * GNUNET_NO if not.
303    */
304   int did_reserve;
305
306   /**
307    * Function called when the creation of this record is complete.
308    */
309   GSF_ConnectedPeerCreationCallback creation_cb;
310
311   /**
312    * Closure for @e creation_cb
313    */
314   void *creation_cb_cls;
315
316   /**
317    * Handle to the PEERSTORE iterate request for peer respect value
318    */
319   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
320
321 };
322
323
324 /**
325  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
326  */
327 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
328
329 /**
330  * Handle to peerstore service.
331  */
332 static struct GNUNET_PEERSTORE_Handle *peerstore;
333
334
335 /**
336  * Update the latency information kept for the given peer.
337  *
338  * @param id peer record to update
339  * @param latency current latency value
340  */
341 void
342 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
343                           struct GNUNET_TIME_Relative latency)
344 {
345   struct GSF_ConnectedPeer *cp;
346
347   cp = GSF_peer_get_ (id);
348   if (NULL == cp)
349     return; /* we're not yet connected at the core level, ignore */
350   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
351   /* LATER: merge atsi into cp's performance data (if we ever care...) */
352 }
353
354
355 /**
356  * Return the performance data record for the given peer
357  *
358  * @param cp peer to query
359  * @return performance data record for the peer
360  */
361 struct GSF_PeerPerformanceData *
362 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
363 {
364   return &cp->ppd;
365 }
366
367
368 /**
369  * Core is ready to transmit to a peer, get the message.
370  *
371  * @param cls the `struct GSF_PeerTransmitHandle` of the message
372  * @param size number of bytes core is willing to take
373  * @param buf where to copy the message
374  * @return number of bytes copied to @a buf
375  */
376 static size_t
377 peer_transmit_ready_cb (void *cls,
378                         size_t size,
379                         void *buf);
380
381
382 /**
383  * Function called by core upon success or failure of our bandwidth reservation request.
384  *
385  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
386  * @param peer identifies the peer
387  * @param amount set to the amount that was actually reserved or unreserved;
388  *               either the full requested amount or zero (no partial reservations)
389  * @param res_delay if the reservation could not be satisfied (amount was 0), how
390  *        long should the client wait until re-trying?
391  */
392 static void
393 ats_reserve_callback (void *cls,
394                       const struct GNUNET_PeerIdentity *peer,
395                       int32_t amount,
396                       struct GNUNET_TIME_Relative res_delay);
397
398
399 /**
400  * If ready (bandwidth reserved), try to schedule transmission via
401  * core for the given handle.
402  *
403  * @param pth transmission handle to schedule
404  */
405 static void
406 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
407 {
408   struct GSF_ConnectedPeer *cp;
409   struct GNUNET_PeerIdentity target;
410
411   cp = pth->cp;
412   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
413     return;                     /* already done */
414   GNUNET_assert (0 != cp->ppd.pid);
415   GNUNET_PEER_resolve (cp->ppd.pid, &target);
416
417   if (0 != cp->inc_preference)
418   {
419     GNUNET_ATS_performance_change_preference (GSF_ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
420                                   (double) cp->inc_preference,
421                                   GNUNET_ATS_PREFERENCE_END);
422     cp->inc_preference = 0;
423   }
424
425   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
426   {
427     /* query, need reservation */
428     if (GNUNET_YES != cp->did_reserve)
429       return;                   /* not ready */
430     cp->did_reserve = GNUNET_NO;
431     /* reservation already done! */
432     pth->was_reserved = GNUNET_YES;
433     cp->rc =
434         GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
435                                       &ats_reserve_callback, cp);
436     return;
437   }
438   GNUNET_assert (NULL == cp->cth);
439   cp->cth_in_progress++;
440   cp->cth =
441     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
442                                        GNUNET_CORE_PRIO_BACKGROUND,
443                                        GNUNET_TIME_absolute_get_remaining
444                                        (pth->timeout), &target, pth->size,
445                                        &peer_transmit_ready_cb, cp);
446   GNUNET_assert (NULL != cp->cth);
447   GNUNET_assert (0 < cp->cth_in_progress--);
448 }
449
450
451 /**
452  * Core is ready to transmit to a peer, get the message.
453  *
454  * @param cls the `struct GSF_PeerTransmitHandle` of the message
455  * @param size number of bytes core is willing to take
456  * @param buf where to copy the message
457  * @return number of bytes copied to @a buf
458  */
459 static size_t
460 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
461 {
462   struct GSF_ConnectedPeer *cp = cls;
463   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
464   struct GSF_PeerTransmitHandle *pos;
465   size_t ret;
466
467   cp->cth = NULL;
468   if (NULL == pth)
469     return 0;
470   if (pth->size > size)
471   {
472     schedule_transmission (pth);
473     return 0;
474   }
475   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
476   {
477     GNUNET_SCHEDULER_cancel (pth->timeout_task);
478     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
479   }
480   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
481   if (GNUNET_YES == pth->is_query)
482   {
483     cp->ppd.last_request_times[(cp->last_request_times_off++) %
484                                MAX_QUEUE_PER_PEER] =
485         GNUNET_TIME_absolute_get ();
486     GNUNET_assert (0 < cp->ppd.pending_queries--);
487   }
488   else if (GNUNET_NO == pth->is_query)
489   {
490     GNUNET_assert (0 < cp->ppd.pending_replies--);
491   }
492   GNUNET_LOAD_update (cp->ppd.transmission_delay,
493                       GNUNET_TIME_absolute_get_duration
494                       (pth->transmission_request_start_time).rel_value_us);
495   ret = pth->gmc (pth->gmc_cls, size, buf);
496   if (NULL != (pos = cp->pth_head))
497   {
498     GNUNET_assert (pos != pth);
499     schedule_transmission (pos);
500   }
501   GNUNET_free (pth);
502   return ret;
503 }
504
505
506 /**
507  * (re)try to reserve bandwidth from the given peer.
508  *
509  * @param cls the `struct GSF_ConnectedPeer` to reserve from
510  * @param tc scheduler context
511  */
512 static void
513 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
514 {
515   struct GSF_ConnectedPeer *cp = cls;
516   struct GNUNET_PeerIdentity target;
517
518   GNUNET_PEER_resolve (cp->ppd.pid, &target);
519   cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
520   cp->rc =
521     GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
522                                   &ats_reserve_callback, cp);
523 }
524
525
526 /**
527  * Function called by core upon success or failure of our bandwidth reservation request.
528  *
529  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
530  * @param peer identifies the peer
531  * @param amount set to the amount that was actually reserved or unreserved;
532  *               either the full requested amount or zero (no partial reservations)
533  * @param res_delay if the reservation could not be satisfied (amount was 0), how
534  *        long should the client wait until re-trying?
535  */
536 static void
537 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
538                       int32_t amount, struct GNUNET_TIME_Relative res_delay)
539 {
540   struct GSF_ConnectedPeer *cp = cls;
541   struct GSF_PeerTransmitHandle *pth;
542
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "Reserved %d bytes / need to wait %s for reservation\n",
545               (int) amount,
546               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
547   cp->rc = NULL;
548   if (0 == amount)
549   {
550     cp->rc_delay_task =
551         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
552     return;
553   }
554   cp->did_reserve = GNUNET_YES;
555   pth = cp->pth_head;
556   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
557   {
558     /* reservation success, try transmission now! */
559     cp->cth_in_progress++;
560     cp->cth =
561         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
562                                            GNUNET_CORE_PRIO_BACKGROUND,
563                                            GNUNET_TIME_absolute_get_remaining
564                                            (pth->timeout), peer, pth->size,
565                                            &peer_transmit_ready_cb, cp);
566     GNUNET_assert (NULL != cp->cth);
567     GNUNET_assert (0 < cp->cth_in_progress--);
568   }
569 }
570
571
572 /**
573  * Function called by PEERSTORE with peer respect record
574  *
575  * @param cls handle to connected peer entry
576  * @param record peerstore record information
577  * @param emsg error message, or NULL if no errors
578  * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
579  */
580 static int
581 peer_respect_cb (void *cls,
582                  struct GNUNET_PEERSTORE_Record *record,
583                  char *emsg)
584 {
585   struct GSF_ConnectedPeer *cp = cls;
586
587   cp->respect_iterate_req = NULL;
588   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
589     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
590   GSF_push_start_ (cp);
591   if (NULL != cp->creation_cb)
592     cp->creation_cb (cp->creation_cb_cls, cp);
593   return GNUNET_NO;
594 }
595
596
597 /**
598  * A peer connected to us.  Setup the connected peer
599  * records.
600  *
601  * @param peer identity of peer that connected
602  * @param creation_cb callback function when the record is created.
603  * @param creation_cb_cls closure for @creation_cb
604  */
605 void
606 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
607                            GSF_ConnectedPeerCreationCallback creation_cb,
608                            void *creation_cb_cls)
609 {
610   struct GSF_ConnectedPeer *cp;
611
612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613               "Connected to peer %s\n",
614               GNUNET_i2s (peer));
615   cp = GNUNET_new (struct GSF_ConnectedPeer);
616   cp->ppd.pid = GNUNET_PEER_intern (peer);
617   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
618   cp->rc =
619       GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
620                                     &ats_reserve_callback, cp);
621   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
622   GNUNET_break (GNUNET_OK ==
623                 GNUNET_CONTAINER_multipeermap_put (cp_map,
624                GSF_connected_peer_get_identity2_ (cp),
625                                                    cp,
626                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
627   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
628                          GNUNET_CONTAINER_multipeermap_size (cp_map),
629                          GNUNET_NO);
630   cp->creation_cb = creation_cb;
631   cp->creation_cb_cls = creation_cb_cls;
632   cp->respect_iterate_req =
633       GNUNET_PEERSTORE_iterate (peerstore, "fs", peer, "respect",
634                                 GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb,
635                                 cp);
636 }
637
638
639 /**
640  * It may be time to re-start migrating content to this
641  * peer.  Check, and if so, restart migration.
642  *
643  * @param cls the `struct GSF_ConnectedPeer`
644  * @param tc scheduler context
645  */
646 static void
647 revive_migration (void *cls,
648                   const struct GNUNET_SCHEDULER_TaskContext *tc)
649 {
650   struct GSF_ConnectedPeer *cp = cls;
651   struct GNUNET_TIME_Relative bt;
652
653   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
654   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
655   if (0 != bt.rel_value_us)
656   {
657     /* still time left... */
658     cp->mig_revive_task =
659         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
660     return;
661   }
662   GSF_push_start_ (cp);
663 }
664
665
666 /**
667  * Get a handle for a connected peer.
668  *
669  * @param peer peer's identity
670  * @return NULL if the peer is not currently connected
671  */
672 struct GSF_ConnectedPeer *
673 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
674 {
675   if (NULL == cp_map)
676     return NULL;
677   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
678 }
679
680
681 /**
682  * Handle P2P "MIGRATION_STOP" message.
683  *
684  * @param cls closure, always NULL
685  * @param other the other peer involved (sender or receiver, NULL
686  *        for loopback messages where we are both sender and receiver)
687  * @param message the actual message
688  * @return #GNUNET_OK to keep the connection open,
689  *         #GNUNET_SYSERR to close it (signal serious error)
690  */
691 int
692 GSF_handle_p2p_migration_stop_ (void *cls,
693                                 const struct GNUNET_PeerIdentity *other,
694                                 const struct GNUNET_MessageHeader *message)
695 {
696   struct GSF_ConnectedPeer *cp;
697   const struct MigrationStopMessage *msm;
698   struct GNUNET_TIME_Relative bt;
699
700   msm = (const struct MigrationStopMessage *) message;
701   cp = GSF_peer_get_ (other);
702   if (NULL == cp)
703   {
704     GNUNET_break (0);
705     return GNUNET_OK;
706   }
707   GNUNET_STATISTICS_update (GSF_stats,
708                             gettext_noop ("# migration stop messages received"),
709                             1, GNUNET_NO);
710   bt = GNUNET_TIME_relative_ntoh (msm->duration);
711   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
712               _("Migration of content to peer `%s' blocked for %s\n"),
713               GNUNET_i2s (other),
714               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
715   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
716   if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
717   {
718     GSF_push_stop_ (cp);
719     cp->mig_revive_task =
720         GNUNET_SCHEDULER_add_delayed (bt,
721                                       &revive_migration, cp);
722   }
723   return GNUNET_OK;
724 }
725
726
727 /**
728  * Copy reply and free put message.
729  *
730  * @param cls the `struct PutMessage`
731  * @param buf_size number of bytes available in @a buf
732  * @param buf where to copy the message, NULL on error (peer disconnect)
733  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
734  */
735 static size_t
736 copy_reply (void *cls, size_t buf_size, void *buf)
737 {
738   struct PutMessage *pm = cls;
739   size_t size;
740
741   if (NULL != buf)
742   {
743     GNUNET_assert (buf_size >= ntohs (pm->header.size));
744     size = ntohs (pm->header.size);
745     memcpy (buf, pm, size);
746     GNUNET_STATISTICS_update (GSF_stats,
747                               gettext_noop
748                               ("# replies transmitted to other peers"), 1,
749                               GNUNET_NO);
750   }
751   else
752   {
753     size = 0;
754     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
755                               GNUNET_NO);
756   }
757   GNUNET_free (pm);
758   return size;
759 }
760
761
762 /**
763  * Free resources associated with the given peer request.
764  *
765  * @param peerreq request to free
766  * @param query associated key for the request
767  */
768 static void
769 free_pending_request (struct PeerRequest *peerreq,
770                       const struct GNUNET_HashCode *query)
771 {
772   struct GSF_ConnectedPeer *cp = peerreq->cp;
773
774   if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
775   {
776     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
777     peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
778   }
779   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
780                             -1, GNUNET_NO);
781   GNUNET_break (GNUNET_YES ==
782                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
783                                                       query, peerreq));
784   GNUNET_free (peerreq);
785 }
786
787
788 /**
789  * Cancel all requests associated with the peer.
790  *
791  * @param cls unused
792  * @param query hash code of the request
793  * @param value the `struct GSF_PendingRequest`
794  * @return #GNUNET_YES (continue to iterate)
795  */
796 static int
797 cancel_pending_request (void *cls,
798                         const struct GNUNET_HashCode *query,
799                         void *value)
800 {
801   struct PeerRequest *peerreq = value;
802   struct GSF_PendingRequest *pr = peerreq->pr;
803   struct GSF_PendingRequestData *prd;
804
805   prd = GSF_pending_request_get_data_ (pr);
806   GSF_pending_request_cancel_ (pr, GNUNET_NO);
807   free_pending_request (peerreq, &prd->query);
808   return GNUNET_OK;
809 }
810
811
812 /**
813  * Free the given request.
814  *
815  * @param cls the request to free
816  * @param tc task context
817  */
818 static void
819 peer_request_destroy (void *cls,
820                       const struct GNUNET_SCHEDULER_TaskContext *tc)
821 {
822   struct PeerRequest *peerreq = cls;
823   struct GSF_PendingRequest *pr = peerreq->pr;
824   struct GSF_PendingRequestData *prd;
825
826   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
827   prd = GSF_pending_request_get_data_ (pr);
828   cancel_pending_request (NULL, &prd->query, peerreq);
829 }
830
831
832 /**
833  * The artificial delay is over, transmit the message now.
834  *
835  * @param cls the 'struct GSF_DelayedHandle' with the message
836  * @param tc scheduler context
837  */
838 static void
839 transmit_delayed_now (void *cls,
840                       const struct GNUNET_SCHEDULER_TaskContext *tc)
841 {
842   struct GSF_DelayedHandle *dh = cls;
843   struct GSF_ConnectedPeer *cp = dh->cp;
844
845   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
846   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
847   {
848     GNUNET_free (dh->pm);
849     GNUNET_free (dh);
850     return;
851   }
852   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
853                              dh->msize, &copy_reply, dh->pm);
854   GNUNET_free (dh);
855 }
856
857
858 /**
859  * Get the randomized delay a response should be subjected to.
860  *
861  * @return desired delay
862  */
863 static struct GNUNET_TIME_Relative
864 get_randomized_delay ()
865 {
866   struct GNUNET_TIME_Relative ret;
867
868   ret =
869       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
870                                      GNUNET_CRYPTO_random_u32
871                                      (GNUNET_CRYPTO_QUALITY_WEAK,
872                                       2 * GSF_avg_latency.rel_value_us + 1));
873 #if INSANE_STATISTICS
874   GNUNET_STATISTICS_update (GSF_stats,
875                             gettext_noop
876                             ("# artificial delays introduced (ms)"),
877                             ret.rel_value_us / 1000LL, GNUNET_NO);
878 #endif
879   return ret;
880 }
881
882
883 /**
884  * Handle a reply to a pending request.  Also called if a request
885  * expires (then with data == NULL).  The handler may be called
886  * many times (depending on the request type), but will not be
887  * called during or after a call to GSF_pending_request_cancel
888  * and will also not be called anymore after a call signalling
889  * expiration.
890  *
891  * @param cls `struct PeerRequest` this is an answer for
892  * @param eval evaluation of the result
893  * @param pr handle to the original pending request
894  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
895  * @param expiration when does @a data expire?
896  * @param last_transmission when did we last transmit a request for this block
897  * @param type type of the block
898  * @param data response data, NULL on request expiration
899  * @param data_len number of bytes in @a data
900  */
901 static void
902 handle_p2p_reply (void *cls,
903                   enum GNUNET_BLOCK_EvaluationResult eval,
904                   struct GSF_PendingRequest *pr,
905                   uint32_t reply_anonymity_level,
906                   struct GNUNET_TIME_Absolute expiration,
907                   struct GNUNET_TIME_Absolute last_transmission,
908                   enum GNUNET_BLOCK_Type type,
909                   const void *data,
910                   size_t data_len)
911 {
912   struct PeerRequest *peerreq = cls;
913   struct GSF_ConnectedPeer *cp = peerreq->cp;
914   struct GSF_PendingRequestData *prd;
915   struct PutMessage *pm;
916   size_t msize;
917
918   GNUNET_assert (data_len + sizeof (struct PutMessage) <
919                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
920   GNUNET_assert (peerreq->pr == pr);
921   prd = GSF_pending_request_get_data_ (pr);
922   if (NULL == data)
923   {
924     free_pending_request (peerreq, &prd->query);
925     return;
926   }
927   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
928   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
929   {
930     GNUNET_STATISTICS_update (GSF_stats,
931                               gettext_noop
932                               ("# replies dropped due to type mismatch"),
933                                 1, GNUNET_NO);
934     return;
935   }
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937               "Transmitting result for query `%s' to peer\n",
938               GNUNET_h2s (&prd->query));
939   GNUNET_STATISTICS_update (GSF_stats,
940                             gettext_noop ("# replies received for other peers"),
941                             1, GNUNET_NO);
942   msize = sizeof (struct PutMessage) + data_len;
943   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
944   {
945     GNUNET_break (0);
946     return;
947   }
948   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
949   {
950     if (reply_anonymity_level - 1 > GSF_cover_content_count)
951     {
952       GNUNET_STATISTICS_update (GSF_stats,
953                                 gettext_noop
954                                 ("# replies dropped due to insufficient cover traffic"),
955                                 1, GNUNET_NO);
956       return;
957     }
958     GSF_cover_content_count -= (reply_anonymity_level - 1);
959   }
960
961   pm = GNUNET_malloc (msize);
962   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
963   pm->header.size = htons (msize);
964   pm->type = htonl (type);
965   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
966   memcpy (&pm[1], data, data_len);
967   if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
968       (GNUNET_YES == GSF_enable_randomized_delays))
969   {
970     struct GSF_DelayedHandle *dh;
971
972     dh = GNUNET_new (struct GSF_DelayedHandle);
973     dh->cp = cp;
974     dh->pm = pm;
975     dh->msize = msize;
976     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
977     dh->delay_task =
978         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
979                                       &transmit_delayed_now, dh);
980   }
981   else
982   {
983     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
984                                &copy_reply, pm);
985   }
986   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
987     return;
988   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
989   {
990     GNUNET_STATISTICS_update (GSF_stats,
991                               gettext_noop
992                               ("# P2P searches destroyed due to ultimate reply"),
993                               1, GNUNET_NO);
994     peerreq->kill_task =
995         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
996   }
997 }
998
999
1000 /**
1001  * Increase the peer's respect by a value.
1002  *
1003  * @param cp which peer to change the respect value on
1004  * @param value is the int value by which the
1005  *  peer's credit is to be increased or decreased
1006  * @returns the actual change in respect (positive or negative)
1007  */
1008 static int
1009 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1010 {
1011   if (0 == value)
1012     return 0;
1013   GNUNET_assert (NULL != cp);
1014   if (value > 0)
1015   {
1016     if (cp->ppd.respect + value < cp->ppd.respect)
1017     {
1018       value = UINT32_MAX - cp->ppd.respect;
1019       cp->ppd.respect = UINT32_MAX;
1020     }
1021     else
1022       cp->ppd.respect += value;
1023   }
1024   else
1025   {
1026     if (cp->ppd.respect < -value)
1027     {
1028       value = -cp->ppd.respect;
1029       cp->ppd.respect = 0;
1030     }
1031     else
1032       cp->ppd.respect += value;
1033   }
1034   return value;
1035 }
1036
1037
1038 /**
1039  * We've received a request with the specified priority.  Bound it
1040  * according to how much we respect the given peer.
1041  *
1042  * @param prio_in requested priority
1043  * @param cp the peer making the request
1044  * @return effective priority
1045  */
1046 static int32_t
1047 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1048 {
1049 #define N ((double)128.0)
1050   uint32_t ret;
1051   double rret;
1052   int ld;
1053
1054   ld = GSF_test_get_load_too_high_ (0);
1055   if (GNUNET_SYSERR == ld)
1056   {
1057 #if INSANE_STATISTICS
1058     GNUNET_STATISTICS_update (GSF_stats,
1059                               gettext_noop
1060                               ("# requests done for free (low load)"), 1,
1061                               GNUNET_NO);
1062 #endif
1063     return 0;                   /* excess resources */
1064   }
1065   if (prio_in > INT32_MAX)
1066     prio_in = INT32_MAX;
1067   ret = -change_peer_respect (cp, -(int) prio_in);
1068   if (ret > 0)
1069   {
1070     if (ret > GSF_current_priorities + N)
1071       rret = GSF_current_priorities + N;
1072     else
1073       rret = ret;
1074     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1075   }
1076   if ((GNUNET_YES == ld) && (ret > 0))
1077   {
1078     /* try with charging */
1079     ld = GSF_test_get_load_too_high_ (ret);
1080   }
1081   if (GNUNET_YES == ld)
1082   {
1083     GNUNET_STATISTICS_update (GSF_stats,
1084                               gettext_noop
1085                               ("# request dropped, priority insufficient"), 1,
1086                               GNUNET_NO);
1087     /* undo charge */
1088     change_peer_respect (cp, (int) ret);
1089     return -1;                  /* not enough resources */
1090   }
1091   else
1092   {
1093     GNUNET_STATISTICS_update (GSF_stats,
1094                               gettext_noop
1095                               ("# requests done for a price (normal load)"), 1,
1096                               GNUNET_NO);
1097   }
1098 #undef N
1099   return ret;
1100 }
1101
1102
1103 /**
1104  * The priority level imposes a bound on the maximum
1105  * value for the ttl that can be requested.
1106  *
1107  * @param ttl_in requested ttl
1108  * @param prio given priority
1109  * @return ttl_in if ttl_in is below the limit,
1110  *         otherwise the ttl-limit for the given priority
1111  */
1112 static int32_t
1113 bound_ttl (int32_t ttl_in, uint32_t prio)
1114 {
1115   unsigned long long allowed;
1116
1117   if (ttl_in <= 0)
1118     return ttl_in;
1119   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1120   if (ttl_in > allowed)
1121   {
1122     if (allowed >= (1 << 30))
1123       return 1 << 30;
1124     return allowed;
1125   }
1126   return ttl_in;
1127 }
1128
1129
1130 /**
1131  * Handle P2P "QUERY" message.  Creates the pending request entry
1132  * and sets up all of the data structures to that we will
1133  * process replies properly.  Does not initiate forwarding or
1134  * local database lookups.
1135  *
1136  * @param other the other peer involved (sender or receiver, NULL
1137  *        for loopback messages where we are both sender and receiver)
1138  * @param message the actual message
1139  * @return pending request handle, NULL on error
1140  */
1141 struct GSF_PendingRequest *
1142 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1143                        const struct GNUNET_MessageHeader *message)
1144 {
1145   struct PeerRequest *peerreq;
1146   struct GSF_PendingRequest *pr;
1147   struct GSF_PendingRequestData *prd;
1148   struct GSF_ConnectedPeer *cp;
1149   struct GSF_ConnectedPeer *cps;
1150   const struct GNUNET_PeerIdentity *target;
1151   enum GSF_PendingRequestOptions options;
1152   uint16_t msize;
1153   const struct GetMessage *gm;
1154   unsigned int bits;
1155   const struct GNUNET_PeerIdentity *opt;
1156   uint32_t bm;
1157   size_t bfsize;
1158   uint32_t ttl_decrement;
1159   int32_t priority;
1160   int32_t ttl;
1161   enum GNUNET_BLOCK_Type type;
1162   GNUNET_PEER_Id spid;
1163
1164   GNUNET_assert (other != NULL);
1165   msize = ntohs (message->size);
1166   if (msize < sizeof (struct GetMessage))
1167   {
1168     GNUNET_break_op (0);
1169     return NULL;
1170   }
1171   GNUNET_STATISTICS_update (GSF_stats,
1172                             gettext_noop
1173                             ("# GET requests received (from other peers)"), 1,
1174                             GNUNET_NO);
1175   gm = (const struct GetMessage *) message;
1176   type = ntohl (gm->type);
1177   bm = ntohl (gm->hash_bitmap);
1178   bits = 0;
1179   while (bm > 0)
1180   {
1181     if (1 == (bm & 1))
1182       bits++;
1183     bm >>= 1;
1184   }
1185   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1186   {
1187     GNUNET_break_op (0);
1188     return NULL;
1189   }
1190   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1191   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1192   /* bfsize must be power of 2, check! */
1193   if (0 != ((bfsize - 1) & bfsize))
1194   {
1195     GNUNET_break_op (0);
1196     return NULL;
1197   }
1198   GSF_cover_query_count++;
1199   bm = ntohl (gm->hash_bitmap);
1200   bits = 0;
1201   cps = GSF_peer_get_ (other);
1202   if (NULL == cps)
1203   {
1204     /* peer must have just disconnected */
1205     GNUNET_STATISTICS_update (GSF_stats,
1206                               gettext_noop
1207                               ("# requests dropped due to initiator not being connected"),
1208                               1, GNUNET_NO);
1209     return NULL;
1210   }
1211   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1212     cp = GSF_peer_get_ (&opt[bits++]);
1213   else
1214     cp = cps;
1215   if (NULL == cp)
1216   {
1217     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1218       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1220                   GNUNET_i2s (&opt[bits - 1]));
1221
1222     else
1223       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1225                   GNUNET_i2s (other));
1226 #if INSANE_STATISTICS
1227     GNUNET_STATISTICS_update (GSF_stats,
1228                               gettext_noop
1229                               ("# requests dropped due to missing reverse route"),
1230                               1, GNUNET_NO);
1231 #endif
1232     return NULL;
1233   }
1234   /* note that we can really only check load here since otherwise
1235    * peers could find out that we are overloaded by not being
1236    * disconnected after sending us a malformed query... */
1237   priority = bound_priority (ntohl (gm->priority), cps);
1238   if (priority < 0)
1239   {
1240     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241                 "Dropping query from `%s', this peer is too busy.\n",
1242                 GNUNET_i2s (other));
1243     return NULL;
1244   }
1245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1246               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1247               GNUNET_h2s (&gm->query),
1248               (unsigned int) type,
1249               GNUNET_i2s (other),
1250               (unsigned int) bm);
1251   target =
1252       (0 !=
1253        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1254   options = GSF_PRO_DEFAULTS;
1255   spid = 0;
1256   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1257       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1258           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1259           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1260   {
1261     /* don't have BW to send to peer, or would likely take longer than we have for it,
1262      * so at best indirect the query */
1263     priority = 0;
1264     options |= GSF_PRO_FORWARD_ONLY;
1265     spid = GNUNET_PEER_intern (other);
1266     GNUNET_assert (0 != spid);
1267   }
1268   ttl = bound_ttl (ntohl (gm->ttl), priority);
1269   /* decrement ttl (always) */
1270   ttl_decrement =
1271       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1272                                                     TTL_DECREMENT);
1273   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1274   {
1275     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1277                 GNUNET_i2s (other), ttl, ttl_decrement);
1278     GNUNET_STATISTICS_update (GSF_stats,
1279                               gettext_noop
1280                               ("# requests dropped due TTL underflow"), 1,
1281                               GNUNET_NO);
1282     /* integer underflow => drop (should be very rare)! */
1283     return NULL;
1284   }
1285   ttl -= ttl_decrement;
1286
1287   /* test if the request already exists */
1288   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1289   if (peerreq != NULL)
1290   {
1291     pr = peerreq->pr;
1292     prd = GSF_pending_request_get_data_ (pr);
1293     if (prd->type == type)
1294     {
1295       if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
1296       {
1297         /* existing request has higher TTL, drop new one! */
1298         prd->priority += priority;
1299         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300                     "Have existing request with higher TTL, dropping new request.\n",
1301                     GNUNET_i2s (other));
1302         GNUNET_STATISTICS_update (GSF_stats,
1303                                   gettext_noop
1304                                   ("# requests dropped due to higher-TTL request"),
1305                                   1, GNUNET_NO);
1306         return NULL;
1307       }
1308       /* existing request has lower TTL, drop old one! */
1309       priority += prd->priority;
1310       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1311       free_pending_request (peerreq, &gm->query);
1312     }
1313   }
1314
1315   peerreq = GNUNET_new (struct PeerRequest);
1316   peerreq->cp = cp;
1317   pr = GSF_pending_request_create_ (options, type, &gm->query,
1318                                     target,
1319                                     (bfsize >
1320                                      0) ? (const char *) &opt[bits] : NULL,
1321                                     bfsize, ntohl (gm->filter_mutator),
1322                                     1 /* anonymity */ ,
1323                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1324                                     &handle_p2p_reply, peerreq);
1325   GNUNET_assert (NULL != pr);
1326   peerreq->pr = pr;
1327   GNUNET_break (GNUNET_OK ==
1328                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1329                                                    peerreq,
1330                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1331   GNUNET_STATISTICS_update (GSF_stats,
1332                             gettext_noop
1333                             ("# P2P query messages received and processed"), 1,
1334                             GNUNET_NO);
1335   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1336                             1, GNUNET_NO);
1337   return pr;
1338 }
1339
1340
1341 /**
1342  * Function called if there has been a timeout trying to satisfy
1343  * a transmission request.
1344  *
1345  * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1346  * @param tc scheduler context
1347  */
1348 static void
1349 peer_transmit_timeout (void *cls,
1350                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1351 {
1352   struct GSF_PeerTransmitHandle *pth = cls;
1353   struct GSF_ConnectedPeer *cp;
1354
1355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1356               "Timeout trying to transmit to other peer\n");
1357   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1358   cp = pth->cp;
1359   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1360   if (GNUNET_YES == pth->is_query)
1361     GNUNET_assert (0 < cp->ppd.pending_queries--);
1362   else if (GNUNET_NO == pth->is_query)
1363     GNUNET_assert (0 < cp->ppd.pending_replies--);
1364   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1365   if (NULL != cp->cth)
1366   {
1367     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1368     cp->cth = NULL;
1369   }
1370   pth->gmc (pth->gmc_cls, 0, NULL);
1371   GNUNET_assert (0 == cp->cth_in_progress);
1372   GNUNET_free (pth);
1373 }
1374
1375
1376 /**
1377  * Transmit a message to the given peer as soon as possible.
1378  * If the peer disconnects before the transmission can happen,
1379  * the callback is invoked with a `NULL` @a buffer.
1380  *
1381  * @param cp target peer
1382  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1383  * @param priority how important is this request?
1384  * @param timeout when does this request timeout (call gmc with error)
1385  * @param size number of bytes we would like to send to the peer
1386  * @param gmc function to call to get the message
1387  * @param gmc_cls closure for @a gmc
1388  * @return handle to cancel request
1389  */
1390 struct GSF_PeerTransmitHandle *
1391 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1392                     int is_query,
1393                     uint32_t priority,
1394                     struct GNUNET_TIME_Relative timeout,
1395                     size_t size,
1396                     GSF_GetMessageCallback gmc, void *gmc_cls)
1397 {
1398   struct GSF_PeerTransmitHandle *pth;
1399   struct GSF_PeerTransmitHandle *pos;
1400   struct GSF_PeerTransmitHandle *prev;
1401
1402   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1403   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1404   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1405   pth->gmc = gmc;
1406   pth->gmc_cls = gmc_cls;
1407   pth->size = size;
1408   pth->is_query = is_query;
1409   pth->priority = priority;
1410   pth->cp = cp;
1411   /* insertion sort (by priority, descending) */
1412   prev = NULL;
1413   pos = cp->pth_head;
1414   while ((NULL != pos) && (pos->priority > priority))
1415   {
1416     prev = pos;
1417     pos = pos->next;
1418   }
1419   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1420   if (GNUNET_YES == is_query)
1421     cp->ppd.pending_queries++;
1422   else if (GNUNET_NO == is_query)
1423     cp->ppd.pending_replies++;
1424   pth->timeout_task =
1425       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1426   schedule_transmission (pth);
1427   return pth;
1428 }
1429
1430
1431 /**
1432  * Cancel an earlier request for transmission.
1433  *
1434  * @param pth request to cancel
1435  */
1436 void
1437 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1438 {
1439   struct GSF_ConnectedPeer *cp;
1440
1441   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
1442   {
1443     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1444     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1445   }
1446   cp = pth->cp;
1447   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1448   if (GNUNET_YES == pth->is_query)
1449     GNUNET_assert (0 < cp->ppd.pending_queries--);
1450   else if (GNUNET_NO == pth->is_query)
1451     GNUNET_assert (0 < cp->ppd.pending_replies--);
1452   GNUNET_free (pth);
1453 }
1454
1455
1456 /**
1457  * Report on receiving a reply; update the performance record of the given peer.
1458  *
1459  * @param cp responding peer (will be updated)
1460  * @param request_time time at which the original query was transmitted
1461  * @param request_priority priority of the original request
1462  */
1463 void
1464 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1465                               struct GNUNET_TIME_Absolute request_time,
1466                               uint32_t request_priority)
1467 {
1468   struct GNUNET_TIME_Relative delay;
1469
1470   delay = GNUNET_TIME_absolute_get_duration (request_time);
1471   cp->ppd.avg_reply_delay.rel_value_us =
1472       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1473        delay.rel_value_us) / RUNAVG_DELAY_N;
1474   cp->ppd.avg_priority =
1475       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1476        request_priority) / RUNAVG_DELAY_N;
1477 }
1478
1479
1480 /**
1481  * Report on receiving a reply in response to an initiating client.
1482  * Remember that this peer is good for this client.
1483  *
1484  * @param cp responding peer (will be updated)
1485  * @param initiator_client local client on responsible for query
1486  */
1487 void
1488 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1489                                    struct GSF_LocalClient *initiator_client)
1490 {
1491   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1492                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1493 }
1494
1495
1496 /**
1497  * Report on receiving a reply in response to an initiating peer.
1498  * Remember that this peer is good for this initiating peer.
1499  *
1500  * @param cp responding peer (will be updated)
1501  * @param initiator_peer other peer responsible for query
1502  */
1503 void
1504 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1505                                  const struct GSF_ConnectedPeer *initiator_peer)
1506 {
1507   unsigned int woff;
1508
1509   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1510   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1511   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1512   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1513   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1514 }
1515
1516
1517 /**
1518  * A peer disconnected from us.  Tear down the connected peer
1519  * record.
1520  *
1521  * @param cls unused
1522  * @param peer identity of peer that connected
1523  */
1524 void
1525 GSF_peer_disconnect_handler_ (void *cls,
1526                               const struct GNUNET_PeerIdentity *peer)
1527 {
1528   struct GSF_ConnectedPeer *cp;
1529   struct GSF_PeerTransmitHandle *pth;
1530   struct GSF_DelayedHandle *dh;
1531
1532   cp = GSF_peer_get_ (peer);
1533   if (NULL == cp)
1534     return;                     /* must have been disconnect from core with
1535                                  * 'peer' == my_id, ignore */
1536   GNUNET_assert (GNUNET_YES ==
1537                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1538                                                        peer,
1539                                                        cp));
1540   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1541                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1542                          GNUNET_NO);
1543   if (NULL != cp->respect_iterate_req)
1544   {
1545     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1546     cp->respect_iterate_req = NULL;
1547   }
1548   if (NULL != cp->migration_pth)
1549   {
1550     GSF_peer_transmit_cancel_ (cp->migration_pth);
1551     cp->migration_pth = NULL;
1552   }
1553   if (NULL != cp->rc)
1554   {
1555     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1556     cp->rc = NULL;
1557   }
1558   if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1559   {
1560     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1561     cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1562   }
1563   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1564                                          &cancel_pending_request, cp);
1565   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1566   cp->request_map = NULL;
1567   GSF_plan_notify_peer_disconnect_ (cp);
1568   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1569   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1570   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1571   GSF_push_stop_ (cp);
1572   if (NULL != cp->cth)
1573   {
1574     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1575     cp->cth = NULL;
1576   }
1577   GNUNET_assert (0 == cp->cth_in_progress);
1578   while (NULL != (pth = cp->pth_head))
1579   {
1580     if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1581     {
1582       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1583       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1584     }
1585     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1586     pth->gmc (pth->gmc_cls, 0, NULL);
1587     GNUNET_free (pth);
1588   }
1589   while (NULL != (dh = cp->delayed_head))
1590   {
1591     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1592     GNUNET_SCHEDULER_cancel (dh->delay_task);
1593     GNUNET_free (dh->pm);
1594     GNUNET_free (dh);
1595   }
1596   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1597   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1598   {
1599     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1600     cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1601   }
1602   GNUNET_free (cp);
1603 }
1604
1605
1606 /**
1607  * Closure for #call_iterator().
1608  */
1609 struct IterationContext
1610 {
1611   /**
1612    * Function to call on each entry.
1613    */
1614   GSF_ConnectedPeerIterator it;
1615
1616   /**
1617    * Closure for @e it.
1618    */
1619   void *it_cls;
1620 };
1621
1622
1623 /**
1624  * Function that calls the callback for each peer.
1625  *
1626  * @param cls the `struct IterationContext *`
1627  * @param key identity of the peer
1628  * @param value the `struct GSF_ConnectedPeer *`
1629  * @return #GNUNET_YES to continue iteration
1630  */
1631 static int
1632 call_iterator (void *cls,
1633                const struct GNUNET_PeerIdentity *key,
1634                void *value)
1635 {
1636   struct IterationContext *ic = cls;
1637   struct GSF_ConnectedPeer *cp = value;
1638
1639   ic->it (ic->it_cls,
1640           key, cp,
1641           &cp->ppd);
1642   return GNUNET_YES;
1643 }
1644
1645
1646 /**
1647  * Iterate over all connected peers.
1648  *
1649  * @param it function to call for each peer
1650  * @param it_cls closure for @a it
1651  */
1652 void
1653 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1654                               void *it_cls)
1655 {
1656   struct IterationContext ic;
1657
1658   ic.it = it;
1659   ic.it_cls = it_cls;
1660   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1661                                          &call_iterator,
1662                                          &ic);
1663 }
1664
1665
1666 /**
1667  * Obtain the identity of a connected peer.
1668  *
1669  * @param cp peer to get identity of
1670  * @param id identity to set (written to)
1671  */
1672 void
1673 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1674                                   struct GNUNET_PeerIdentity *id)
1675 {
1676   GNUNET_assert (0 != cp->ppd.pid);
1677   GNUNET_PEER_resolve (cp->ppd.pid, id);
1678 }
1679
1680
1681 /**
1682  * Obtain the identity of a connected peer.
1683  *
1684  * @param cp peer to get identity of
1685  * @return reference to peer identity, valid until peer disconnects (!)
1686  */
1687 const struct GNUNET_PeerIdentity *
1688 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1689 {
1690   GNUNET_assert (0 != cp->ppd.pid);
1691   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1692 }
1693
1694
1695 /**
1696  * Assemble a migration stop message for transmission.
1697  *
1698  * @param cls the `struct GSF_ConnectedPeer` to use
1699  * @param size number of bytes we're allowed to write to @a buf
1700  * @param buf where to copy the message
1701  * @return number of bytes copied to @a buf
1702  */
1703 static size_t
1704 create_migration_stop_message (void *cls,
1705                                size_t size,
1706                                void *buf)
1707 {
1708   struct GSF_ConnectedPeer *cp = cls;
1709   struct MigrationStopMessage msm;
1710
1711   cp->migration_pth = NULL;
1712   if (NULL == buf)
1713     return 0;
1714   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1715   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1716   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1717   msm.reserved = htonl (0);
1718   msm.duration =
1719       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1720                                  (cp->last_migration_block));
1721   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1722   GNUNET_STATISTICS_update (GSF_stats,
1723                             gettext_noop ("# migration stop messages sent"),
1724                             1, GNUNET_NO);
1725   return sizeof (struct MigrationStopMessage);
1726 }
1727
1728
1729 /**
1730  * Ask a peer to stop migrating data to us until the given point
1731  * in time.
1732  *
1733  * @param cp peer to ask
1734  * @param block_time until when to block
1735  */
1736 void
1737 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1738                            struct GNUNET_TIME_Absolute block_time)
1739 {
1740   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1741   {
1742     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1743                 "Migration already blocked for another %s\n",
1744                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1745                                                         (cp->last_migration_block), GNUNET_YES));
1746     return;                     /* already blocked */
1747   }
1748   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1749               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1750                                                       GNUNET_YES));
1751   cp->last_migration_block = block_time;
1752   if (NULL != cp->migration_pth)
1753     GSF_peer_transmit_cancel_ (cp->migration_pth);
1754   cp->migration_pth =
1755       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1756                           GNUNET_TIME_UNIT_FOREVER_REL,
1757                           sizeof (struct MigrationStopMessage),
1758                           &create_migration_stop_message, cp);
1759 }
1760
1761
1762 /**
1763  * Write peer-respect information to a file - flush the buffer entry!
1764  *
1765  * @param cls unused
1766  * @param key peer identity
1767  * @param value the 'struct GSF_ConnectedPeer' to flush
1768  * @return GNUNET_OK to continue iteration
1769  */
1770 static int
1771 flush_respect (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1772 {
1773   struct GSF_ConnectedPeer *cp = value;
1774   struct GNUNET_PeerIdentity pid;
1775
1776   if (cp->ppd.respect == cp->disk_respect)
1777     return GNUNET_OK;           /* unchanged */
1778   GNUNET_assert (0 != cp->ppd.pid);
1779   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1780   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1781                           sizeof (cp->ppd.respect),
1782                           GNUNET_TIME_UNIT_FOREVER_ABS,
1783                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1784   return GNUNET_OK;
1785 }
1786
1787
1788 /**
1789  * Notify core about a preference we have for the given peer
1790  * (to allocate more resources towards it).  The change will
1791  * be communicated the next time we reserve bandwidth with
1792  * core (not instantly).
1793  *
1794  * @param cp peer to reserve bandwidth from
1795  * @param pref preference change
1796  */
1797 void
1798 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1799                                        uint64_t pref)
1800 {
1801   cp->inc_preference += pref;
1802 }
1803
1804
1805 /**
1806  * Call this method periodically to flush respect information to disk.
1807  *
1808  * @param cls closure, not used
1809  * @param tc task context, not used
1810  */
1811 static void
1812 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1813 {
1814
1815   if (NULL == cp_map)
1816     return;
1817   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL);
1818   if (NULL == tc)
1819     return;
1820   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1821     return;
1822   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1823                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1824                                               &cron_flush_respect, NULL);
1825 }
1826
1827
1828 /**
1829  * Initialize peer management subsystem.
1830  */
1831 void
1832 GSF_connected_peer_init_ ()
1833 {
1834   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1835   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1836   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1837                                       &cron_flush_respect, NULL);
1838 }
1839
1840
1841 /**
1842  * Iterator to free peer entries.
1843  *
1844  * @param cls closure, unused
1845  * @param key current key code
1846  * @param value value in the hash map (peer entry)
1847  * @return #GNUNET_YES (we should continue to iterate)
1848  */
1849 static int
1850 clean_peer (void *cls,
1851             const struct GNUNET_PeerIdentity *key,
1852             void *value)
1853 {
1854   GSF_peer_disconnect_handler_ (NULL, key);
1855   return GNUNET_YES;
1856 }
1857
1858
1859 /**
1860  * Shutdown peer management subsystem.
1861  */
1862 void
1863 GSF_connected_peer_done_ ()
1864 {
1865   cron_flush_respect (NULL, NULL);
1866   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_peer, NULL);
1867   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1868   cp_map = NULL;
1869   GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
1870 }
1871
1872
1873 /**
1874  * Iterator to remove references to LC entry.
1875  *
1876  * @param cls the 'struct GSF_LocalClient*' to look for
1877  * @param key current key code
1878  * @param value value in the hash map (peer entry)
1879  * @return #GNUNET_YES (we should continue to iterate)
1880  */
1881 static int
1882 clean_local_client (void *cls,
1883                     const struct GNUNET_PeerIdentity *key,
1884                     void *value)
1885 {
1886   const struct GSF_LocalClient *lc = cls;
1887   struct GSF_ConnectedPeer *cp = value;
1888   unsigned int i;
1889
1890   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1891     if (cp->ppd.last_client_replies[i] == lc)
1892       cp->ppd.last_client_replies[i] = NULL;
1893   return GNUNET_YES;
1894 }
1895
1896
1897 /**
1898  * Notification that a local client disconnected.  Clean up all of our
1899  * references to the given handle.
1900  *
1901  * @param lc handle to the local client (henceforth invalid)
1902  */
1903 void
1904 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1905 {
1906   if (NULL == cp_map)
1907     return;                     /* already cleaned up */
1908   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
1909                                          (void *) lc);
1910 }
1911
1912
1913 /* end of gnunet-service-fs_cp.c */