count number of pending replies and refuse to process queries if queue gets too big
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34 #include "gnunet_peerstore_service.h"
35
36
37 /**
38  * Ratio for moving average delay calculation.  The previous
39  * average goes in with a factor of (n-1) into the calculation.
40  * Must be > 0.
41  */
42 #define RUNAVG_DELAY_N 16
43
44 /**
45  * How often do we flush respect values to disk?
46  */
47 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
48
49 /**
50  * After how long do we discard a reply?
51  */
52 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
53
54 /**
55  * Collect an instane number of statistics?  May cause excessive IPC.
56  */
57 #define INSANE_STATISTICS GNUNET_NO
58
59
60 /**
61  * Handle to cancel a transmission request.
62  */
63 struct GSF_PeerTransmitHandle
64 {
65
66   /**
67    * Kept in a doubly-linked list.
68    */
69   struct GSF_PeerTransmitHandle *next;
70
71   /**
72    * Kept in a doubly-linked list.
73    */
74   struct GSF_PeerTransmitHandle *prev;
75
76   /**
77    * Time when this transmission request was issued.
78    */
79   struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81   /**
82    * Timeout for this request.
83    */
84   struct GNUNET_TIME_Absolute timeout;
85
86   /**
87    * Task called on timeout, or 0 for none.
88    */
89   struct GNUNET_SCHEDULER_Task *timeout_task;
90
91   /**
92    * Function to call to get the actual message.
93    */
94   GSF_GetMessageCallback gmc;
95
96   /**
97    * Peer this request targets.
98    */
99   struct GSF_ConnectedPeer *cp;
100
101   /**
102    * Closure for @e gmc.
103    */
104   void *gmc_cls;
105
106   /**
107    * Size of the message to be transmitted.
108    */
109   size_t size;
110
111   /**
112    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
113    */
114   int is_query;
115
116   /**
117    * Did we get a reservation already?
118    */
119   int was_reserved;
120
121   /**
122    * Priority of this request.
123    */
124   uint32_t priority;
125
126 };
127
128
129 /**
130  * Handle for an entry in our delay list.
131  */
132 struct GSF_DelayedHandle
133 {
134
135   /**
136    * Kept in a doubly-linked list.
137    */
138   struct GSF_DelayedHandle *next;
139
140   /**
141    * Kept in a doubly-linked list.
142    */
143   struct GSF_DelayedHandle *prev;
144
145   /**
146    * Peer this transmission belongs to.
147    */
148   struct GSF_ConnectedPeer *cp;
149
150   /**
151    * The PUT that was delayed.
152    */
153   struct PutMessage *pm;
154
155   /**
156    * Task for the delay.
157    */
158   struct GNUNET_SCHEDULER_Task *delay_task;
159
160   /**
161    * Size of the message.
162    */
163   size_t msize;
164
165 };
166
167
168 /**
169  * Information per peer and request.
170  */
171 struct PeerRequest
172 {
173
174   /**
175    * Handle to generic request.
176    */
177   struct GSF_PendingRequest *pr;
178
179   /**
180    * Handle to specific peer.
181    */
182   struct GSF_ConnectedPeer *cp;
183
184   /**
185    * Task for asynchronous stopping of this request.
186    */
187   struct GNUNET_SCHEDULER_Task *kill_task;
188
189 };
190
191
192 /**
193  * A connected peer.
194  */
195 struct GSF_ConnectedPeer
196 {
197
198   /**
199    * Performance data for this peer.
200    */
201   struct GSF_PeerPerformanceData ppd;
202
203   /**
204    * Time until when we blocked this peer from migrating
205    * data to us.
206    */
207   struct GNUNET_TIME_Absolute last_migration_block;
208
209   /**
210    * Task scheduled to revive migration to this peer.
211    */
212   struct GNUNET_SCHEDULER_Task *mig_revive_task;
213
214   /**
215    * Messages (replies, queries, content migration) we would like to
216    * send to this peer in the near future.  Sorted by priority, head.
217    */
218   struct GSF_PeerTransmitHandle *pth_head;
219
220   /**
221    * Messages (replies, queries, content migration) we would like to
222    * send to this peer in the near future.  Sorted by priority, tail.
223    */
224   struct GSF_PeerTransmitHandle *pth_tail;
225
226   /**
227    * Messages (replies, queries, content migration) we would like to
228    * send to this peer in the near future.  Sorted by priority, head.
229    */
230   struct GSF_DelayedHandle *delayed_head;
231
232   /**
233    * Messages (replies, queries, content migration) we would like to
234    * send to this peer in the near future.  Sorted by priority, tail.
235    */
236   struct GSF_DelayedHandle *delayed_tail;
237
238   /**
239    * Migration stop message in our queue, or NULL if we have none pending.
240    */
241   struct GSF_PeerTransmitHandle *migration_pth;
242
243   /**
244    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
245    */
246   struct GNUNET_ATS_ReservationContext *rc;
247
248   /**
249    * Task scheduled if we need to retry bandwidth reservation later.
250    */
251   struct GNUNET_SCHEDULER_Task *rc_delay_task;
252
253   /**
254    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
255    */
256   struct GNUNET_CONTAINER_MultiHashMap *request_map;
257
258   /**
259    * Handle for an active request for transmission to this
260    * peer, or NULL (if core queue was full).
261    */
262   struct GNUNET_CORE_TransmitHandle *cth;
263
264   /**
265    * Increase in traffic preference still to be submitted
266    * to the core service for this peer.
267    */
268   uint64_t inc_preference;
269
270   /**
271    * Set to 1 if we're currently in the process of calling
272    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
273    * NULL, we should not call notify_transmit_ready for this
274    * handle right now).
275    */
276   unsigned int cth_in_progress;
277
278   /**
279    * Number of entries in @e delayed_head DLL.
280    */
281   unsigned int delay_queue_size;
282
283   /**
284    * Respect rating for this peer on disk.
285    */
286   uint32_t disk_respect;
287
288   /**
289    * Which offset in "last_p2p_replies" will be updated next?
290    * (we go round-robin).
291    */
292   unsigned int last_p2p_replies_woff;
293
294   /**
295    * Which offset in "last_client_replies" will be updated next?
296    * (we go round-robin).
297    */
298   unsigned int last_client_replies_woff;
299
300   /**
301    * Current offset into 'last_request_times' ring buffer.
302    */
303   unsigned int last_request_times_off;
304
305   /**
306    * #GNUNET_YES if we did successfully reserve 32k bandwidth,
307    * #GNUNET_NO if not.
308    */
309   int did_reserve;
310
311   /**
312    * Function called when the creation of this record is complete.
313    */
314   GSF_ConnectedPeerCreationCallback creation_cb;
315
316   /**
317    * Closure for @e creation_cb
318    */
319   void *creation_cb_cls;
320
321   /**
322    * Handle to the PEERSTORE iterate request for peer respect value
323    */
324   struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
325
326 };
327
328
329 /**
330  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
331  */
332 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
333
334 /**
335  * Handle to peerstore service.
336  */
337 static struct GNUNET_PEERSTORE_Handle *peerstore;
338
339
340 /**
341  * Update the latency information kept for the given peer.
342  *
343  * @param id peer record to update
344  * @param latency current latency value
345  */
346 void
347 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
348                           struct GNUNET_TIME_Relative latency)
349 {
350   struct GSF_ConnectedPeer *cp;
351
352   cp = GSF_peer_get_ (id);
353   if (NULL == cp)
354     return; /* we're not yet connected at the core level, ignore */
355   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
356 }
357
358
359 /**
360  * Return the performance data record for the given peer
361  *
362  * @param cp peer to query
363  * @return performance data record for the peer
364  */
365 struct GSF_PeerPerformanceData *
366 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
367 {
368   return &cp->ppd;
369 }
370
371
372 /**
373  * Core is ready to transmit to a peer, get the message.
374  *
375  * @param cls the `struct GSF_PeerTransmitHandle` of the message
376  * @param size number of bytes core is willing to take
377  * @param buf where to copy the message
378  * @return number of bytes copied to @a buf
379  */
380 static size_t
381 peer_transmit_ready_cb (void *cls,
382                         size_t size,
383                         void *buf);
384
385
386 /**
387  * Function called by core upon success or failure of our bandwidth reservation request.
388  *
389  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
390  * @param peer identifies the peer
391  * @param amount set to the amount that was actually reserved or unreserved;
392  *               either the full requested amount or zero (no partial reservations)
393  * @param res_delay if the reservation could not be satisfied (amount was 0), how
394  *        long should the client wait until re-trying?
395  */
396 static void
397 ats_reserve_callback (void *cls,
398                       const struct GNUNET_PeerIdentity *peer,
399                       int32_t amount,
400                       struct GNUNET_TIME_Relative res_delay);
401
402
403 /**
404  * If ready (bandwidth reserved), try to schedule transmission via
405  * core for the given handle.
406  *
407  * @param pth transmission handle to schedule
408  */
409 static void
410 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
411 {
412   struct GSF_ConnectedPeer *cp;
413   struct GNUNET_PeerIdentity target;
414
415   cp = pth->cp;
416   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
417     return;                     /* already done */
418   GNUNET_assert (0 != cp->ppd.pid);
419   GNUNET_PEER_resolve (cp->ppd.pid, &target);
420
421   if (0 != cp->inc_preference)
422   {
423     GNUNET_ATS_performance_change_preference (GSF_ats,
424                                               &target,
425                                               GNUNET_ATS_PREFERENCE_BANDWIDTH,
426                                               (double) cp->inc_preference,
427                                               GNUNET_ATS_PREFERENCE_END);
428     cp->inc_preference = 0;
429   }
430
431   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
432   {
433     /* query, need reservation */
434     if (GNUNET_YES != cp->did_reserve)
435       return;                   /* not ready */
436     cp->did_reserve = GNUNET_NO;
437     /* reservation already done! */
438     pth->was_reserved = GNUNET_YES;
439     cp->rc =
440         GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
441                                       &ats_reserve_callback, cp);
442     return;
443   }
444   GNUNET_assert (NULL == cp->cth);
445   cp->cth_in_progress++;
446   cp->cth =
447     GNUNET_CORE_notify_transmit_ready (GSF_core,
448                                        GNUNET_YES,
449                                        GNUNET_CORE_PRIO_BACKGROUND,
450                                        GNUNET_TIME_absolute_get_remaining
451                                        (pth->timeout),
452                                        &target,
453                                        pth->size,
454                                        &peer_transmit_ready_cb, cp);
455   GNUNET_assert (NULL != cp->cth);
456   GNUNET_assert (0 < cp->cth_in_progress--);
457 }
458
459
460 /**
461  * Core is ready to transmit to a peer, get the message.
462  *
463  * @param cls the `struct GSF_PeerTransmitHandle` of the message
464  * @param size number of bytes core is willing to take
465  * @param buf where to copy the message
466  * @return number of bytes copied to @a buf
467  */
468 static size_t
469 peer_transmit_ready_cb (void *cls,
470                         size_t size,
471                         void *buf)
472 {
473   struct GSF_ConnectedPeer *cp = cls;
474   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
475   struct GSF_PeerTransmitHandle *pos;
476   size_t ret;
477
478   cp->cth = NULL;
479   if (NULL == pth)
480     return 0;
481   if (pth->size > size)
482   {
483     schedule_transmission (pth);
484     return 0;
485   }
486   if (NULL != pth->timeout_task)
487   {
488     GNUNET_SCHEDULER_cancel (pth->timeout_task);
489     pth->timeout_task = NULL;
490   }
491   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
492                                cp->pth_tail,
493                                pth);
494   if (GNUNET_YES == pth->is_query)
495   {
496     cp->ppd.last_request_times[(cp->last_request_times_off++) %
497                                MAX_QUEUE_PER_PEER] =
498         GNUNET_TIME_absolute_get ();
499     GNUNET_assert (0 < cp->ppd.pending_queries--);
500   }
501   else if (GNUNET_NO == pth->is_query)
502   {
503     GNUNET_assert (0 < cp->ppd.pending_replies--);
504   }
505   GNUNET_LOAD_update (cp->ppd.transmission_delay,
506                       GNUNET_TIME_absolute_get_duration
507                       (pth->transmission_request_start_time).rel_value_us);
508   ret = pth->gmc (pth->gmc_cls, size, buf);
509   if (NULL != (pos = cp->pth_head))
510   {
511     GNUNET_assert (pos != pth);
512     schedule_transmission (pos);
513   }
514   GNUNET_free (pth);
515   return ret;
516 }
517
518
519 /**
520  * (re)try to reserve bandwidth from the given peer.
521  *
522  * @param cls the `struct GSF_ConnectedPeer` to reserve from
523  * @param tc scheduler context
524  */
525 static void
526 retry_reservation (void *cls,
527                    const struct GNUNET_SCHEDULER_TaskContext *tc)
528 {
529   struct GSF_ConnectedPeer *cp = cls;
530   struct GNUNET_PeerIdentity target;
531
532   GNUNET_PEER_resolve (cp->ppd.pid, &target);
533   cp->rc_delay_task = NULL;
534   cp->rc =
535     GNUNET_ATS_reserve_bandwidth (GSF_ats,
536                                   &target,
537                                   DBLOCK_SIZE,
538                                   &ats_reserve_callback, cp);
539 }
540
541
542 /**
543  * Function called by core upon success or failure of our bandwidth reservation request.
544  *
545  * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
546  * @param peer identifies the peer
547  * @param amount set to the amount that was actually reserved or unreserved;
548  *               either the full requested amount or zero (no partial reservations)
549  * @param res_delay if the reservation could not be satisfied (amount was 0), how
550  *        long should the client wait until re-trying?
551  */
552 static void
553 ats_reserve_callback (void *cls,
554                       const struct GNUNET_PeerIdentity *peer,
555                       int32_t amount,
556                       struct GNUNET_TIME_Relative res_delay)
557 {
558   struct GSF_ConnectedPeer *cp = cls;
559   struct GSF_PeerTransmitHandle *pth;
560
561   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562               "Reserved %d bytes / need to wait %s for reservation\n",
563               (int) amount,
564               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
565   cp->rc = NULL;
566   if (0 == amount)
567   {
568     cp->rc_delay_task =
569         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
570     return;
571   }
572   cp->did_reserve = GNUNET_YES;
573   pth = cp->pth_head;
574   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
575   {
576     /* reservation success, try transmission now! */
577     cp->cth_in_progress++;
578     cp->cth =
579         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
580                                            GNUNET_CORE_PRIO_BACKGROUND,
581                                            GNUNET_TIME_absolute_get_remaining
582                                            (pth->timeout), peer, pth->size,
583                                            &peer_transmit_ready_cb, cp);
584     GNUNET_assert (NULL != cp->cth);
585     GNUNET_assert (0 < cp->cth_in_progress--);
586   }
587 }
588
589
590 /**
591  * Function called by PEERSTORE with peer respect record
592  *
593  * @param cls handle to connected peer entry
594  * @param record peerstore record information
595  * @param emsg error message, or NULL if no errors
596  * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
597  */
598 static int
599 peer_respect_cb (void *cls,
600                  const struct GNUNET_PEERSTORE_Record *record,
601                  const char *emsg)
602 {
603   struct GSF_ConnectedPeer *cp = cls;
604
605   cp->respect_iterate_req = NULL;
606   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
607     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
608   GSF_push_start_ (cp);
609   if (NULL != cp->creation_cb)
610     cp->creation_cb (cp->creation_cb_cls, cp);
611   return GNUNET_NO;
612 }
613
614
615 /**
616  * A peer connected to us.  Setup the connected peer
617  * records.
618  *
619  * @param peer identity of peer that connected
620  * @param creation_cb callback function when the record is created.
621  * @param creation_cb_cls closure for @creation_cb
622  */
623 void
624 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
625                            GSF_ConnectedPeerCreationCallback creation_cb,
626                            void *creation_cb_cls)
627 {
628   struct GSF_ConnectedPeer *cp;
629
630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
631               "Connected to peer %s\n",
632               GNUNET_i2s (peer));
633   cp = GNUNET_new (struct GSF_ConnectedPeer);
634   cp->ppd.pid = GNUNET_PEER_intern (peer);
635   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
636   cp->rc =
637       GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
638                                     &ats_reserve_callback, cp);
639   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
640   GNUNET_break (GNUNET_OK ==
641                 GNUNET_CONTAINER_multipeermap_put (cp_map,
642                GSF_connected_peer_get_identity2_ (cp),
643                                                    cp,
644                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
645   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
646                          GNUNET_CONTAINER_multipeermap_size (cp_map),
647                          GNUNET_NO);
648   cp->creation_cb = creation_cb;
649   cp->creation_cb_cls = creation_cb_cls;
650   cp->respect_iterate_req =
651       GNUNET_PEERSTORE_iterate (peerstore, "fs", peer, "respect",
652                                 GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb,
653                                 cp);
654 }
655
656
657 /**
658  * It may be time to re-start migrating content to this
659  * peer.  Check, and if so, restart migration.
660  *
661  * @param cls the `struct GSF_ConnectedPeer`
662  * @param tc scheduler context
663  */
664 static void
665 revive_migration (void *cls,
666                   const struct GNUNET_SCHEDULER_TaskContext *tc)
667 {
668   struct GSF_ConnectedPeer *cp = cls;
669   struct GNUNET_TIME_Relative bt;
670
671   cp->mig_revive_task = NULL;
672   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
673   if (0 != bt.rel_value_us)
674   {
675     /* still time left... */
676     cp->mig_revive_task =
677         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
678     return;
679   }
680   GSF_push_start_ (cp);
681 }
682
683
684 /**
685  * Get a handle for a connected peer.
686  *
687  * @param peer peer's identity
688  * @return NULL if the peer is not currently connected
689  */
690 struct GSF_ConnectedPeer *
691 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
692 {
693   if (NULL == cp_map)
694     return NULL;
695   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
696 }
697
698
699 /**
700  * Handle P2P "MIGRATION_STOP" message.
701  *
702  * @param cls closure, always NULL
703  * @param other the other peer involved (sender or receiver, NULL
704  *        for loopback messages where we are both sender and receiver)
705  * @param message the actual message
706  * @return #GNUNET_OK to keep the connection open,
707  *         #GNUNET_SYSERR to close it (signal serious error)
708  */
709 int
710 GSF_handle_p2p_migration_stop_ (void *cls,
711                                 const struct GNUNET_PeerIdentity *other,
712                                 const struct GNUNET_MessageHeader *message)
713 {
714   struct GSF_ConnectedPeer *cp;
715   const struct MigrationStopMessage *msm;
716   struct GNUNET_TIME_Relative bt;
717
718   msm = (const struct MigrationStopMessage *) message;
719   cp = GSF_peer_get_ (other);
720   if (NULL == cp)
721   {
722     GNUNET_break (0);
723     return GNUNET_OK;
724   }
725   GNUNET_STATISTICS_update (GSF_stats,
726                             gettext_noop ("# migration stop messages received"),
727                             1, GNUNET_NO);
728   bt = GNUNET_TIME_relative_ntoh (msm->duration);
729   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
730               _("Migration of content to peer `%s' blocked for %s\n"),
731               GNUNET_i2s (other),
732               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
733   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
734   if (NULL == cp->mig_revive_task)
735   {
736     GSF_push_stop_ (cp);
737     cp->mig_revive_task =
738         GNUNET_SCHEDULER_add_delayed (bt,
739                                       &revive_migration, cp);
740   }
741   return GNUNET_OK;
742 }
743
744
745 /**
746  * Copy reply and free put message.
747  *
748  * @param cls the `struct PutMessage`
749  * @param buf_size number of bytes available in @a buf
750  * @param buf where to copy the message, NULL on error (peer disconnect)
751  * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
752  */
753 static size_t
754 copy_reply (void *cls,
755             size_t buf_size,
756             void *buf)
757 {
758   struct PutMessage *pm = cls;
759   size_t size;
760
761   if (NULL != buf)
762   {
763     GNUNET_assert (buf_size >= ntohs (pm->header.size));
764     size = ntohs (pm->header.size);
765     memcpy (buf, pm, size);
766     GNUNET_STATISTICS_update (GSF_stats,
767                               gettext_noop
768                               ("# replies transmitted to other peers"), 1,
769                               GNUNET_NO);
770   }
771   else
772   {
773     size = 0;
774     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
775                               GNUNET_NO);
776   }
777   GNUNET_free (pm);
778   return size;
779 }
780
781
782 /**
783  * Free resources associated with the given peer request.
784  *
785  * @param peerreq request to free
786  * @param query associated key for the request
787  */
788 static void
789 free_pending_request (struct PeerRequest *peerreq,
790                       const struct GNUNET_HashCode *query)
791 {
792   struct GSF_ConnectedPeer *cp = peerreq->cp;
793
794   if (NULL != peerreq->kill_task)
795   {
796     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
797     peerreq->kill_task = NULL;
798   }
799   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
800                             -1, GNUNET_NO);
801   GNUNET_break (GNUNET_YES ==
802                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
803                                                       query, peerreq));
804   GNUNET_free (peerreq);
805 }
806
807
808 /**
809  * Cancel all requests associated with the peer.
810  *
811  * @param cls unused
812  * @param query hash code of the request
813  * @param value the `struct GSF_PendingRequest`
814  * @return #GNUNET_YES (continue to iterate)
815  */
816 static int
817 cancel_pending_request (void *cls,
818                         const struct GNUNET_HashCode *query,
819                         void *value)
820 {
821   struct PeerRequest *peerreq = value;
822   struct GSF_PendingRequest *pr = peerreq->pr;
823   struct GSF_PendingRequestData *prd;
824
825   prd = GSF_pending_request_get_data_ (pr);
826   GSF_pending_request_cancel_ (pr, GNUNET_NO);
827   free_pending_request (peerreq, &prd->query);
828   return GNUNET_OK;
829 }
830
831
832 /**
833  * Free the given request.
834  *
835  * @param cls the request to free
836  * @param tc task context
837  */
838 static void
839 peer_request_destroy (void *cls,
840                       const struct GNUNET_SCHEDULER_TaskContext *tc)
841 {
842   struct PeerRequest *peerreq = cls;
843   struct GSF_PendingRequest *pr = peerreq->pr;
844   struct GSF_PendingRequestData *prd;
845
846   peerreq->kill_task = NULL;
847   prd = GSF_pending_request_get_data_ (pr);
848   cancel_pending_request (NULL, &prd->query, peerreq);
849 }
850
851
852 /**
853  * The artificial delay is over, transmit the message now.
854  *
855  * @param cls the `struct GSF_DelayedHandle` with the message
856  * @param tc scheduler context
857  */
858 static void
859 transmit_delayed_now (void *cls,
860                       const struct GNUNET_SCHEDULER_TaskContext *tc)
861 {
862   struct GSF_DelayedHandle *dh = cls;
863   struct GSF_ConnectedPeer *cp = dh->cp;
864
865   GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
866                                cp->delayed_tail,
867                                dh);
868   cp->delay_queue_size--;
869   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
870   {
871     GNUNET_free (dh->pm);
872     GNUNET_free (dh);
873     return;
874   }
875   (void) GSF_peer_transmit_ (cp,
876                              GNUNET_NO,
877                              UINT32_MAX,
878                              REPLY_TIMEOUT,
879                              dh->msize,
880                              &copy_reply,
881                              dh->pm);
882   GNUNET_free (dh);
883 }
884
885
886 /**
887  * Get the randomized delay a response should be subjected to.
888  *
889  * @return desired delay
890  */
891 static struct GNUNET_TIME_Relative
892 get_randomized_delay ()
893 {
894   struct GNUNET_TIME_Relative ret;
895
896   ret =
897       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
898                                      GNUNET_CRYPTO_random_u32
899                                      (GNUNET_CRYPTO_QUALITY_WEAK,
900                                       2 * GSF_avg_latency.rel_value_us + 1));
901 #if INSANE_STATISTICS
902   GNUNET_STATISTICS_update (GSF_stats,
903                             gettext_noop
904                             ("# artificial delays introduced (ms)"),
905                             ret.rel_value_us / 1000LL, GNUNET_NO);
906 #endif
907   return ret;
908 }
909
910
911 /**
912  * Handle a reply to a pending request.  Also called if a request
913  * expires (then with data == NULL).  The handler may be called
914  * many times (depending on the request type), but will not be
915  * called during or after a call to GSF_pending_request_cancel
916  * and will also not be called anymore after a call signalling
917  * expiration.
918  *
919  * @param cls `struct PeerRequest` this is an answer for
920  * @param eval evaluation of the result
921  * @param pr handle to the original pending request
922  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
923  * @param expiration when does @a data expire?
924  * @param last_transmission when did we last transmit a request for this block
925  * @param type type of the block
926  * @param data response data, NULL on request expiration
927  * @param data_len number of bytes in @a data
928  */
929 static void
930 handle_p2p_reply (void *cls,
931                   enum GNUNET_BLOCK_EvaluationResult eval,
932                   struct GSF_PendingRequest *pr,
933                   uint32_t reply_anonymity_level,
934                   struct GNUNET_TIME_Absolute expiration,
935                   struct GNUNET_TIME_Absolute last_transmission,
936                   enum GNUNET_BLOCK_Type type,
937                   const void *data,
938                   size_t data_len)
939 {
940   struct PeerRequest *peerreq = cls;
941   struct GSF_ConnectedPeer *cp = peerreq->cp;
942   struct GSF_PendingRequestData *prd;
943   struct PutMessage *pm;
944   size_t msize;
945
946   GNUNET_assert (data_len + sizeof (struct PutMessage) <
947                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
948   GNUNET_assert (peerreq->pr == pr);
949   prd = GSF_pending_request_get_data_ (pr);
950   if (NULL == data)
951   {
952     free_pending_request (peerreq, &prd->query);
953     return;
954   }
955   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
956   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
957   {
958     GNUNET_STATISTICS_update (GSF_stats,
959                               gettext_noop
960                               ("# replies dropped due to type mismatch"),
961                                 1, GNUNET_NO);
962     return;
963   }
964   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965               "Transmitting result for query `%s' to peer\n",
966               GNUNET_h2s (&prd->query));
967   GNUNET_STATISTICS_update (GSF_stats,
968                             gettext_noop ("# replies received for other peers"),
969                             1, GNUNET_NO);
970   msize = sizeof (struct PutMessage) + data_len;
971   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
972   {
973     GNUNET_break (0);
974     return;
975   }
976   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
977   {
978     if (reply_anonymity_level - 1 > GSF_cover_content_count)
979     {
980       GNUNET_STATISTICS_update (GSF_stats,
981                                 gettext_noop
982                                 ("# replies dropped due to insufficient cover traffic"),
983                                 1, GNUNET_NO);
984       return;
985     }
986     GSF_cover_content_count -= (reply_anonymity_level - 1);
987   }
988
989   pm = GNUNET_malloc (msize);
990   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
991   pm->header.size = htons (msize);
992   pm->type = htonl (type);
993   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
994   memcpy (&pm[1], data, data_len);
995   if ( (UINT32_MAX != reply_anonymity_level) &&
996        (0 != reply_anonymity_level) &&
997        (GNUNET_YES == GSF_enable_randomized_delays) )
998   {
999     struct GSF_DelayedHandle *dh;
1000
1001     dh = GNUNET_new (struct GSF_DelayedHandle);
1002     dh->cp = cp;
1003     dh->pm = pm;
1004     dh->msize = msize;
1005     GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1006                                  cp->delayed_tail,
1007                                  dh);
1008     cp->delay_queue_size++;
1009     dh->delay_task =
1010         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
1011                                       &transmit_delayed_now,
1012                                       dh);
1013   }
1014   else
1015   {
1016     (void) GSF_peer_transmit_ (cp,
1017                                GNUNET_NO,
1018                                UINT32_MAX,
1019                                REPLY_TIMEOUT,
1020                                msize,
1021                                &copy_reply,
1022                                pm);
1023   }
1024   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1025     return;
1026   if (NULL == peerreq->kill_task)
1027   {
1028     GNUNET_STATISTICS_update (GSF_stats,
1029                               gettext_noop
1030                               ("# P2P searches destroyed due to ultimate reply"),
1031                               1, GNUNET_NO);
1032     peerreq->kill_task =
1033         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
1034   }
1035 }
1036
1037
1038 /**
1039  * Increase the peer's respect by a value.
1040  *
1041  * @param cp which peer to change the respect value on
1042  * @param value is the int value by which the
1043  *  peer's credit is to be increased or decreased
1044  * @returns the actual change in respect (positive or negative)
1045  */
1046 static int
1047 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1048 {
1049   if (0 == value)
1050     return 0;
1051   GNUNET_assert (NULL != cp);
1052   if (value > 0)
1053   {
1054     if (cp->ppd.respect + value < cp->ppd.respect)
1055     {
1056       value = UINT32_MAX - cp->ppd.respect;
1057       cp->ppd.respect = UINT32_MAX;
1058     }
1059     else
1060       cp->ppd.respect += value;
1061   }
1062   else
1063   {
1064     if (cp->ppd.respect < -value)
1065     {
1066       value = -cp->ppd.respect;
1067       cp->ppd.respect = 0;
1068     }
1069     else
1070       cp->ppd.respect += value;
1071   }
1072   return value;
1073 }
1074
1075
1076 /**
1077  * We've received a request with the specified priority.  Bound it
1078  * according to how much we respect the given peer.
1079  *
1080  * @param prio_in requested priority
1081  * @param cp the peer making the request
1082  * @return effective priority
1083  */
1084 static int32_t
1085 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1086 {
1087 #define N ((double)128.0)
1088   uint32_t ret;
1089   double rret;
1090   int ld;
1091
1092   ld = GSF_test_get_load_too_high_ (0);
1093   if (GNUNET_SYSERR == ld)
1094   {
1095 #if INSANE_STATISTICS
1096     GNUNET_STATISTICS_update (GSF_stats,
1097                               gettext_noop
1098                               ("# requests done for free (low load)"), 1,
1099                               GNUNET_NO);
1100 #endif
1101     return 0;                   /* excess resources */
1102   }
1103   if (prio_in > INT32_MAX)
1104     prio_in = INT32_MAX;
1105   ret = -change_peer_respect (cp, -(int) prio_in);
1106   if (ret > 0)
1107   {
1108     if (ret > GSF_current_priorities + N)
1109       rret = GSF_current_priorities + N;
1110     else
1111       rret = ret;
1112     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1113   }
1114   if ((GNUNET_YES == ld) && (ret > 0))
1115   {
1116     /* try with charging */
1117     ld = GSF_test_get_load_too_high_ (ret);
1118   }
1119   if (GNUNET_YES == ld)
1120   {
1121     GNUNET_STATISTICS_update (GSF_stats,
1122                               gettext_noop
1123                               ("# request dropped, priority insufficient"), 1,
1124                               GNUNET_NO);
1125     /* undo charge */
1126     change_peer_respect (cp, (int) ret);
1127     return -1;                  /* not enough resources */
1128   }
1129   else
1130   {
1131     GNUNET_STATISTICS_update (GSF_stats,
1132                               gettext_noop
1133                               ("# requests done for a price (normal load)"), 1,
1134                               GNUNET_NO);
1135   }
1136 #undef N
1137   return ret;
1138 }
1139
1140
1141 /**
1142  * The priority level imposes a bound on the maximum
1143  * value for the ttl that can be requested.
1144  *
1145  * @param ttl_in requested ttl
1146  * @param prio given priority
1147  * @return ttl_in if ttl_in is below the limit,
1148  *         otherwise the ttl-limit for the given priority
1149  */
1150 static int32_t
1151 bound_ttl (int32_t ttl_in, uint32_t prio)
1152 {
1153   unsigned long long allowed;
1154
1155   if (ttl_in <= 0)
1156     return ttl_in;
1157   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1158   if (ttl_in > allowed)
1159   {
1160     if (allowed >= (1 << 30))
1161       return 1 << 30;
1162     return allowed;
1163   }
1164   return ttl_in;
1165 }
1166
1167
1168 /**
1169  * Handle P2P "QUERY" message.  Creates the pending request entry
1170  * and sets up all of the data structures to that we will
1171  * process replies properly.  Does not initiate forwarding or
1172  * local database lookups.
1173  *
1174  * @param other the other peer involved (sender or receiver, NULL
1175  *        for loopback messages where we are both sender and receiver)
1176  * @param message the actual message
1177  * @return pending request handle, NULL on error
1178  */
1179 struct GSF_PendingRequest *
1180 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1181                        const struct GNUNET_MessageHeader *message)
1182 {
1183   struct PeerRequest *peerreq;
1184   struct GSF_PendingRequest *pr;
1185   struct GSF_PendingRequestData *prd;
1186   struct GSF_ConnectedPeer *cp;
1187   struct GSF_ConnectedPeer *cps;
1188   const struct GNUNET_PeerIdentity *target;
1189   enum GSF_PendingRequestOptions options;
1190   uint16_t msize;
1191   const struct GetMessage *gm;
1192   unsigned int bits;
1193   const struct GNUNET_PeerIdentity *opt;
1194   uint32_t bm;
1195   size_t bfsize;
1196   uint32_t ttl_decrement;
1197   int32_t priority;
1198   int32_t ttl;
1199   enum GNUNET_BLOCK_Type type;
1200   GNUNET_PEER_Id spid;
1201
1202   msize = ntohs (message->size);
1203   if (msize < sizeof (struct GetMessage))
1204   {
1205     GNUNET_break_op (0);
1206     return NULL;
1207   }
1208   GNUNET_STATISTICS_update (GSF_stats,
1209                             gettext_noop
1210                             ("# GET requests received (from other peers)"),
1211                             1,
1212                             GNUNET_NO);
1213   gm = (const struct GetMessage *) message;
1214   type = ntohl (gm->type);
1215   bm = ntohl (gm->hash_bitmap);
1216   bits = 0;
1217   while (bm > 0)
1218   {
1219     if (1 == (bm & 1))
1220       bits++;
1221     bm >>= 1;
1222   }
1223   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1224   {
1225     GNUNET_break_op (0);
1226     return NULL;
1227   }
1228   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1229   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1230   /* bfsize must be power of 2, check! */
1231   if (0 != ((bfsize - 1) & bfsize))
1232   {
1233     GNUNET_break_op (0);
1234     return NULL;
1235   }
1236   GSF_cover_query_count++;
1237   bm = ntohl (gm->hash_bitmap);
1238   bits = 0;
1239   cps = GSF_peer_get_ (other);
1240   if (NULL == cps)
1241   {
1242     /* peer must have just disconnected */
1243     GNUNET_STATISTICS_update (GSF_stats,
1244                               gettext_noop
1245                               ("# requests dropped due to initiator not being connected"),
1246                               1, GNUNET_NO);
1247     return NULL;
1248   }
1249   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1250     cp = GSF_peer_get_ (&opt[bits++]);
1251   else
1252     cp = cps;
1253   if (NULL == cp)
1254   {
1255     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1256       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257                   "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1258                   GNUNET_i2s (&opt[bits - 1]));
1259
1260     else
1261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262                   "Failed to find peer `%s' in connection set. Dropping query.\n",
1263                   GNUNET_i2s (other));
1264     GNUNET_STATISTICS_update (GSF_stats,
1265                               gettext_noop
1266                               ("# requests dropped due to missing reverse route"),
1267                               1,
1268                               GNUNET_NO);
1269     return NULL;
1270   }
1271   if (cp->ppd.pending_replies + cp->delay_queue_size > 128)
1272   {
1273     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274                 "Peer `%s' has too many replies queued already. Dropping query.\n",
1275                 GNUNET_i2s (other));
1276     GNUNET_STATISTICS_update (GSF_stats,
1277                               gettext_noop ("# requests dropped due to full reply queue"),
1278                               1,
1279                               GNUNET_NO);
1280     return NULL;
1281   }
1282   /* note that we can really only check load here since otherwise
1283    * peers could find out that we are overloaded by not being
1284    * disconnected after sending us a malformed query... */
1285   priority = bound_priority (ntohl (gm->priority),
1286                              cps);
1287   if (priority < 0)
1288   {
1289     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1290                 "Dropping query from `%s', this peer is too busy.\n",
1291                 GNUNET_i2s (other));
1292     return NULL;
1293   }
1294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295               "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1296               GNUNET_h2s (&gm->query),
1297               (unsigned int) type,
1298               GNUNET_i2s (other),
1299               (unsigned int) bm);
1300   target =
1301       (0 !=
1302        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1303   options = GSF_PRO_DEFAULTS;
1304   spid = 0;
1305   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1306       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1307           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1308           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1309   {
1310     /* don't have BW to send to peer, or would likely take longer than we have for it,
1311      * so at best indirect the query */
1312     priority = 0;
1313     options |= GSF_PRO_FORWARD_ONLY;
1314     spid = GNUNET_PEER_intern (other);
1315     GNUNET_assert (0 != spid);
1316   }
1317   ttl = bound_ttl (ntohl (gm->ttl), priority);
1318   /* decrement ttl (always) */
1319   ttl_decrement =
1320       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1321                                                     TTL_DECREMENT);
1322   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1323   {
1324     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1326                 GNUNET_i2s (other), ttl, ttl_decrement);
1327     GNUNET_STATISTICS_update (GSF_stats,
1328                               gettext_noop
1329                               ("# requests dropped due TTL underflow"), 1,
1330                               GNUNET_NO);
1331     /* integer underflow => drop (should be very rare)! */
1332     return NULL;
1333   }
1334   ttl -= ttl_decrement;
1335
1336   /* test if the request already exists */
1337   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1338   if (peerreq != NULL)
1339   {
1340     pr = peerreq->pr;
1341     prd = GSF_pending_request_get_data_ (pr);
1342     if (prd->type == type)
1343     {
1344       if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
1345       {
1346         /* existing request has higher TTL, drop new one! */
1347         prd->priority += priority;
1348         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349                     "Have existing request with higher TTL, dropping new request.\n",
1350                     GNUNET_i2s (other));
1351         GNUNET_STATISTICS_update (GSF_stats,
1352                                   gettext_noop
1353                                   ("# requests dropped due to higher-TTL request"),
1354                                   1, GNUNET_NO);
1355         return NULL;
1356       }
1357       /* existing request has lower TTL, drop old one! */
1358       priority += prd->priority;
1359       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1360       free_pending_request (peerreq, &gm->query);
1361     }
1362   }
1363
1364   peerreq = GNUNET_new (struct PeerRequest);
1365   peerreq->cp = cp;
1366   pr = GSF_pending_request_create_ (options, type, &gm->query,
1367                                     target,
1368                                     (bfsize >
1369                                      0) ? (const char *) &opt[bits] : NULL,
1370                                     bfsize, ntohl (gm->filter_mutator),
1371                                     1 /* anonymity */ ,
1372                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1373                                     &handle_p2p_reply, peerreq);
1374   GNUNET_assert (NULL != pr);
1375   peerreq->pr = pr;
1376   GNUNET_break (GNUNET_OK ==
1377                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1378                                                    peerreq,
1379                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1380   GNUNET_STATISTICS_update (GSF_stats,
1381                             gettext_noop
1382                             ("# P2P query messages received and processed"), 1,
1383                             GNUNET_NO);
1384   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1385                             1, GNUNET_NO);
1386   return pr;
1387 }
1388
1389
1390 /**
1391  * Function called if there has been a timeout trying to satisfy
1392  * a transmission request.
1393  *
1394  * @param cls the `struct GSF_PeerTransmitHandle` of the request
1395  * @param tc scheduler context
1396  */
1397 static void
1398 peer_transmit_timeout (void *cls,
1399                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1400 {
1401   struct GSF_PeerTransmitHandle *pth = cls;
1402   struct GSF_ConnectedPeer *cp;
1403
1404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1405               "Timeout trying to transmit to other peer\n");
1406   pth->timeout_task = NULL;
1407   cp = pth->cp;
1408   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1409                                cp->pth_tail,
1410                                pth);
1411   if (GNUNET_YES == pth->is_query)
1412     GNUNET_assert (0 < cp->ppd.pending_queries--);
1413   else if (GNUNET_NO == pth->is_query)
1414     GNUNET_assert (0 < cp->ppd.pending_replies--);
1415   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1416   if (NULL != cp->cth)
1417   {
1418     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1419     cp->cth = NULL;
1420   }
1421   pth->gmc (pth->gmc_cls, 0, NULL);
1422   GNUNET_assert (0 == cp->cth_in_progress);
1423   GNUNET_free (pth);
1424 }
1425
1426
1427 /**
1428  * Transmit a message to the given peer as soon as possible.
1429  * If the peer disconnects before the transmission can happen,
1430  * the callback is invoked with a `NULL` @a buffer.
1431  *
1432  * @param cp target peer
1433  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1434  * @param priority how important is this request?
1435  * @param timeout when does this request timeout (call gmc with error)
1436  * @param size number of bytes we would like to send to the peer
1437  * @param gmc function to call to get the message
1438  * @param gmc_cls closure for @a gmc
1439  * @return handle to cancel request
1440  */
1441 struct GSF_PeerTransmitHandle *
1442 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1443                     int is_query,
1444                     uint32_t priority,
1445                     struct GNUNET_TIME_Relative timeout,
1446                     size_t size,
1447                     GSF_GetMessageCallback gmc, void *gmc_cls)
1448 {
1449   struct GSF_PeerTransmitHandle *pth;
1450   struct GSF_PeerTransmitHandle *pos;
1451   struct GSF_PeerTransmitHandle *prev;
1452
1453   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1454   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1455   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1456   pth->gmc = gmc;
1457   pth->gmc_cls = gmc_cls;
1458   pth->size = size;
1459   pth->is_query = is_query;
1460   pth->priority = priority;
1461   pth->cp = cp;
1462   /* insertion sort (by priority, descending) */
1463   prev = NULL;
1464   pos = cp->pth_head;
1465   while ((NULL != pos) && (pos->priority > priority))
1466   {
1467     prev = pos;
1468     pos = pos->next;
1469   }
1470   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1471                                      cp->pth_tail,
1472                                      prev,
1473                                      pth);
1474   if (GNUNET_YES == is_query)
1475     cp->ppd.pending_queries++;
1476   else if (GNUNET_NO == is_query)
1477     cp->ppd.pending_replies++;
1478   pth->timeout_task
1479     = GNUNET_SCHEDULER_add_delayed (timeout,
1480                                     &peer_transmit_timeout,
1481                                     pth);
1482   schedule_transmission (pth);
1483   return pth;
1484 }
1485
1486
1487 /**
1488  * Cancel an earlier request for transmission.
1489  *
1490  * @param pth request to cancel
1491  */
1492 void
1493 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1494 {
1495   struct GSF_ConnectedPeer *cp;
1496
1497   if (NULL != pth->timeout_task)
1498   {
1499     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1500     pth->timeout_task = NULL;
1501   }
1502   cp = pth->cp;
1503   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1504                                cp->pth_tail,
1505                                pth);
1506   if (GNUNET_YES == pth->is_query)
1507     GNUNET_assert (0 < cp->ppd.pending_queries--);
1508   else if (GNUNET_NO == pth->is_query)
1509     GNUNET_assert (0 < cp->ppd.pending_replies--);
1510   GNUNET_free (pth);
1511 }
1512
1513
1514 /**
1515  * Report on receiving a reply; update the performance record of the given peer.
1516  *
1517  * @param cp responding peer (will be updated)
1518  * @param request_time time at which the original query was transmitted
1519  * @param request_priority priority of the original request
1520  */
1521 void
1522 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1523                               struct GNUNET_TIME_Absolute request_time,
1524                               uint32_t request_priority)
1525 {
1526   struct GNUNET_TIME_Relative delay;
1527
1528   delay = GNUNET_TIME_absolute_get_duration (request_time);
1529   cp->ppd.avg_reply_delay.rel_value_us =
1530       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1531        delay.rel_value_us) / RUNAVG_DELAY_N;
1532   cp->ppd.avg_priority =
1533       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1534        request_priority) / RUNAVG_DELAY_N;
1535 }
1536
1537
1538 /**
1539  * Report on receiving a reply in response to an initiating client.
1540  * Remember that this peer is good for this client.
1541  *
1542  * @param cp responding peer (will be updated)
1543  * @param initiator_client local client on responsible for query
1544  */
1545 void
1546 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1547                                    struct GSF_LocalClient *initiator_client)
1548 {
1549   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1550                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1551 }
1552
1553
1554 /**
1555  * Report on receiving a reply in response to an initiating peer.
1556  * Remember that this peer is good for this initiating peer.
1557  *
1558  * @param cp responding peer (will be updated)
1559  * @param initiator_peer other peer responsible for query
1560  */
1561 void
1562 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1563                                  const struct GSF_ConnectedPeer *initiator_peer)
1564 {
1565   unsigned int woff;
1566
1567   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1568   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1569   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1570   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1571   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1572 }
1573
1574
1575 /**
1576  * Write peer-respect information to a file - flush the buffer entry!
1577  *
1578  * @param cls unused
1579  * @param key peer identity
1580  * @param value the `struct GSF_ConnectedPeer` to flush
1581  * @return #GNUNET_OK to continue iteration
1582  */
1583 static int
1584 flush_respect (void *cls,
1585                const struct GNUNET_PeerIdentity *key,
1586                void *value)
1587 {
1588   struct GSF_ConnectedPeer *cp = value;
1589   struct GNUNET_PeerIdentity pid;
1590
1591   if (cp->ppd.respect == cp->disk_respect)
1592     return GNUNET_OK;           /* unchanged */
1593   GNUNET_assert (0 != cp->ppd.pid);
1594   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1595   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1596                           sizeof (cp->ppd.respect),
1597                           GNUNET_TIME_UNIT_FOREVER_ABS,
1598                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1599   return GNUNET_OK;
1600 }
1601
1602
1603 /**
1604  * A peer disconnected from us.  Tear down the connected peer
1605  * record.
1606  *
1607  * @param cls unused
1608  * @param peer identity of peer that connected
1609  */
1610 void
1611 GSF_peer_disconnect_handler_ (void *cls,
1612                               const struct GNUNET_PeerIdentity *peer)
1613 {
1614   struct GSF_ConnectedPeer *cp;
1615   struct GSF_PeerTransmitHandle *pth;
1616   struct GSF_DelayedHandle *dh;
1617
1618   cp = GSF_peer_get_ (peer);
1619   if (NULL == cp)
1620     return;                     /* must have been disconnect from core with
1621                                  * 'peer' == my_id, ignore */
1622   flush_respect (NULL, peer, cp);
1623   GNUNET_assert (GNUNET_YES ==
1624                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1625                                                        peer,
1626                                                        cp));
1627   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1628                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1629                          GNUNET_NO);
1630   if (NULL != cp->respect_iterate_req)
1631   {
1632     GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1633     cp->respect_iterate_req = NULL;
1634   }
1635   if (NULL != cp->migration_pth)
1636   {
1637     GSF_peer_transmit_cancel_ (cp->migration_pth);
1638     cp->migration_pth = NULL;
1639   }
1640   if (NULL != cp->rc)
1641   {
1642     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1643     cp->rc = NULL;
1644   }
1645   if (NULL != cp->rc_delay_task)
1646   {
1647     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1648     cp->rc_delay_task = NULL;
1649   }
1650   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1651                                          &cancel_pending_request, cp);
1652   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1653   cp->request_map = NULL;
1654   GSF_plan_notify_peer_disconnect_ (cp);
1655   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1656   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1657   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1658   GSF_push_stop_ (cp);
1659   if (NULL != cp->cth)
1660   {
1661     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1662     cp->cth = NULL;
1663   }
1664   GNUNET_assert (0 == cp->cth_in_progress);
1665   while (NULL != (pth = cp->pth_head))
1666   {
1667     if (pth->timeout_task != NULL)
1668     {
1669       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1670       pth->timeout_task = NULL;
1671     }
1672     GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1673                                  cp->pth_tail,
1674                                  pth);
1675     if (GNUNET_YES == pth->is_query)
1676       GNUNET_assert (0 < cp->ppd.pending_queries--);
1677     else if (GNUNET_NO == pth->is_query)
1678       GNUNET_assert (0 < cp->ppd.pending_replies--);
1679     pth->gmc (pth->gmc_cls, 0, NULL);
1680     GNUNET_free (pth);
1681   }
1682   while (NULL != (dh = cp->delayed_head))
1683   {
1684     GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1685                                  cp->delayed_tail,
1686                                  dh);
1687     cp->delay_queue_size--;
1688     GNUNET_SCHEDULER_cancel (dh->delay_task);
1689     GNUNET_free (dh->pm);
1690     GNUNET_free (dh);
1691   }
1692   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1693   if (NULL != cp->mig_revive_task)
1694   {
1695     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1696     cp->mig_revive_task = NULL;
1697   }
1698   GNUNET_break (0 == cp->ppd.pending_queries);
1699   GNUNET_break (0 == cp->ppd.pending_replies);
1700   GNUNET_free (cp);
1701 }
1702
1703
1704 /**
1705  * Closure for #call_iterator().
1706  */
1707 struct IterationContext
1708 {
1709   /**
1710    * Function to call on each entry.
1711    */
1712   GSF_ConnectedPeerIterator it;
1713
1714   /**
1715    * Closure for @e it.
1716    */
1717   void *it_cls;
1718 };
1719
1720
1721 /**
1722  * Function that calls the callback for each peer.
1723  *
1724  * @param cls the `struct IterationContext *`
1725  * @param key identity of the peer
1726  * @param value the `struct GSF_ConnectedPeer *`
1727  * @return #GNUNET_YES to continue iteration
1728  */
1729 static int
1730 call_iterator (void *cls,
1731                const struct GNUNET_PeerIdentity *key,
1732                void *value)
1733 {
1734   struct IterationContext *ic = cls;
1735   struct GSF_ConnectedPeer *cp = value;
1736
1737   ic->it (ic->it_cls,
1738           key, cp,
1739           &cp->ppd);
1740   return GNUNET_YES;
1741 }
1742
1743
1744 /**
1745  * Iterate over all connected peers.
1746  *
1747  * @param it function to call for each peer
1748  * @param it_cls closure for @a it
1749  */
1750 void
1751 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1752                               void *it_cls)
1753 {
1754   struct IterationContext ic;
1755
1756   ic.it = it;
1757   ic.it_cls = it_cls;
1758   GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1759                                          &call_iterator,
1760                                          &ic);
1761 }
1762
1763
1764 /**
1765  * Obtain the identity of a connected peer.
1766  *
1767  * @param cp peer to get identity of
1768  * @param id identity to set (written to)
1769  */
1770 void
1771 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1772                                   struct GNUNET_PeerIdentity *id)
1773 {
1774   GNUNET_assert (0 != cp->ppd.pid);
1775   GNUNET_PEER_resolve (cp->ppd.pid, id);
1776 }
1777
1778
1779 /**
1780  * Obtain the identity of a connected peer.
1781  *
1782  * @param cp peer to get identity of
1783  * @return reference to peer identity, valid until peer disconnects (!)
1784  */
1785 const struct GNUNET_PeerIdentity *
1786 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1787 {
1788   GNUNET_assert (0 != cp->ppd.pid);
1789   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1790 }
1791
1792
1793 /**
1794  * Assemble a migration stop message for transmission.
1795  *
1796  * @param cls the `struct GSF_ConnectedPeer` to use
1797  * @param size number of bytes we're allowed to write to @a buf
1798  * @param buf where to copy the message
1799  * @return number of bytes copied to @a buf
1800  */
1801 static size_t
1802 create_migration_stop_message (void *cls,
1803                                size_t size,
1804                                void *buf)
1805 {
1806   struct GSF_ConnectedPeer *cp = cls;
1807   struct MigrationStopMessage msm;
1808
1809   cp->migration_pth = NULL;
1810   if (NULL == buf)
1811     return 0;
1812   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1813   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1814   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1815   msm.reserved = htonl (0);
1816   msm.duration =
1817       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1818                                  (cp->last_migration_block));
1819   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1820   GNUNET_STATISTICS_update (GSF_stats,
1821                             gettext_noop ("# migration stop messages sent"),
1822                             1, GNUNET_NO);
1823   return sizeof (struct MigrationStopMessage);
1824 }
1825
1826
1827 /**
1828  * Ask a peer to stop migrating data to us until the given point
1829  * in time.
1830  *
1831  * @param cp peer to ask
1832  * @param block_time until when to block
1833  */
1834 void
1835 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1836                            struct GNUNET_TIME_Absolute block_time)
1837 {
1838   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1839   {
1840     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841                 "Migration already blocked for another %s\n",
1842                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1843                                                         (cp->last_migration_block), GNUNET_YES));
1844     return;                     /* already blocked */
1845   }
1846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1847               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1848                                                       GNUNET_YES));
1849   cp->last_migration_block = block_time;
1850   if (NULL != cp->migration_pth)
1851     GSF_peer_transmit_cancel_ (cp->migration_pth);
1852   cp->migration_pth =
1853       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1854                           GNUNET_TIME_UNIT_FOREVER_REL,
1855                           sizeof (struct MigrationStopMessage),
1856                           &create_migration_stop_message, cp);
1857 }
1858
1859
1860 /**
1861  * Notify core about a preference we have for the given peer
1862  * (to allocate more resources towards it).  The change will
1863  * be communicated the next time we reserve bandwidth with
1864  * core (not instantly).
1865  *
1866  * @param cp peer to reserve bandwidth from
1867  * @param pref preference change
1868  */
1869 void
1870 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1871                                        uint64_t pref)
1872 {
1873   cp->inc_preference += pref;
1874 }
1875
1876
1877 /**
1878  * Call this method periodically to flush respect information to disk.
1879  *
1880  * @param cls closure, not used
1881  * @param tc task context, not used
1882  */
1883 static void
1884 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1885 {
1886
1887   if (NULL == cp_map)
1888     return;
1889   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL);
1890   if (NULL == tc)
1891     return;
1892   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1893     return;
1894   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1895                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1896                                               &cron_flush_respect, NULL);
1897 }
1898
1899
1900 /**
1901  * Initialize peer management subsystem.
1902  */
1903 void
1904 GSF_connected_peer_init_ ()
1905 {
1906   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1907   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1908   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1909                                       &cron_flush_respect, NULL);
1910 }
1911
1912
1913 /**
1914  * Iterator to free peer entries.
1915  *
1916  * @param cls closure, unused
1917  * @param key current key code
1918  * @param value value in the hash map (peer entry)
1919  * @return #GNUNET_YES (we should continue to iterate)
1920  */
1921 static int
1922 clean_peer (void *cls,
1923             const struct GNUNET_PeerIdentity *key,
1924             void *value)
1925 {
1926   GSF_peer_disconnect_handler_ (NULL, key);
1927   return GNUNET_YES;
1928 }
1929
1930
1931 /**
1932  * Shutdown peer management subsystem.
1933  */
1934 void
1935 GSF_connected_peer_done_ ()
1936 {
1937   cron_flush_respect (NULL, NULL);
1938   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_peer, NULL);
1939   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1940   cp_map = NULL;
1941   GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
1942 }
1943
1944
1945 /**
1946  * Iterator to remove references to LC entry.
1947  *
1948  * @param cls the `struct GSF_LocalClient *` to look for
1949  * @param key current key code
1950  * @param value value in the hash map (peer entry)
1951  * @return #GNUNET_YES (we should continue to iterate)
1952  */
1953 static int
1954 clean_local_client (void *cls,
1955                     const struct GNUNET_PeerIdentity *key,
1956                     void *value)
1957 {
1958   const struct GSF_LocalClient *lc = cls;
1959   struct GSF_ConnectedPeer *cp = value;
1960   unsigned int i;
1961
1962   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1963     if (cp->ppd.last_client_replies[i] == lc)
1964       cp->ppd.last_client_replies[i] = NULL;
1965   return GNUNET_YES;
1966 }
1967
1968
1969 /**
1970  * Notification that a local client disconnected.  Clean up all of our
1971  * references to the given handle.
1972  *
1973  * @param lc handle to the local client (henceforth invalid)
1974  */
1975 void
1976 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1977 {
1978   if (NULL == cp_map)
1979     return;                     /* already cleaned up */
1980   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
1981                                          (void *) lc);
1982 }
1983
1984
1985 /* end of gnunet-service-fs_cp.c */