f031688c32f066efc0e13dbbd25eebb770d88541
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34 #include "gnunet_peerstore_service.h"
35
36
37 /**
38  * Ratio for moving average delay calculation.  The previous
39  * average goes in with a factor of (n-1) into the calculation.
40  * Must be > 0.
41  */
42 #define RUNAVG_DELAY_N 16
43
44 /**
45  * How often do we flush respect values to disk?
46  */
47 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
48
49 /**
50  * After how long do we discard a reply?
51  */
52 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
53
54 /**
55  * Collect an instane number of statistics?  May cause excessive IPC.
56  */
57 #define INSANE_STATISTICS GNUNET_NO
58
59
60 /**
61  * Handle to cancel a transmission request.
62  */
63 struct GSF_PeerTransmitHandle
64 {
65
66   /**
67    * Kept in a doubly-linked list.
68    */
69   struct GSF_PeerTransmitHandle *next;
70
71   /**
72    * Kept in a doubly-linked list.
73    */
74   struct GSF_PeerTransmitHandle *prev;
75
76   /**
77    * Time when this transmission request was issued.
78    */
79   struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81   /**
82    * Timeout for this request.
83    */
84   struct GNUNET_TIME_Absolute timeout;
85
86   /**
87    * Task called on timeout, or 0 for none.
88    */
89   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
90
91   /**
92    * Function to call to get the actual message.
93    */
94   GSF_GetMessageCallback gmc;
95
96   /**
97    * Peer this request targets.
98    */
99   struct GSF_ConnectedPeer *cp;
100
101   /**
102    * Closure for @e gmc.
103    */
104   void *gmc_cls;
105
106   /**
107    * Size of the message to be transmitted.
108    */
109   size_t size;
110
111   /**
112    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
113    */
114   int is_query;
115
116   /**
117    * Did we get a reservation already?
118    */
119   int was_reserved;
120
121   /**
122    * Priority of this request.
123    */
124   uint32_t priority;
125
126 };
127
128
129 /**
130  * Handle for an entry in our delay list.
131  */
132 struct GSF_DelayedHandle
133 {
134
135   /**
136    * Kept in a doubly-linked list.
137    */
138   struct GSF_DelayedHandle *next;
139
140   /**
141    * Kept in a doubly-linked list.
142    */
143   struct GSF_DelayedHandle *prev;
144
145   /**
146    * Peer this transmission belongs to.
147    */
148   struct GSF_ConnectedPeer *cp;
149
150   /**
151    * The PUT that was delayed.
152    */
153   struct PutMessage *pm;
154
155   /**
156    * Task for the delay.
157    */
158   GNUNET_SCHEDULER_TaskIdentifier delay_task;
159
160   /**
161    * Size of the message.
162    */
163   size_t msize;
164
165 };
166
167
168 /**
169  * Information per peer and request.
170  */
171 struct PeerRequest
172 {
173
174   /**
175    * Handle to generic request.
176    */
177   struct GSF_PendingRequest *pr;
178
179   /**
180    * Handle to specific peer.
181    */
182   struct GSF_ConnectedPeer *cp;
183
184   /**
185    * Task for asynchronous stopping of this request.
186    */
187   GNUNET_SCHEDULER_TaskIdentifier kill_task;
188
189 };
190
191
192 /**
193  * A connected peer.
194  */
195 struct GSF_ConnectedPeer
196 {
197
198   /**
199    * Performance data for this peer.
200    */
201   struct GSF_PeerPerformanceData ppd;
202
203   /**
204    * Time until when we blocked this peer from migrating
205    * data to us.
206    */
207   struct GNUNET_TIME_Absolute last_migration_block;
208
209   /**
210    * Task scheduled to revive migration to this peer.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
213
214   /**
215    * Messages (replies, queries, content migration) we would like to
216    * send to this peer in the near future.  Sorted by priority, head.
217    */
218   struct GSF_PeerTransmitHandle *pth_head;
219
220   /**
221    * Messages (replies, queries, content migration) we would like to
222    * send to this peer in the near future.  Sorted by priority, tail.
223    */
224   struct GSF_PeerTransmitHandle *pth_tail;
225
226   /**
227    * Messages (replies, queries, content migration) we would like to
228    * send to this peer in the near future.  Sorted by priority, head.
229    */
230   struct GSF_DelayedHandle *delayed_head;
231
232   /**
233    * Messages (replies, queries, content migration) we would like to
234    * send to this peer in the near future.  Sorted by priority, tail.
235    */
236   struct GSF_DelayedHandle *delayed_tail;
237
238   /**
239    * Migration stop message in our queue, or NULL if we have none pending.
240    */
241   struct GSF_PeerTransmitHandle *migration_pth;
242
243   /**
244    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
245    */
246   struct GNUNET_ATS_ReservationContext *rc;
247
248   /**
249    * Task scheduled if we need to retry bandwidth reservation later.
250    */
251   GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
252
253   /**
254    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
255    */
256   struct GNUNET_CONTAINER_MultiHashMap *request_map;
257
258   /**
259    * Handle for an active request for transmission to this
260    * peer, or NULL (if core queue was full).
261    */
262   struct GNUNET_CORE_TransmitHandle *cth;
263
264   /**
265    * Increase in traffic preference still to be submitted
266    * to the core service for this peer.
267    */
268   uint64_t inc_preference;
269
270   /**
271    * Set to 1 if we're currently in the process of calling
272    * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
273    * NULL, we should not call notify_transmit_ready for this
274    * handle right now).
275    */
276   unsigned int cth_in_progress;
277
278   /**
279    * Respect rating for this peer on disk.
280    */
281   uint32_t disk_respect;
282
283   /**
284    * Which offset in "last_p2p_replies" will be updated next?
285    * (we go round-robin).
286    */
287   unsigned int last_p2p_replies_woff;
288
289   /**
290    * Which offset in "last_client_replies" will be updated next?
291    * (we go round-robin).
292    */
293   unsigned int last_client_replies_woff;
294
295   /**
296    * Current offset into 'last_request_times' ring buffer.
297    */
298   unsigned int last_request_times_off;
299
300   /**
301    * GNUNET_YES if we did successfully reserve 32k bandwidth,
302    * GNUNET_NO if not.
303    */
304   int did_reserve;
305
306   /**
307    * Function called when the creation of this record is complete.
308    */
309   GSF_ConnectedPeerCreationCallback creation_cb;
310
311   /**
312    * Closure for @e creation_cb
313    */
314   void *creation_cb_cls;
315
316 };
317
318
319 /**
320  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
321  */
322 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
323
324 /**
325  * Handle to peerstore service.
326  */
327 static struct GNUNET_PEERSTORE_Handle *peerstore;
328
329
330 /**
331  * Update the latency information kept for the given peer.
332  *
333  * @param id peer record to update
334  * @param latency current latency value
335  */
336 void
337 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
338                           struct GNUNET_TIME_Relative latency)
339 {
340   struct GSF_ConnectedPeer *cp;
341
342   cp = GSF_peer_get_ (id);
343   if (NULL == cp)
344     return; /* we're not yet connected at the core level, ignore */
345   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
346   /* LATER: merge atsi into cp's performance data (if we ever care...) */
347 }
348
349
350 /**
351  * Return the performance data record for the given peer
352  *
353  * @param cp peer to query
354  * @return performance data record for the peer
355  */
356 struct GSF_PeerPerformanceData *
357 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
358 {
359   return &cp->ppd;
360 }
361
362
363 /**
364  * Core is ready to transmit to a peer, get the message.
365  *
366  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
367  * @param size number of bytes core is willing to take
368  * @param buf where to copy the message
369  * @return number of bytes copied to buf
370  */
371 static size_t
372 peer_transmit_ready_cb (void *cls, size_t size, void *buf);
373
374
375 /**
376  * Function called by core upon success or failure of our bandwidth reservation request.
377  *
378  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
379  * @param peer identifies the peer
380  * @param amount set to the amount that was actually reserved or unreserved;
381  *               either the full requested amount or zero (no partial reservations)
382  * @param res_delay if the reservation could not be satisfied (amount was 0), how
383  *        long should the client wait until re-trying?
384  */
385 static void
386 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
387                       int32_t amount, struct GNUNET_TIME_Relative res_delay);
388
389
390 /**
391  * If ready (bandwidth reserved), try to schedule transmission via
392  * core for the given handle.
393  *
394  * @param pth transmission handle to schedule
395  */
396 static void
397 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
398 {
399   struct GSF_ConnectedPeer *cp;
400   struct GNUNET_PeerIdentity target;
401
402   cp = pth->cp;
403   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
404     return;                     /* already done */
405   GNUNET_assert (0 != cp->ppd.pid);
406   GNUNET_PEER_resolve (cp->ppd.pid, &target);
407
408   if (0 != cp->inc_preference)
409   {
410     GNUNET_ATS_performance_change_preference (GSF_ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
411                                   (double) cp->inc_preference,
412                                   GNUNET_ATS_PREFERENCE_END);
413     cp->inc_preference = 0;
414   }
415
416   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
417   {
418     /* query, need reservation */
419     if (GNUNET_YES != cp->did_reserve)
420       return;                   /* not ready */
421     cp->did_reserve = GNUNET_NO;
422     /* reservation already done! */
423     pth->was_reserved = GNUNET_YES;
424     cp->rc =
425         GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
426                                       &ats_reserve_callback, cp);
427     return;
428   }
429   GNUNET_assert (NULL == cp->cth);
430   cp->cth_in_progress++;
431   cp->cth =
432     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
433                                        GNUNET_CORE_PRIO_BACKGROUND,
434                                        GNUNET_TIME_absolute_get_remaining
435                                        (pth->timeout), &target, pth->size,
436                                        &peer_transmit_ready_cb, cp);
437   GNUNET_assert (NULL != cp->cth);
438   GNUNET_assert (0 < cp->cth_in_progress--);
439 }
440
441
442 /**
443  * Core is ready to transmit to a peer, get the message.
444  *
445  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
446  * @param size number of bytes core is willing to take
447  * @param buf where to copy the message
448  * @return number of bytes copied to buf
449  */
450 static size_t
451 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
452 {
453   struct GSF_ConnectedPeer *cp = cls;
454   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
455   struct GSF_PeerTransmitHandle *pos;
456   size_t ret;
457
458   cp->cth = NULL;
459   if (NULL == pth)
460     return 0;
461   if (pth->size > size)
462   {
463     schedule_transmission (pth);
464     return 0;
465   }
466   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
467   {
468     GNUNET_SCHEDULER_cancel (pth->timeout_task);
469     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
470   }
471   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
472   if (GNUNET_YES == pth->is_query)
473   {
474     cp->ppd.last_request_times[(cp->last_request_times_off++) %
475                                MAX_QUEUE_PER_PEER] =
476         GNUNET_TIME_absolute_get ();
477     GNUNET_assert (0 < cp->ppd.pending_queries--);
478   }
479   else if (GNUNET_NO == pth->is_query)
480   {
481     GNUNET_assert (0 < cp->ppd.pending_replies--);
482   }
483   GNUNET_LOAD_update (cp->ppd.transmission_delay,
484                       GNUNET_TIME_absolute_get_duration
485                       (pth->transmission_request_start_time).rel_value_us);
486   ret = pth->gmc (pth->gmc_cls, size, buf);
487   if (NULL != (pos = cp->pth_head))
488   {
489     GNUNET_assert (pos != pth);
490     schedule_transmission (pos);
491   }
492   GNUNET_free (pth);
493   return ret;
494 }
495
496
497 /**
498  * (re)try to reserve bandwidth from the given peer.
499  *
500  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
501  * @param tc scheduler context
502  */
503 static void
504 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
505 {
506   struct GSF_ConnectedPeer *cp = cls;
507   struct GNUNET_PeerIdentity target;
508
509   GNUNET_PEER_resolve (cp->ppd.pid, &target);
510   cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
511   cp->rc =
512     GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
513                                   &ats_reserve_callback, cp);
514 }
515
516
517 /**
518  * Function called by core upon success or failure of our bandwidth reservation request.
519  *
520  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
521  * @param peer identifies the peer
522  * @param amount set to the amount that was actually reserved or unreserved;
523  *               either the full requested amount or zero (no partial reservations)
524  * @param res_delay if the reservation could not be satisfied (amount was 0), how
525  *        long should the client wait until re-trying?
526  */
527 static void
528 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
529                       int32_t amount, struct GNUNET_TIME_Relative res_delay)
530 {
531   struct GSF_ConnectedPeer *cp = cls;
532   struct GSF_PeerTransmitHandle *pth;
533
534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535               "Reserved %d bytes / need to wait %s for reservation\n",
536               (int) amount,
537               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
538   cp->rc = NULL;
539   if (0 == amount)
540   {
541     cp->rc_delay_task =
542         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
543     return;
544   }
545   cp->did_reserve = GNUNET_YES;
546   pth = cp->pth_head;
547   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
548   {
549     /* reservation success, try transmission now! */
550     cp->cth_in_progress++;
551     cp->cth =
552         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
553                                            GNUNET_CORE_PRIO_BACKGROUND,
554                                            GNUNET_TIME_absolute_get_remaining
555                                            (pth->timeout), peer, pth->size,
556                                            &peer_transmit_ready_cb, cp);
557     GNUNET_assert (NULL != cp->cth);
558     GNUNET_assert (0 < cp->cth_in_progress--);
559   }
560 }
561
562 /**
563  * Function called by PEERSTORE with peer respect record
564  *
565  * @param cls handle to connected peer entry
566  * @param record peerstore record information
567  * @param emsg error message, or NULL if no errors
568  * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
569  */
570 static int
571 peer_respect_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
572 {
573   struct GSF_ConnectedPeer *cp = cls;
574
575   if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
576     cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
577   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
578   GNUNET_break (GNUNET_OK ==
579                 GNUNET_CONTAINER_multipeermap_put (cp_map,
580                GSF_connected_peer_get_identity2_ (cp),
581                                                    cp,
582                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
583   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
584                          GNUNET_CONTAINER_multipeermap_size (cp_map),
585                          GNUNET_NO);
586   GSF_push_start_ (cp);
587   if (NULL != cp->creation_cb)
588     cp->creation_cb (cp->creation_cb_cls, cp);
589   return GNUNET_NO;
590 }
591
592 /**
593  * A peer connected to us.  Setup the connected peer
594  * records.
595  *
596  * @param peer identity of peer that connected
597  * @param creation_cb callback function when the record is created.
598  * @param creation_cb_cls closure for @creation_cb
599  */
600 void
601 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
602                            GSF_ConnectedPeerCreationCallback creation_cb,
603                            void *creation_cb_cls)
604 {
605   struct GSF_ConnectedPeer *cp;
606
607   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
608               GNUNET_i2s (peer));
609   cp = GNUNET_new (struct GSF_ConnectedPeer);
610   cp->ppd.pid = GNUNET_PEER_intern (peer);
611   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
612   cp->rc =
613       GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
614                                     &ats_reserve_callback, cp);
615   cp->creation_cb = creation_cb;
616   cp->creation_cb_cls = creation_cb_cls;
617   GNUNET_PEERSTORE_iterate (peerstore, "fs", peer, "respect",
618                             GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb, cp);
619 }
620
621
622 /**
623  * It may be time to re-start migrating content to this
624  * peer.  Check, and if so, restart migration.
625  *
626  * @param cls the 'struct GSF_ConnectedPeer'
627  * @param tc scheduler context
628  */
629 static void
630 revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
631 {
632   struct GSF_ConnectedPeer *cp = cls;
633   struct GNUNET_TIME_Relative bt;
634
635   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
636   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
637   if (0 != bt.rel_value_us)
638   {
639     /* still time left... */
640     cp->mig_revive_task =
641         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
642     return;
643   }
644   GSF_push_start_ (cp);
645 }
646
647
648 /**
649  * Get a handle for a connected peer.
650  *
651  * @param peer peer's identity
652  * @return NULL if the peer is not currently connected
653  */
654 struct GSF_ConnectedPeer *
655 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
656 {
657   if (NULL == cp_map)
658     return NULL;
659   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
660 }
661
662
663 /**
664  * Handle P2P "MIGRATION_STOP" message.
665  *
666  * @param cls closure, always NULL
667  * @param other the other peer involved (sender or receiver, NULL
668  *        for loopback messages where we are both sender and receiver)
669  * @param message the actual message
670  * @return GNUNET_OK to keep the connection open,
671  *         GNUNET_SYSERR to close it (signal serious error)
672  */
673 int
674 GSF_handle_p2p_migration_stop_ (void *cls,
675                                 const struct GNUNET_PeerIdentity *other,
676                                 const struct GNUNET_MessageHeader *message)
677 {
678   struct GSF_ConnectedPeer *cp;
679   const struct MigrationStopMessage *msm;
680   struct GNUNET_TIME_Relative bt;
681
682   msm = (const struct MigrationStopMessage *) message;
683   cp = GSF_peer_get_ (other);
684   if (NULL == cp)
685   {
686     GNUNET_break (0);
687     return GNUNET_OK;
688   }
689   GNUNET_STATISTICS_update (GSF_stats,
690                             gettext_noop ("# migration stop messages received"),
691                             1, GNUNET_NO);
692   bt = GNUNET_TIME_relative_ntoh (msm->duration);
693   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
694               _("Migration of content to peer `%s' blocked for %s\n"),
695               GNUNET_i2s (other),
696               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
697   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
698   if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
699   {
700     GSF_push_stop_ (cp);
701     cp->mig_revive_task =
702         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
703   }
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Copy reply and free put message.
710  *
711  * @param cls the 'struct PutMessage'
712  * @param buf_size number of bytes available in buf
713  * @param buf where to copy the message, NULL on error (peer disconnect)
714  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
715  */
716 static size_t
717 copy_reply (void *cls, size_t buf_size, void *buf)
718 {
719   struct PutMessage *pm = cls;
720   size_t size;
721
722   if (NULL != buf)
723   {
724     GNUNET_assert (buf_size >= ntohs (pm->header.size));
725     size = ntohs (pm->header.size);
726     memcpy (buf, pm, size);
727     GNUNET_STATISTICS_update (GSF_stats,
728                               gettext_noop
729                               ("# replies transmitted to other peers"), 1,
730                               GNUNET_NO);
731   }
732   else
733   {
734     size = 0;
735     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
736                               GNUNET_NO);
737   }
738   GNUNET_free (pm);
739   return size;
740 }
741
742
743 /**
744  * Free resources associated with the given peer request.
745  *
746  * @param peerreq request to free
747  * @param query associated key for the request
748  */
749 static void
750 free_pending_request (struct PeerRequest *peerreq,
751                       const struct GNUNET_HashCode *query)
752 {
753   struct GSF_ConnectedPeer *cp = peerreq->cp;
754
755   if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
756   {
757     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
758     peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
759   }
760   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
761                             -1, GNUNET_NO);
762   GNUNET_break (GNUNET_YES ==
763                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
764                                                       query, peerreq));
765   GNUNET_free (peerreq);
766 }
767
768
769 /**
770  * Cancel all requests associated with the peer.
771  *
772  * @param cls unused
773  * @param query hash code of the request
774  * @param value the 'struct GSF_PendingRequest'
775  * @return GNUNET_YES (continue to iterate)
776  */
777 static int
778 cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
779 {
780   struct PeerRequest *peerreq = value;
781   struct GSF_PendingRequest *pr = peerreq->pr;
782   struct GSF_PendingRequestData *prd;
783
784   prd = GSF_pending_request_get_data_ (pr);
785   GSF_pending_request_cancel_ (pr, GNUNET_NO);
786   free_pending_request (peerreq, &prd->query);
787   return GNUNET_OK;
788 }
789
790
791 /**
792  * Free the given request.
793  *
794  * @param cls the request to free
795  * @param tc task context
796  */
797 static void
798 peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
799 {
800   struct PeerRequest *peerreq = cls;
801   struct GSF_PendingRequest *pr = peerreq->pr;
802   struct GSF_PendingRequestData *prd;
803
804   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
805   prd = GSF_pending_request_get_data_ (pr);
806   cancel_pending_request (NULL, &prd->query, peerreq);
807 }
808
809
810 /**
811  * The artificial delay is over, transmit the message now.
812  *
813  * @param cls the 'struct GSF_DelayedHandle' with the message
814  * @param tc scheduler context
815  */
816 static void
817 transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
818 {
819   struct GSF_DelayedHandle *dh = cls;
820   struct GSF_ConnectedPeer *cp = dh->cp;
821
822   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
823   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
824   {
825     GNUNET_free (dh->pm);
826     GNUNET_free (dh);
827     return;
828   }
829   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
830                              dh->msize, &copy_reply, dh->pm);
831   GNUNET_free (dh);
832 }
833
834
835 /**
836  * Get the randomized delay a response should be subjected to.
837  *
838  * @return desired delay
839  */
840 static struct GNUNET_TIME_Relative
841 get_randomized_delay ()
842 {
843   struct GNUNET_TIME_Relative ret;
844
845   ret =
846       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
847                                      GNUNET_CRYPTO_random_u32
848                                      (GNUNET_CRYPTO_QUALITY_WEAK,
849                                       2 * GSF_avg_latency.rel_value_us + 1));
850 #if INSANE_STATISTICS
851   GNUNET_STATISTICS_update (GSF_stats,
852                             gettext_noop
853                             ("# artificial delays introduced (ms)"),
854                             ret.rel_value_us / 1000LL, GNUNET_NO);
855 #endif
856   return ret;
857 }
858
859
860 /**
861  * Handle a reply to a pending request.  Also called if a request
862  * expires (then with data == NULL).  The handler may be called
863  * many times (depending on the request type), but will not be
864  * called during or after a call to GSF_pending_request_cancel
865  * and will also not be called anymore after a call signalling
866  * expiration.
867  *
868  * @param cls 'struct PeerRequest' this is an answer for
869  * @param eval evaluation of the result
870  * @param pr handle to the original pending request
871  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
872  * @param expiration when does 'data' expire?
873  * @param last_transmission when did we last transmit a request for this block
874  * @param type type of the block
875  * @param data response data, NULL on request expiration
876  * @param data_len number of bytes in data
877  */
878 static void
879 handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
880                   struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
881                   struct GNUNET_TIME_Absolute expiration,
882                   struct GNUNET_TIME_Absolute last_transmission,
883                   enum GNUNET_BLOCK_Type type, const void *data,
884                   size_t data_len)
885 {
886   struct PeerRequest *peerreq = cls;
887   struct GSF_ConnectedPeer *cp = peerreq->cp;
888   struct GSF_PendingRequestData *prd;
889   struct PutMessage *pm;
890   size_t msize;
891
892   GNUNET_assert (data_len + sizeof (struct PutMessage) <
893                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
894   GNUNET_assert (peerreq->pr == pr);
895   prd = GSF_pending_request_get_data_ (pr);
896   if (NULL == data)
897   {
898     free_pending_request (peerreq, &prd->query);
899     return;
900   }
901   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
902   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
903   {
904     GNUNET_STATISTICS_update (GSF_stats,
905                               gettext_noop
906                               ("# replies dropped due to type mismatch"),
907                                 1, GNUNET_NO);
908     return;
909   }
910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911               "Transmitting result for query `%s' to peer\n",
912               GNUNET_h2s (&prd->query));
913   GNUNET_STATISTICS_update (GSF_stats,
914                             gettext_noop ("# replies received for other peers"),
915                             1, GNUNET_NO);
916   msize = sizeof (struct PutMessage) + data_len;
917   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
918   {
919     GNUNET_break (0);
920     return;
921   }
922   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
923   {
924     if (reply_anonymity_level - 1 > GSF_cover_content_count)
925     {
926       GNUNET_STATISTICS_update (GSF_stats,
927                                 gettext_noop
928                                 ("# replies dropped due to insufficient cover traffic"),
929                                 1, GNUNET_NO);
930       return;
931     }
932     GSF_cover_content_count -= (reply_anonymity_level - 1);
933   }
934
935   pm = GNUNET_malloc (msize);
936   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
937   pm->header.size = htons (msize);
938   pm->type = htonl (type);
939   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
940   memcpy (&pm[1], data, data_len);
941   if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
942       (GNUNET_YES == GSF_enable_randomized_delays))
943   {
944     struct GSF_DelayedHandle *dh;
945
946     dh = GNUNET_new (struct GSF_DelayedHandle);
947     dh->cp = cp;
948     dh->pm = pm;
949     dh->msize = msize;
950     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
951     dh->delay_task =
952         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
953                                       &transmit_delayed_now, dh);
954   }
955   else
956   {
957     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
958                                &copy_reply, pm);
959   }
960   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
961     return;
962   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
963   {
964     GNUNET_STATISTICS_update (GSF_stats,
965                               gettext_noop
966                               ("# P2P searches destroyed due to ultimate reply"),
967                               1, GNUNET_NO);
968     peerreq->kill_task =
969         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
970   }
971 }
972
973
974 /**
975  * Increase the peer's respect by a value.
976  *
977  * @param cp which peer to change the respect value on
978  * @param value is the int value by which the
979  *  peer's credit is to be increased or decreased
980  * @returns the actual change in respect (positive or negative)
981  */
982 static int
983 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
984 {
985   if (0 == value)
986     return 0;
987   GNUNET_assert (NULL != cp);
988   if (value > 0)
989   {
990     if (cp->ppd.respect + value < cp->ppd.respect)
991     {
992       value = UINT32_MAX - cp->ppd.respect;
993       cp->ppd.respect = UINT32_MAX;
994     }
995     else
996       cp->ppd.respect += value;
997   }
998   else
999   {
1000     if (cp->ppd.respect < -value)
1001     {
1002       value = -cp->ppd.respect;
1003       cp->ppd.respect = 0;
1004     }
1005     else
1006       cp->ppd.respect += value;
1007   }
1008   return value;
1009 }
1010
1011
1012 /**
1013  * We've received a request with the specified priority.  Bound it
1014  * according to how much we respect the given peer.
1015  *
1016  * @param prio_in requested priority
1017  * @param cp the peer making the request
1018  * @return effective priority
1019  */
1020 static int32_t
1021 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1022 {
1023 #define N ((double)128.0)
1024   uint32_t ret;
1025   double rret;
1026   int ld;
1027
1028   ld = GSF_test_get_load_too_high_ (0);
1029   if (GNUNET_SYSERR == ld)
1030   {
1031 #if INSANE_STATISTICS
1032     GNUNET_STATISTICS_update (GSF_stats,
1033                               gettext_noop
1034                               ("# requests done for free (low load)"), 1,
1035                               GNUNET_NO);
1036 #endif
1037     return 0;                   /* excess resources */
1038   }
1039   if (prio_in > INT32_MAX)
1040     prio_in = INT32_MAX;
1041   ret = -change_peer_respect (cp, -(int) prio_in);
1042   if (ret > 0)
1043   {
1044     if (ret > GSF_current_priorities + N)
1045       rret = GSF_current_priorities + N;
1046     else
1047       rret = ret;
1048     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1049   }
1050   if ((GNUNET_YES == ld) && (ret > 0))
1051   {
1052     /* try with charging */
1053     ld = GSF_test_get_load_too_high_ (ret);
1054   }
1055   if (GNUNET_YES == ld)
1056   {
1057     GNUNET_STATISTICS_update (GSF_stats,
1058                               gettext_noop
1059                               ("# request dropped, priority insufficient"), 1,
1060                               GNUNET_NO);
1061     /* undo charge */
1062     change_peer_respect (cp, (int) ret);
1063     return -1;                  /* not enough resources */
1064   }
1065   else
1066   {
1067     GNUNET_STATISTICS_update (GSF_stats,
1068                               gettext_noop
1069                               ("# requests done for a price (normal load)"), 1,
1070                               GNUNET_NO);
1071   }
1072 #undef N
1073   return ret;
1074 }
1075
1076
1077 /**
1078  * The priority level imposes a bound on the maximum
1079  * value for the ttl that can be requested.
1080  *
1081  * @param ttl_in requested ttl
1082  * @param prio given priority
1083  * @return ttl_in if ttl_in is below the limit,
1084  *         otherwise the ttl-limit for the given priority
1085  */
1086 static int32_t
1087 bound_ttl (int32_t ttl_in, uint32_t prio)
1088 {
1089   unsigned long long allowed;
1090
1091   if (ttl_in <= 0)
1092     return ttl_in;
1093   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1094   if (ttl_in > allowed)
1095   {
1096     if (allowed >= (1 << 30))
1097       return 1 << 30;
1098     return allowed;
1099   }
1100   return ttl_in;
1101 }
1102
1103
1104 /**
1105  * Handle P2P "QUERY" message.  Creates the pending request entry
1106  * and sets up all of the data structures to that we will
1107  * process replies properly.  Does not initiate forwarding or
1108  * local database lookups.
1109  *
1110  * @param other the other peer involved (sender or receiver, NULL
1111  *        for loopback messages where we are both sender and receiver)
1112  * @param message the actual message
1113  * @return pending request handle, NULL on error
1114  */
1115 struct GSF_PendingRequest *
1116 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1117                        const struct GNUNET_MessageHeader *message)
1118 {
1119   struct PeerRequest *peerreq;
1120   struct GSF_PendingRequest *pr;
1121   struct GSF_PendingRequestData *prd;
1122   struct GSF_ConnectedPeer *cp;
1123   struct GSF_ConnectedPeer *cps;
1124   const struct GNUNET_PeerIdentity *target;
1125   enum GSF_PendingRequestOptions options;
1126   uint16_t msize;
1127   const struct GetMessage *gm;
1128   unsigned int bits;
1129   const struct GNUNET_PeerIdentity *opt;
1130   uint32_t bm;
1131   size_t bfsize;
1132   uint32_t ttl_decrement;
1133   int32_t priority;
1134   int32_t ttl;
1135   enum GNUNET_BLOCK_Type type;
1136   GNUNET_PEER_Id spid;
1137
1138   GNUNET_assert (other != NULL);
1139   msize = ntohs (message->size);
1140   if (msize < sizeof (struct GetMessage))
1141   {
1142     GNUNET_break_op (0);
1143     return NULL;
1144   }
1145   GNUNET_STATISTICS_update (GSF_stats,
1146                             gettext_noop
1147                             ("# GET requests received (from other peers)"), 1,
1148                             GNUNET_NO);
1149   gm = (const struct GetMessage *) message;
1150   type = ntohl (gm->type);
1151   bm = ntohl (gm->hash_bitmap);
1152   bits = 0;
1153   while (bm > 0)
1154   {
1155     if (1 == (bm & 1))
1156       bits++;
1157     bm >>= 1;
1158   }
1159   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1160   {
1161     GNUNET_break_op (0);
1162     return NULL;
1163   }
1164   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1165   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1166   /* bfsize must be power of 2, check! */
1167   if (0 != ((bfsize - 1) & bfsize))
1168   {
1169     GNUNET_break_op (0);
1170     return NULL;
1171   }
1172   GSF_cover_query_count++;
1173   bm = ntohl (gm->hash_bitmap);
1174   bits = 0;
1175   cps = GSF_peer_get_ (other);
1176   if (NULL == cps)
1177   {
1178     /* peer must have just disconnected */
1179     GNUNET_STATISTICS_update (GSF_stats,
1180                               gettext_noop
1181                               ("# requests dropped due to initiator not being connected"),
1182                               1, GNUNET_NO);
1183     return NULL;
1184   }
1185   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1186     cp = GSF_peer_get_ (&opt[bits++]);
1187   else
1188     cp = cps;
1189   if (NULL == cp)
1190   {
1191     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1192       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1194                   GNUNET_i2s (&opt[bits - 1]));
1195
1196     else
1197       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1199                   GNUNET_i2s (other));
1200 #if INSANE_STATISTICS
1201     GNUNET_STATISTICS_update (GSF_stats,
1202                               gettext_noop
1203                               ("# requests dropped due to missing reverse route"),
1204                               1, GNUNET_NO);
1205 #endif
1206     return NULL;
1207   }
1208   /* note that we can really only check load here since otherwise
1209    * peers could find out that we are overloaded by not being
1210    * disconnected after sending us a malformed query... */
1211   priority = bound_priority (ntohl (gm->priority), cps);
1212   if (priority < 0)
1213   {
1214     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215                 "Dropping query from `%s', this peer is too busy.\n",
1216                 GNUNET_i2s (other));
1217     return NULL;
1218   }
1219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1221               GNUNET_h2s (&gm->query),
1222               (unsigned int) type,
1223               GNUNET_i2s (other),
1224               (unsigned int) bm);
1225   target =
1226       (0 !=
1227        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1228   options = GSF_PRO_DEFAULTS;
1229   spid = 0;
1230   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1231       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1232           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1233           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1234   {
1235     /* don't have BW to send to peer, or would likely take longer than we have for it,
1236      * so at best indirect the query */
1237     priority = 0;
1238     options |= GSF_PRO_FORWARD_ONLY;
1239     spid = GNUNET_PEER_intern (other);
1240     GNUNET_assert (0 != spid);
1241   }
1242   ttl = bound_ttl (ntohl (gm->ttl), priority);
1243   /* decrement ttl (always) */
1244   ttl_decrement =
1245       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1246                                                     TTL_DECREMENT);
1247   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1248   {
1249     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1251                 GNUNET_i2s (other), ttl, ttl_decrement);
1252     GNUNET_STATISTICS_update (GSF_stats,
1253                               gettext_noop
1254                               ("# requests dropped due TTL underflow"), 1,
1255                               GNUNET_NO);
1256     /* integer underflow => drop (should be very rare)! */
1257     return NULL;
1258   }
1259   ttl -= ttl_decrement;
1260
1261   /* test if the request already exists */
1262   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1263   if (peerreq != NULL)
1264   {
1265     pr = peerreq->pr;
1266     prd = GSF_pending_request_get_data_ (pr);
1267     if (prd->type == type)
1268     {
1269       if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
1270       {
1271         /* existing request has higher TTL, drop new one! */
1272         prd->priority += priority;
1273         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274                     "Have existing request with higher TTL, dropping new request.\n",
1275                     GNUNET_i2s (other));
1276         GNUNET_STATISTICS_update (GSF_stats,
1277                                   gettext_noop
1278                                   ("# requests dropped due to higher-TTL request"),
1279                                   1, GNUNET_NO);
1280         return NULL;
1281       }
1282       /* existing request has lower TTL, drop old one! */
1283       priority += prd->priority;
1284       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1285       free_pending_request (peerreq, &gm->query);
1286     }
1287   }
1288
1289   peerreq = GNUNET_new (struct PeerRequest);
1290   peerreq->cp = cp;
1291   pr = GSF_pending_request_create_ (options, type, &gm->query,
1292                                     target,
1293                                     (bfsize >
1294                                      0) ? (const char *) &opt[bits] : NULL,
1295                                     bfsize, ntohl (gm->filter_mutator),
1296                                     1 /* anonymity */ ,
1297                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1298                                     &handle_p2p_reply, peerreq);
1299   GNUNET_assert (NULL != pr);
1300   peerreq->pr = pr;
1301   GNUNET_break (GNUNET_OK ==
1302                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1303                                                    peerreq,
1304                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1305   GNUNET_STATISTICS_update (GSF_stats,
1306                             gettext_noop
1307                             ("# P2P query messages received and processed"), 1,
1308                             GNUNET_NO);
1309   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1310                             1, GNUNET_NO);
1311   return pr;
1312 }
1313
1314
1315 /**
1316  * Function called if there has been a timeout trying to satisfy
1317  * a transmission request.
1318  *
1319  * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1320  * @param tc scheduler context
1321  */
1322 static void
1323 peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1324 {
1325   struct GSF_PeerTransmitHandle *pth = cls;
1326   struct GSF_ConnectedPeer *cp;
1327
1328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1329               "Timeout trying to transmit to other peer\n");
1330   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1331   cp = pth->cp;
1332   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1333   if (GNUNET_YES == pth->is_query)
1334     GNUNET_assert (0 < cp->ppd.pending_queries--);
1335   else if (GNUNET_NO == pth->is_query)
1336     GNUNET_assert (0 < cp->ppd.pending_replies--);
1337   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1338   if (NULL != cp->cth)
1339   {
1340     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1341     cp->cth = NULL;
1342   }
1343   pth->gmc (pth->gmc_cls, 0, NULL);
1344   GNUNET_assert (0 == cp->cth_in_progress);
1345   GNUNET_free (pth);
1346 }
1347
1348
1349 /**
1350  * Transmit a message to the given peer as soon as possible.
1351  * If the peer disconnects before the transmission can happen,
1352  * the callback is invoked with a `NULL` @a buffer.
1353  *
1354  * @param cp target peer
1355  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1356  * @param priority how important is this request?
1357  * @param timeout when does this request timeout (call gmc with error)
1358  * @param size number of bytes we would like to send to the peer
1359  * @param gmc function to call to get the message
1360  * @param gmc_cls closure for @a gmc
1361  * @return handle to cancel request
1362  */
1363 struct GSF_PeerTransmitHandle *
1364 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
1365                     uint32_t priority, struct GNUNET_TIME_Relative timeout,
1366                     size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
1367 {
1368   struct GSF_PeerTransmitHandle *pth;
1369   struct GSF_PeerTransmitHandle *pos;
1370   struct GSF_PeerTransmitHandle *prev;
1371
1372   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1373   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1374   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1375   pth->gmc = gmc;
1376   pth->gmc_cls = gmc_cls;
1377   pth->size = size;
1378   pth->is_query = is_query;
1379   pth->priority = priority;
1380   pth->cp = cp;
1381   /* insertion sort (by priority, descending) */
1382   prev = NULL;
1383   pos = cp->pth_head;
1384   while ((NULL != pos) && (pos->priority > priority))
1385   {
1386     prev = pos;
1387     pos = pos->next;
1388   }
1389   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1390   if (GNUNET_YES == is_query)
1391     cp->ppd.pending_queries++;
1392   else if (GNUNET_NO == is_query)
1393     cp->ppd.pending_replies++;
1394   pth->timeout_task =
1395       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1396   schedule_transmission (pth);
1397   return pth;
1398 }
1399
1400
1401 /**
1402  * Cancel an earlier request for transmission.
1403  *
1404  * @param pth request to cancel
1405  */
1406 void
1407 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1408 {
1409   struct GSF_ConnectedPeer *cp;
1410
1411   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
1412   {
1413     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1414     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1415   }
1416   cp = pth->cp;
1417   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1418   if (GNUNET_YES == pth->is_query)
1419     GNUNET_assert (0 < cp->ppd.pending_queries--);
1420   else if (GNUNET_NO == pth->is_query)
1421     GNUNET_assert (0 < cp->ppd.pending_replies--);
1422   GNUNET_free (pth);
1423 }
1424
1425
1426 /**
1427  * Report on receiving a reply; update the performance record of the given peer.
1428  *
1429  * @param cp responding peer (will be updated)
1430  * @param request_time time at which the original query was transmitted
1431  * @param request_priority priority of the original request
1432  */
1433 void
1434 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1435                               struct GNUNET_TIME_Absolute request_time,
1436                               uint32_t request_priority)
1437 {
1438   struct GNUNET_TIME_Relative delay;
1439
1440   delay = GNUNET_TIME_absolute_get_duration (request_time);
1441   cp->ppd.avg_reply_delay.rel_value_us =
1442       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1443        delay.rel_value_us) / RUNAVG_DELAY_N;
1444   cp->ppd.avg_priority =
1445       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1446        request_priority) / RUNAVG_DELAY_N;
1447 }
1448
1449
1450 /**
1451  * Report on receiving a reply in response to an initiating client.
1452  * Remember that this peer is good for this client.
1453  *
1454  * @param cp responding peer (will be updated)
1455  * @param initiator_client local client on responsible for query
1456  */
1457 void
1458 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1459                                    struct GSF_LocalClient *initiator_client)
1460 {
1461   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1462                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1463 }
1464
1465
1466 /**
1467  * Report on receiving a reply in response to an initiating peer.
1468  * Remember that this peer is good for this initiating peer.
1469  *
1470  * @param cp responding peer (will be updated)
1471  * @param initiator_peer other peer responsible for query
1472  */
1473 void
1474 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1475                                  const struct GSF_ConnectedPeer *initiator_peer)
1476 {
1477   unsigned int woff;
1478
1479   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1480   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1481   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1482   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1483   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1484 }
1485
1486
1487 /**
1488  * A peer disconnected from us.  Tear down the connected peer
1489  * record.
1490  *
1491  * @param cls unused
1492  * @param peer identity of peer that connected
1493  */
1494 void
1495 GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
1496 {
1497   struct GSF_ConnectedPeer *cp;
1498   struct GSF_PeerTransmitHandle *pth;
1499   struct GSF_DelayedHandle *dh;
1500
1501   cp = GSF_peer_get_ (peer);
1502   if (NULL == cp)
1503     return;                     /* must have been disconnect from core with
1504                                  * 'peer' == my_id, ignore */
1505   GNUNET_assert (GNUNET_YES ==
1506                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1507                                                        peer,
1508                                                        cp));
1509   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1510                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1511                          GNUNET_NO);
1512   if (NULL != cp->migration_pth)
1513   {
1514     GSF_peer_transmit_cancel_ (cp->migration_pth);
1515     cp->migration_pth = NULL;
1516   }
1517   if (NULL != cp->rc)
1518   {
1519     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1520     cp->rc = NULL;
1521   }
1522   if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1523   {
1524     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1525     cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1526   }
1527   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1528                                          &cancel_pending_request, cp);
1529   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1530   cp->request_map = NULL;
1531   GSF_plan_notify_peer_disconnect_ (cp);
1532   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1533   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1534   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1535   GSF_push_stop_ (cp);
1536   if (NULL != cp->cth)
1537   {
1538     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1539     cp->cth = NULL;
1540   }
1541   GNUNET_assert (0 == cp->cth_in_progress);
1542   while (NULL != (pth = cp->pth_head))
1543   {
1544     if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1545     {
1546       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1547       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1548     }
1549     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1550     pth->gmc (pth->gmc_cls, 0, NULL);
1551     GNUNET_free (pth);
1552   }
1553   while (NULL != (dh = cp->delayed_head))
1554   {
1555     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1556     GNUNET_SCHEDULER_cancel (dh->delay_task);
1557     GNUNET_free (dh->pm);
1558     GNUNET_free (dh);
1559   }
1560   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1561   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1562   {
1563     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1564     cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1565   }
1566   GNUNET_free (cp);
1567 }
1568
1569
1570 /**
1571  * Closure for 'call_iterator'.
1572  */
1573 struct IterationContext
1574 {
1575   /**
1576    * Function to call on each entry.
1577    */
1578   GSF_ConnectedPeerIterator it;
1579
1580   /**
1581    * Closure for 'it'.
1582    */
1583   void *it_cls;
1584 };
1585
1586
1587 /**
1588  * Function that calls the callback for each peer.
1589  *
1590  * @param cls the `struct IterationContext *`
1591  * @param key identity of the peer
1592  * @param value the `struct GSF_ConnectedPeer *`
1593  * @return #GNUNET_YES to continue iteration
1594  */
1595 static int
1596 call_iterator (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1597 {
1598   struct IterationContext *ic = cls;
1599   struct GSF_ConnectedPeer *cp = value;
1600
1601   ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
1602   return GNUNET_YES;
1603 }
1604
1605
1606 /**
1607  * Iterate over all connected peers.
1608  *
1609  * @param it function to call for each peer
1610  * @param it_cls closure for @a it
1611  */
1612 void
1613 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
1614 {
1615   struct IterationContext ic;
1616
1617   ic.it = it;
1618   ic.it_cls = it_cls;
1619   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &call_iterator, &ic);
1620 }
1621
1622
1623 /**
1624  * Obtain the identity of a connected peer.
1625  *
1626  * @param cp peer to get identity of
1627  * @param id identity to set (written to)
1628  */
1629 void
1630 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1631                                   struct GNUNET_PeerIdentity *id)
1632 {
1633   GNUNET_assert (0 != cp->ppd.pid);
1634   GNUNET_PEER_resolve (cp->ppd.pid, id);
1635 }
1636
1637
1638 /**
1639  * Obtain the identity of a connected peer.
1640  *
1641  * @param cp peer to get identity of
1642  * @return reference to peer identity, valid until peer disconnects (!)
1643  */
1644 const struct GNUNET_PeerIdentity *
1645 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1646 {
1647   GNUNET_assert (0 != cp->ppd.pid);
1648   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1649 }
1650
1651
1652 /**
1653  * Assemble a migration stop message for transmission.
1654  *
1655  * @param cls the 'struct GSF_ConnectedPeer' to use
1656  * @param size number of bytes we're allowed to write to buf
1657  * @param buf where to copy the message
1658  * @return number of bytes copied to buf
1659  */
1660 static size_t
1661 create_migration_stop_message (void *cls, size_t size, void *buf)
1662 {
1663   struct GSF_ConnectedPeer *cp = cls;
1664   struct MigrationStopMessage msm;
1665
1666   cp->migration_pth = NULL;
1667   if (NULL == buf)
1668     return 0;
1669   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1670   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1671   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1672   msm.reserved = htonl (0);
1673   msm.duration =
1674       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1675                                  (cp->last_migration_block));
1676   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1677   GNUNET_STATISTICS_update (GSF_stats,
1678                             gettext_noop ("# migration stop messages sent"),
1679                             1, GNUNET_NO);
1680   return sizeof (struct MigrationStopMessage);
1681 }
1682
1683
1684 /**
1685  * Ask a peer to stop migrating data to us until the given point
1686  * in time.
1687  *
1688  * @param cp peer to ask
1689  * @param block_time until when to block
1690  */
1691 void
1692 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1693                            struct GNUNET_TIME_Absolute block_time)
1694 {
1695   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1696   {
1697     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1698                 "Migration already blocked for another %s\n",
1699                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1700                                                         (cp->last_migration_block), GNUNET_YES));
1701     return;                     /* already blocked */
1702   }
1703   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1704               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1705                                                       GNUNET_YES));
1706   cp->last_migration_block = block_time;
1707   if (NULL != cp->migration_pth)
1708     GSF_peer_transmit_cancel_ (cp->migration_pth);
1709   cp->migration_pth =
1710       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1711                           GNUNET_TIME_UNIT_FOREVER_REL,
1712                           sizeof (struct MigrationStopMessage),
1713                           &create_migration_stop_message, cp);
1714 }
1715
1716
1717 /**
1718  * Write peer-respect information to a file - flush the buffer entry!
1719  *
1720  * @param cls unused
1721  * @param key peer identity
1722  * @param value the 'struct GSF_ConnectedPeer' to flush
1723  * @return GNUNET_OK to continue iteration
1724  */
1725 static int
1726 flush_respect (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1727 {
1728   struct GSF_ConnectedPeer *cp = value;
1729   struct GNUNET_PeerIdentity pid;
1730
1731   if (cp->ppd.respect == cp->disk_respect)
1732     return GNUNET_OK;           /* unchanged */
1733   GNUNET_assert (0 != cp->ppd.pid);
1734   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1735   GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1736                           sizeof (cp->ppd.respect),
1737                           GNUNET_TIME_UNIT_FOREVER_ABS,
1738                           GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
1739   return GNUNET_OK;
1740 }
1741
1742
1743 /**
1744  * Notify core about a preference we have for the given peer
1745  * (to allocate more resources towards it).  The change will
1746  * be communicated the next time we reserve bandwidth with
1747  * core (not instantly).
1748  *
1749  * @param cp peer to reserve bandwidth from
1750  * @param pref preference change
1751  */
1752 void
1753 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1754                                        uint64_t pref)
1755 {
1756   cp->inc_preference += pref;
1757 }
1758
1759
1760 /**
1761  * Call this method periodically to flush respect information to disk.
1762  *
1763  * @param cls closure, not used
1764  * @param tc task context, not used
1765  */
1766 static void
1767 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1768 {
1769
1770   if (NULL == cp_map)
1771     return;
1772   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL);
1773   if (NULL == tc)
1774     return;
1775   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1776     return;
1777   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1778                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1779                                               &cron_flush_respect, NULL);
1780 }
1781
1782
1783 /**
1784  * Initialize peer management subsystem.
1785  */
1786 void
1787 GSF_connected_peer_init_ ()
1788 {
1789   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1790   peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1791   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1792                                       &cron_flush_respect, NULL);
1793 }
1794
1795
1796 /**
1797  * Iterator to free peer entries.
1798  *
1799  * @param cls closure, unused
1800  * @param key current key code
1801  * @param value value in the hash map (peer entry)
1802  * @return #GNUNET_YES (we should continue to iterate)
1803  */
1804 static int
1805 clean_peer (void *cls,
1806             const struct GNUNET_PeerIdentity *key,
1807             void *value)
1808 {
1809   GSF_peer_disconnect_handler_ (NULL, key);
1810   return GNUNET_YES;
1811 }
1812
1813
1814 /**
1815  * Shutdown peer management subsystem.
1816  */
1817 void
1818 GSF_connected_peer_done_ ()
1819 {
1820   cron_flush_respect (NULL, NULL);
1821   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_peer, NULL);
1822   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1823   cp_map = NULL;
1824   GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
1825 }
1826
1827
1828 /**
1829  * Iterator to remove references to LC entry.
1830  *
1831  * @param cls the 'struct GSF_LocalClient*' to look for
1832  * @param key current key code
1833  * @param value value in the hash map (peer entry)
1834  * @return #GNUNET_YES (we should continue to iterate)
1835  */
1836 static int
1837 clean_local_client (void *cls,
1838                     const struct GNUNET_PeerIdentity *key,
1839                     void *value)
1840 {
1841   const struct GSF_LocalClient *lc = cls;
1842   struct GSF_ConnectedPeer *cp = value;
1843   unsigned int i;
1844
1845   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1846     if (cp->ppd.last_client_replies[i] == lc)
1847       cp->ppd.last_client_replies[i] = NULL;
1848   return GNUNET_YES;
1849 }
1850
1851
1852 /**
1853  * Notification that a local client disconnected.  Clean up all of our
1854  * references to the given handle.
1855  *
1856  * @param lc handle to the local client (henceforth invalid)
1857  */
1858 void
1859 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1860 {
1861   if (NULL == cp_map)
1862     return;                     /* already cleaned up */
1863   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
1864                                          (void *) lc);
1865 }
1866
1867
1868 /* end of gnunet-service-fs_cp.c */