- htons => htonl
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for @e gmc.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   GNUNET_SCHEDULER_TaskIdentifier delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request.
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Handle to specific peer.
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Respect rating for this peer on disk.
279    */
280   uint32_t disk_respect;
281
282   /**
283    * Which offset in "last_p2p_replies" will be updated next?
284    * (we go round-robin).
285    */
286   unsigned int last_p2p_replies_woff;
287
288   /**
289    * Which offset in "last_client_replies" will be updated next?
290    * (we go round-robin).
291    */
292   unsigned int last_client_replies_woff;
293
294   /**
295    * Current offset into 'last_request_times' ring buffer.
296    */
297   unsigned int last_request_times_off;
298
299   /**
300    * GNUNET_YES if we did successfully reserve 32k bandwidth,
301    * GNUNET_NO if not.
302    */
303   int did_reserve;
304
305 };
306
307
308 /**
309  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
310  */
311 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
312
313 /**
314  * Where do we store respect information?
315  */
316 static char *respectDirectory;
317
318
319 /**
320  * Get the filename under which we would store respect
321  * for the given peer.
322  *
323  * @param id peer to get the filename for
324  * @return filename of the form DIRECTORY/PEERID
325  */
326 static char *
327 get_respect_filename (const struct GNUNET_PeerIdentity *id)
328 {
329   char *fn;
330
331   GNUNET_asprintf (&fn,
332                    "%s%s%s",
333                    respectDirectory,
334                    DIR_SEPARATOR_STR,
335                    GNUNET_i2s_full (id));
336   return fn;
337 }
338
339
340 /**
341  * Update the latency information kept for the given peer.
342  *
343  * @param id peer record to update
344  * @param latency current latency value
345  */
346 void
347 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
348                           struct GNUNET_TIME_Relative latency)
349 {
350   struct GSF_ConnectedPeer *cp;
351
352   cp = GSF_peer_get_ (id);
353   if (NULL == cp)
354     return; /* we're not yet connected at the core level, ignore */
355   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
356   /* LATER: merge atsi into cp's performance data (if we ever care...) */
357 }
358
359
360 /**
361  * Return the performance data record for the given peer
362  *
363  * @param cp peer to query
364  * @return performance data record for the peer
365  */
366 struct GSF_PeerPerformanceData *
367 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
368 {
369   return &cp->ppd;
370 }
371
372
373 /**
374  * Core is ready to transmit to a peer, get the message.
375  *
376  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
377  * @param size number of bytes core is willing to take
378  * @param buf where to copy the message
379  * @return number of bytes copied to buf
380  */
381 static size_t
382 peer_transmit_ready_cb (void *cls, size_t size, void *buf);
383
384
385 /**
386  * Function called by core upon success or failure of our bandwidth reservation request.
387  *
388  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
389  * @param peer identifies the peer
390  * @param amount set to the amount that was actually reserved or unreserved;
391  *               either the full requested amount or zero (no partial reservations)
392  * @param res_delay if the reservation could not be satisfied (amount was 0), how
393  *        long should the client wait until re-trying?
394  */
395 static void
396 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
397                       int32_t amount, struct GNUNET_TIME_Relative res_delay);
398
399
400 /**
401  * If ready (bandwidth reserved), try to schedule transmission via
402  * core for the given handle.
403  *
404  * @param pth transmission handle to schedule
405  */
406 static void
407 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
408 {
409   struct GSF_ConnectedPeer *cp;
410   struct GNUNET_PeerIdentity target;
411
412   cp = pth->cp;
413   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
414     return;                     /* already done */
415   GNUNET_assert (0 != cp->ppd.pid);
416   GNUNET_PEER_resolve (cp->ppd.pid, &target);
417
418   if (0 != cp->inc_preference)
419   {
420     GNUNET_ATS_performance_change_preference (GSF_ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
421                                   (double) cp->inc_preference,
422                                   GNUNET_ATS_PREFERENCE_END);
423     cp->inc_preference = 0;
424   }
425
426   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
427   {
428     /* query, need reservation */
429     if (GNUNET_YES != cp->did_reserve)
430       return;                   /* not ready */
431     cp->did_reserve = GNUNET_NO;
432     /* reservation already done! */
433     pth->was_reserved = GNUNET_YES;
434     cp->rc =
435         GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
436                                       &ats_reserve_callback, cp);
437     return;
438   }
439   GNUNET_assert (NULL == cp->cth);
440   cp->cth_in_progress++;
441   cp->cth =
442     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
443                                        GNUNET_TIME_absolute_get_remaining
444                                        (pth->timeout), &target, pth->size,
445                                        &peer_transmit_ready_cb, cp);
446   GNUNET_assert (NULL != cp->cth);
447   GNUNET_assert (0 < cp->cth_in_progress--);
448 }
449
450
451 /**
452  * Core is ready to transmit to a peer, get the message.
453  *
454  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
455  * @param size number of bytes core is willing to take
456  * @param buf where to copy the message
457  * @return number of bytes copied to buf
458  */
459 static size_t
460 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
461 {
462   struct GSF_ConnectedPeer *cp = cls;
463   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
464   struct GSF_PeerTransmitHandle *pos;
465   size_t ret;
466
467   cp->cth = NULL;
468   if (NULL == pth)
469     return 0;
470   if (pth->size > size)
471   {
472     schedule_transmission (pth);
473     return 0;
474   }
475   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
476   {
477     GNUNET_SCHEDULER_cancel (pth->timeout_task);
478     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
479   }
480   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
481   if (GNUNET_YES == pth->is_query)
482   {
483     cp->ppd.last_request_times[(cp->last_request_times_off++) %
484                                MAX_QUEUE_PER_PEER] =
485         GNUNET_TIME_absolute_get ();
486     GNUNET_assert (0 < cp->ppd.pending_queries--);
487   }
488   else if (GNUNET_NO == pth->is_query)
489   {
490     GNUNET_assert (0 < cp->ppd.pending_replies--);
491   }
492   GNUNET_LOAD_update (cp->ppd.transmission_delay,
493                       GNUNET_TIME_absolute_get_duration
494                       (pth->transmission_request_start_time).rel_value_us);
495   ret = pth->gmc (pth->gmc_cls, size, buf);
496   if (NULL != (pos = cp->pth_head))
497   {
498     GNUNET_assert (pos != pth);
499     schedule_transmission (pos);
500   }
501   GNUNET_free (pth);
502   return ret;
503 }
504
505
506 /**
507  * (re)try to reserve bandwidth from the given peer.
508  *
509  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
510  * @param tc scheduler context
511  */
512 static void
513 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
514 {
515   struct GSF_ConnectedPeer *cp = cls;
516   struct GNUNET_PeerIdentity target;
517
518   GNUNET_PEER_resolve (cp->ppd.pid, &target);
519   cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
520   cp->rc =
521     GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
522                                   &ats_reserve_callback, cp);
523 }
524
525
526 /**
527  * Function called by core upon success or failure of our bandwidth reservation request.
528  *
529  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
530  * @param peer identifies the peer
531  * @param amount set to the amount that was actually reserved or unreserved;
532  *               either the full requested amount or zero (no partial reservations)
533  * @param res_delay if the reservation could not be satisfied (amount was 0), how
534  *        long should the client wait until re-trying?
535  */
536 static void
537 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
538                       int32_t amount, struct GNUNET_TIME_Relative res_delay)
539 {
540   struct GSF_ConnectedPeer *cp = cls;
541   struct GSF_PeerTransmitHandle *pth;
542
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "Reserved %d bytes / need to wait %s for reservation\n",
545               (int) amount,
546               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
547   cp->rc = NULL;
548   if (0 == amount)
549   {
550     cp->rc_delay_task =
551         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
552     return;
553   }
554   cp->did_reserve = GNUNET_YES;
555   pth = cp->pth_head;
556   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
557   {
558     /* reservation success, try transmission now! */
559     cp->cth_in_progress++;
560     cp->cth =
561         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
562                                            GNUNET_TIME_absolute_get_remaining
563                                            (pth->timeout), peer, pth->size,
564                                            &peer_transmit_ready_cb, cp);
565     GNUNET_assert (NULL != cp->cth);
566     GNUNET_assert (0 < cp->cth_in_progress--);
567   }
568 }
569
570
571 /**
572  * A peer connected to us.  Setup the connected peer
573  * records.
574  *
575  * @param peer identity of peer that connected
576  * @return handle to connected peer entry
577  */
578 struct GSF_ConnectedPeer *
579 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer)
580 {
581   struct GSF_ConnectedPeer *cp;
582   char *fn;
583   uint32_t respect;
584
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
586               GNUNET_i2s (peer));
587   cp = GNUNET_new (struct GSF_ConnectedPeer);
588   cp->ppd.pid = GNUNET_PEER_intern (peer);
589   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
590   cp->rc =
591       GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
592                                     &ats_reserve_callback, cp);
593   fn = get_respect_filename (peer);
594   if ((GNUNET_YES == GNUNET_DISK_file_test (fn)) &&
595       (sizeof (respect) == GNUNET_DISK_fn_read (fn, &respect, sizeof (respect))))
596     cp->disk_respect = cp->ppd.respect = ntohl (respect);
597   GNUNET_free (fn);
598   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
599   GNUNET_break (GNUNET_OK ==
600                 GNUNET_CONTAINER_multipeermap_put (cp_map,
601                                                    GSF_connected_peer_get_identity2_ (cp),
602                                                    cp,
603                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
604   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
605                          GNUNET_CONTAINER_multipeermap_size (cp_map),
606                          GNUNET_NO);
607   GSF_push_start_ (cp);
608   return cp;
609 }
610
611
612 /**
613  * It may be time to re-start migrating content to this
614  * peer.  Check, and if so, restart migration.
615  *
616  * @param cls the 'struct GSF_ConnectedPeer'
617  * @param tc scheduler context
618  */
619 static void
620 revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
621 {
622   struct GSF_ConnectedPeer *cp = cls;
623   struct GNUNET_TIME_Relative bt;
624
625   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
626   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
627   if (0 != bt.rel_value_us)
628   {
629     /* still time left... */
630     cp->mig_revive_task =
631         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
632     return;
633   }
634   GSF_push_start_ (cp);
635 }
636
637
638 /**
639  * Get a handle for a connected peer.
640  *
641  * @param peer peer's identity
642  * @return NULL if the peer is not currently connected
643  */
644 struct GSF_ConnectedPeer *
645 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
646 {
647   if (NULL == cp_map)
648     return NULL;
649   return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
650 }
651
652
653 /**
654  * Handle P2P "MIGRATION_STOP" message.
655  *
656  * @param cls closure, always NULL
657  * @param other the other peer involved (sender or receiver, NULL
658  *        for loopback messages where we are both sender and receiver)
659  * @param message the actual message
660  * @return GNUNET_OK to keep the connection open,
661  *         GNUNET_SYSERR to close it (signal serious error)
662  */
663 int
664 GSF_handle_p2p_migration_stop_ (void *cls,
665                                 const struct GNUNET_PeerIdentity *other,
666                                 const struct GNUNET_MessageHeader *message)
667 {
668   struct GSF_ConnectedPeer *cp;
669   const struct MigrationStopMessage *msm;
670   struct GNUNET_TIME_Relative bt;
671
672   msm = (const struct MigrationStopMessage *) message;
673   cp = GSF_peer_get_ (other);
674   if (NULL == cp)
675   {
676     GNUNET_break (0);
677     return GNUNET_OK;
678   }
679   GNUNET_STATISTICS_update (GSF_stats,
680                             gettext_noop ("# migration stop messages received"),
681                             1, GNUNET_NO);
682   bt = GNUNET_TIME_relative_ntoh (msm->duration);
683   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
684               _("Migration of content to peer `%s' blocked for %s\n"),
685               GNUNET_i2s (other),
686               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
687   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
688   if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
689   {
690     GSF_push_stop_ (cp);
691     cp->mig_revive_task =
692         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
693   }
694   return GNUNET_OK;
695 }
696
697
698 /**
699  * Copy reply and free put message.
700  *
701  * @param cls the 'struct PutMessage'
702  * @param buf_size number of bytes available in buf
703  * @param buf where to copy the message, NULL on error (peer disconnect)
704  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
705  */
706 static size_t
707 copy_reply (void *cls, size_t buf_size, void *buf)
708 {
709   struct PutMessage *pm = cls;
710   size_t size;
711
712   if (NULL != buf)
713   {
714     GNUNET_assert (buf_size >= ntohs (pm->header.size));
715     size = ntohs (pm->header.size);
716     memcpy (buf, pm, size);
717     GNUNET_STATISTICS_update (GSF_stats,
718                               gettext_noop
719                               ("# replies transmitted to other peers"), 1,
720                               GNUNET_NO);
721   }
722   else
723   {
724     size = 0;
725     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
726                               GNUNET_NO);
727   }
728   GNUNET_free (pm);
729   return size;
730 }
731
732
733 /**
734  * Free resources associated with the given peer request.
735  *
736  * @param peerreq request to free
737  * @param query associated key for the request
738  */
739 static void
740 free_pending_request (struct PeerRequest *peerreq,
741                       const struct GNUNET_HashCode *query)
742 {
743   struct GSF_ConnectedPeer *cp = peerreq->cp;
744
745   if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
746   {
747     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
748     peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
749   }
750   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
751                             -1, GNUNET_NO);
752   GNUNET_break (GNUNET_YES ==
753                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
754                                                       query, peerreq));
755   GNUNET_free (peerreq);
756 }
757
758
759 /**
760  * Cancel all requests associated with the peer.
761  *
762  * @param cls unused
763  * @param query hash code of the request
764  * @param value the 'struct GSF_PendingRequest'
765  * @return GNUNET_YES (continue to iterate)
766  */
767 static int
768 cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
769 {
770   struct PeerRequest *peerreq = value;
771   struct GSF_PendingRequest *pr = peerreq->pr;
772   struct GSF_PendingRequestData *prd;
773
774   prd = GSF_pending_request_get_data_ (pr);
775   GSF_pending_request_cancel_ (pr, GNUNET_NO);
776   free_pending_request (peerreq, &prd->query);
777   return GNUNET_OK;
778 }
779
780
781 /**
782  * Free the given request.
783  *
784  * @param cls the request to free
785  * @param tc task context
786  */
787 static void
788 peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
789 {
790   struct PeerRequest *peerreq = cls;
791   struct GSF_PendingRequest *pr = peerreq->pr;
792   struct GSF_PendingRequestData *prd;
793
794   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
795   prd = GSF_pending_request_get_data_ (pr);
796   cancel_pending_request (NULL, &prd->query, peerreq);
797 }
798
799
800 /**
801  * The artificial delay is over, transmit the message now.
802  *
803  * @param cls the 'struct GSF_DelayedHandle' with the message
804  * @param tc scheduler context
805  */
806 static void
807 transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
808 {
809   struct GSF_DelayedHandle *dh = cls;
810   struct GSF_ConnectedPeer *cp = dh->cp;
811
812   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
813   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
814   {
815     GNUNET_free (dh->pm);
816     GNUNET_free (dh);
817     return;
818   }
819   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
820                              dh->msize, &copy_reply, dh->pm);
821   GNUNET_free (dh);
822 }
823
824
825 /**
826  * Get the randomized delay a response should be subjected to.
827  *
828  * @return desired delay
829  */
830 static struct GNUNET_TIME_Relative
831 get_randomized_delay ()
832 {
833   struct GNUNET_TIME_Relative ret;
834
835   ret =
836       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
837                                      GNUNET_CRYPTO_random_u32
838                                      (GNUNET_CRYPTO_QUALITY_WEAK,
839                                       2 * GSF_avg_latency.rel_value_us + 1));
840 #if INSANE_STATISTICS
841   GNUNET_STATISTICS_update (GSF_stats,
842                             gettext_noop
843                             ("# artificial delays introduced (ms)"),
844                             ret.rel_value_us / 1000LL, GNUNET_NO);
845 #endif
846   return ret;
847 }
848
849
850 /**
851  * Handle a reply to a pending request.  Also called if a request
852  * expires (then with data == NULL).  The handler may be called
853  * many times (depending on the request type), but will not be
854  * called during or after a call to GSF_pending_request_cancel
855  * and will also not be called anymore after a call signalling
856  * expiration.
857  *
858  * @param cls 'struct PeerRequest' this is an answer for
859  * @param eval evaluation of the result
860  * @param pr handle to the original pending request
861  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
862  * @param expiration when does 'data' expire?
863  * @param last_transmission when did we last transmit a request for this block
864  * @param type type of the block
865  * @param data response data, NULL on request expiration
866  * @param data_len number of bytes in data
867  */
868 static void
869 handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
870                   struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
871                   struct GNUNET_TIME_Absolute expiration,
872                   struct GNUNET_TIME_Absolute last_transmission,
873                   enum GNUNET_BLOCK_Type type, const void *data,
874                   size_t data_len)
875 {
876   struct PeerRequest *peerreq = cls;
877   struct GSF_ConnectedPeer *cp = peerreq->cp;
878   struct GSF_PendingRequestData *prd;
879   struct PutMessage *pm;
880   size_t msize;
881
882   GNUNET_assert (data_len + sizeof (struct PutMessage) <
883                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
884   GNUNET_assert (peerreq->pr == pr);
885   prd = GSF_pending_request_get_data_ (pr);
886   if (NULL == data)
887   {
888     free_pending_request (peerreq, &prd->query);
889     return;
890   }
891   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
892   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
893   {
894     GNUNET_STATISTICS_update (GSF_stats,
895                               gettext_noop
896                               ("# replies dropped due to type mismatch"),
897                                 1, GNUNET_NO);
898     return;
899   }
900   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901               "Transmitting result for query `%s' to peer\n",
902               GNUNET_h2s (&prd->query));
903   GNUNET_STATISTICS_update (GSF_stats,
904                             gettext_noop ("# replies received for other peers"),
905                             1, GNUNET_NO);
906   msize = sizeof (struct PutMessage) + data_len;
907   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
908   {
909     GNUNET_break (0);
910     return;
911   }
912   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
913   {
914     if (reply_anonymity_level - 1 > GSF_cover_content_count)
915     {
916       GNUNET_STATISTICS_update (GSF_stats,
917                                 gettext_noop
918                                 ("# replies dropped due to insufficient cover traffic"),
919                                 1, GNUNET_NO);
920       return;
921     }
922     GSF_cover_content_count -= (reply_anonymity_level - 1);
923   }
924
925   pm = GNUNET_malloc (msize);
926   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
927   pm->header.size = htons (msize);
928   pm->type = htonl (type);
929   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
930   memcpy (&pm[1], data, data_len);
931   if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
932       (GNUNET_YES == GSF_enable_randomized_delays))
933   {
934     struct GSF_DelayedHandle *dh;
935
936     dh = GNUNET_new (struct GSF_DelayedHandle);
937     dh->cp = cp;
938     dh->pm = pm;
939     dh->msize = msize;
940     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
941     dh->delay_task =
942         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
943                                       &transmit_delayed_now, dh);
944   }
945   else
946   {
947     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
948                                &copy_reply, pm);
949   }
950   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
951     return;
952   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
953   {
954     GNUNET_STATISTICS_update (GSF_stats,
955                               gettext_noop
956                               ("# P2P searches destroyed due to ultimate reply"),
957                               1, GNUNET_NO);
958     peerreq->kill_task =
959         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
960   }
961 }
962
963
964 /**
965  * Increase the peer's respect by a value.
966  *
967  * @param cp which peer to change the respect value on
968  * @param value is the int value by which the
969  *  peer's credit is to be increased or decreased
970  * @returns the actual change in respect (positive or negative)
971  */
972 static int
973 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
974 {
975   if (0 == value)
976     return 0;
977   GNUNET_assert (NULL != cp);
978   if (value > 0)
979   {
980     if (cp->ppd.respect + value < cp->ppd.respect)
981     {
982       value = UINT32_MAX - cp->ppd.respect;
983       cp->ppd.respect = UINT32_MAX;
984     }
985     else
986       cp->ppd.respect += value;
987   }
988   else
989   {
990     if (cp->ppd.respect < -value)
991     {
992       value = -cp->ppd.respect;
993       cp->ppd.respect = 0;
994     }
995     else
996       cp->ppd.respect += value;
997   }
998   return value;
999 }
1000
1001
1002 /**
1003  * We've received a request with the specified priority.  Bound it
1004  * according to how much we respect the given peer.
1005  *
1006  * @param prio_in requested priority
1007  * @param cp the peer making the request
1008  * @return effective priority
1009  */
1010 static int32_t
1011 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1012 {
1013 #define N ((double)128.0)
1014   uint32_t ret;
1015   double rret;
1016   int ld;
1017
1018   ld = GSF_test_get_load_too_high_ (0);
1019   if (GNUNET_SYSERR == ld)
1020   {
1021 #if INSANE_STATISTICS
1022     GNUNET_STATISTICS_update (GSF_stats,
1023                               gettext_noop
1024                               ("# requests done for free (low load)"), 1,
1025                               GNUNET_NO);
1026 #endif
1027     return 0;                   /* excess resources */
1028   }
1029   if (prio_in > INT32_MAX)
1030     prio_in = INT32_MAX;
1031   ret = -change_peer_respect (cp, -(int) prio_in);
1032   if (ret > 0)
1033   {
1034     if (ret > GSF_current_priorities + N)
1035       rret = GSF_current_priorities + N;
1036     else
1037       rret = ret;
1038     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1039   }
1040   if ((GNUNET_YES == ld) && (ret > 0))
1041   {
1042     /* try with charging */
1043     ld = GSF_test_get_load_too_high_ (ret);
1044   }
1045   if (GNUNET_YES == ld)
1046   {
1047     GNUNET_STATISTICS_update (GSF_stats,
1048                               gettext_noop
1049                               ("# request dropped, priority insufficient"), 1,
1050                               GNUNET_NO);
1051     /* undo charge */
1052     change_peer_respect (cp, (int) ret);
1053     return -1;                  /* not enough resources */
1054   }
1055   else
1056   {
1057     GNUNET_STATISTICS_update (GSF_stats,
1058                               gettext_noop
1059                               ("# requests done for a price (normal load)"), 1,
1060                               GNUNET_NO);
1061   }
1062 #undef N
1063   return ret;
1064 }
1065
1066
1067 /**
1068  * The priority level imposes a bound on the maximum
1069  * value for the ttl that can be requested.
1070  *
1071  * @param ttl_in requested ttl
1072  * @param prio given priority
1073  * @return ttl_in if ttl_in is below the limit,
1074  *         otherwise the ttl-limit for the given priority
1075  */
1076 static int32_t
1077 bound_ttl (int32_t ttl_in, uint32_t prio)
1078 {
1079   unsigned long long allowed;
1080
1081   if (ttl_in <= 0)
1082     return ttl_in;
1083   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1084   if (ttl_in > allowed)
1085   {
1086     if (allowed >= (1 << 30))
1087       return 1 << 30;
1088     return allowed;
1089   }
1090   return ttl_in;
1091 }
1092
1093
1094 /**
1095  * Handle P2P "QUERY" message.  Creates the pending request entry
1096  * and sets up all of the data structures to that we will
1097  * process replies properly.  Does not initiate forwarding or
1098  * local database lookups.
1099  *
1100  * @param other the other peer involved (sender or receiver, NULL
1101  *        for loopback messages where we are both sender and receiver)
1102  * @param message the actual message
1103  * @return pending request handle, NULL on error
1104  */
1105 struct GSF_PendingRequest *
1106 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1107                        const struct GNUNET_MessageHeader *message)
1108 {
1109   struct PeerRequest *peerreq;
1110   struct GSF_PendingRequest *pr;
1111   struct GSF_PendingRequestData *prd;
1112   struct GSF_ConnectedPeer *cp;
1113   struct GSF_ConnectedPeer *cps;
1114   const struct GNUNET_PeerIdentity *target;
1115   enum GSF_PendingRequestOptions options;
1116   uint16_t msize;
1117   const struct GetMessage *gm;
1118   unsigned int bits;
1119   const struct GNUNET_PeerIdentity *opt;
1120   uint32_t bm;
1121   size_t bfsize;
1122   uint32_t ttl_decrement;
1123   int32_t priority;
1124   int32_t ttl;
1125   enum GNUNET_BLOCK_Type type;
1126   GNUNET_PEER_Id spid;
1127
1128   GNUNET_assert (other != NULL);
1129   msize = ntohs (message->size);
1130   if (msize < sizeof (struct GetMessage))
1131   {
1132     GNUNET_break_op (0);
1133     return NULL;
1134   }
1135   GNUNET_STATISTICS_update (GSF_stats,
1136                             gettext_noop
1137                             ("# GET requests received (from other peers)"), 1,
1138                             GNUNET_NO);
1139   gm = (const struct GetMessage *) message;
1140   type = ntohl (gm->type);
1141   bm = ntohl (gm->hash_bitmap);
1142   bits = 0;
1143   while (bm > 0)
1144   {
1145     if (1 == (bm & 1))
1146       bits++;
1147     bm >>= 1;
1148   }
1149   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1150   {
1151     GNUNET_break_op (0);
1152     return NULL;
1153   }
1154   opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1155   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1156   /* bfsize must be power of 2, check! */
1157   if (0 != ((bfsize - 1) & bfsize))
1158   {
1159     GNUNET_break_op (0);
1160     return NULL;
1161   }
1162   GSF_cover_query_count++;
1163   bm = ntohl (gm->hash_bitmap);
1164   bits = 0;
1165   cps = GSF_peer_get_ (other);
1166   if (NULL == cps)
1167   {
1168     /* peer must have just disconnected */
1169     GNUNET_STATISTICS_update (GSF_stats,
1170                               gettext_noop
1171                               ("# requests dropped due to initiator not being connected"),
1172                               1, GNUNET_NO);
1173     return NULL;
1174   }
1175   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1176     cp = GSF_peer_get_ (&opt[bits++]);
1177   else
1178     cp = cps;
1179   if (NULL == cp)
1180   {
1181     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1182       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1184                   GNUNET_i2s (&opt[bits - 1]));
1185
1186     else
1187       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1189                   GNUNET_i2s (other));
1190 #if INSANE_STATISTICS
1191     GNUNET_STATISTICS_update (GSF_stats,
1192                               gettext_noop
1193                               ("# requests dropped due to missing reverse route"),
1194                               1, GNUNET_NO);
1195 #endif
1196     return NULL;
1197   }
1198   /* note that we can really only check load here since otherwise
1199    * peers could find out that we are overloaded by not being
1200    * disconnected after sending us a malformed query... */
1201   priority = bound_priority (ntohl (gm->priority), cps);
1202   if (priority < 0)
1203   {
1204     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205                 "Dropping query from `%s', this peer is too busy.\n",
1206                 GNUNET_i2s (other));
1207     return NULL;
1208   }
1209   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1211               GNUNET_h2s (&gm->query),
1212               (unsigned int) type,
1213               GNUNET_i2s (other),
1214               (unsigned int) bm);
1215   target =
1216       (0 !=
1217        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1218   options = GSF_PRO_DEFAULTS;
1219   spid = 0;
1220   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1221       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1222           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1223           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1224   {
1225     /* don't have BW to send to peer, or would likely take longer than we have for it,
1226      * so at best indirect the query */
1227     priority = 0;
1228     options |= GSF_PRO_FORWARD_ONLY;
1229     spid = GNUNET_PEER_intern (other);
1230     GNUNET_assert (0 != spid);
1231   }
1232   ttl = bound_ttl (ntohl (gm->ttl), priority);
1233   /* decrement ttl (always) */
1234   ttl_decrement =
1235       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1236                                                     TTL_DECREMENT);
1237   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1238   {
1239     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1241                 GNUNET_i2s (other), ttl, ttl_decrement);
1242     GNUNET_STATISTICS_update (GSF_stats,
1243                               gettext_noop
1244                               ("# requests dropped due TTL underflow"), 1,
1245                               GNUNET_NO);
1246     /* integer underflow => drop (should be very rare)! */
1247     return NULL;
1248   }
1249   ttl -= ttl_decrement;
1250
1251   /* test if the request already exists */
1252   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1253   if (peerreq != NULL)
1254   {
1255     pr = peerreq->pr;
1256     prd = GSF_pending_request_get_data_ (pr);
1257     if (prd->type == type)
1258     {
1259       if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
1260       {
1261         /* existing request has higher TTL, drop new one! */
1262         prd->priority += priority;
1263         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264                     "Have existing request with higher TTL, dropping new request.\n",
1265                     GNUNET_i2s (other));
1266         GNUNET_STATISTICS_update (GSF_stats,
1267                                   gettext_noop
1268                                   ("# requests dropped due to higher-TTL request"),
1269                                   1, GNUNET_NO);
1270         return NULL;
1271       }
1272       /* existing request has lower TTL, drop old one! */
1273       priority += prd->priority;
1274       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1275       free_pending_request (peerreq, &gm->query);
1276     }
1277   }
1278
1279   peerreq = GNUNET_new (struct PeerRequest);
1280   peerreq->cp = cp;
1281   pr = GSF_pending_request_create_ (options, type, &gm->query,
1282                                     target,
1283                                     (bfsize >
1284                                      0) ? (const char *) &opt[bits] : NULL,
1285                                     bfsize, ntohl (gm->filter_mutator),
1286                                     1 /* anonymity */ ,
1287                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1288                                     &handle_p2p_reply, peerreq);
1289   GNUNET_assert (NULL != pr);
1290   peerreq->pr = pr;
1291   GNUNET_break (GNUNET_OK ==
1292                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1293                                                    peerreq,
1294                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1295   GNUNET_STATISTICS_update (GSF_stats,
1296                             gettext_noop
1297                             ("# P2P query messages received and processed"), 1,
1298                             GNUNET_NO);
1299   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1300                             1, GNUNET_NO);
1301   return pr;
1302 }
1303
1304
1305 /**
1306  * Function called if there has been a timeout trying to satisfy
1307  * a transmission request.
1308  *
1309  * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1310  * @param tc scheduler context
1311  */
1312 static void
1313 peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1314 {
1315   struct GSF_PeerTransmitHandle *pth = cls;
1316   struct GSF_ConnectedPeer *cp;
1317
1318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319               "Timeout trying to transmit to other peer\n");
1320   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1321   cp = pth->cp;
1322   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1323   if (GNUNET_YES == pth->is_query)
1324     GNUNET_assert (0 < cp->ppd.pending_queries--);
1325   else if (GNUNET_NO == pth->is_query)
1326     GNUNET_assert (0 < cp->ppd.pending_replies--);
1327   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1328   if (NULL != cp->cth)
1329   {
1330     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1331     cp->cth = NULL;
1332   }
1333   pth->gmc (pth->gmc_cls, 0, NULL);
1334   GNUNET_assert (0 == cp->cth_in_progress);
1335   GNUNET_free (pth);
1336 }
1337
1338
1339 /**
1340  * Transmit a message to the given peer as soon as possible.
1341  * If the peer disconnects before the transmission can happen,
1342  * the callback is invoked with a `NULL` @a buffer.
1343  *
1344  * @param cp target peer
1345  * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1346  * @param priority how important is this request?
1347  * @param timeout when does this request timeout (call gmc with error)
1348  * @param size number of bytes we would like to send to the peer
1349  * @param gmc function to call to get the message
1350  * @param gmc_cls closure for @a gmc
1351  * @return handle to cancel request
1352  */
1353 struct GSF_PeerTransmitHandle *
1354 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
1355                     uint32_t priority, struct GNUNET_TIME_Relative timeout,
1356                     size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
1357 {
1358   struct GSF_PeerTransmitHandle *pth;
1359   struct GSF_PeerTransmitHandle *pos;
1360   struct GSF_PeerTransmitHandle *prev;
1361
1362   pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1363   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1364   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1365   pth->gmc = gmc;
1366   pth->gmc_cls = gmc_cls;
1367   pth->size = size;
1368   pth->is_query = is_query;
1369   pth->priority = priority;
1370   pth->cp = cp;
1371   /* insertion sort (by priority, descending) */
1372   prev = NULL;
1373   pos = cp->pth_head;
1374   while ((NULL != pos) && (pos->priority > priority))
1375   {
1376     prev = pos;
1377     pos = pos->next;
1378   }
1379   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1380   if (GNUNET_YES == is_query)
1381     cp->ppd.pending_queries++;
1382   else if (GNUNET_NO == is_query)
1383     cp->ppd.pending_replies++;
1384   pth->timeout_task =
1385       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1386   schedule_transmission (pth);
1387   return pth;
1388 }
1389
1390
1391 /**
1392  * Cancel an earlier request for transmission.
1393  *
1394  * @param pth request to cancel
1395  */
1396 void
1397 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1398 {
1399   struct GSF_ConnectedPeer *cp;
1400
1401   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
1402   {
1403     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1404     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1405   }
1406   cp = pth->cp;
1407   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1408   if (GNUNET_YES == pth->is_query)
1409     GNUNET_assert (0 < cp->ppd.pending_queries--);
1410   else if (GNUNET_NO == pth->is_query)
1411     GNUNET_assert (0 < cp->ppd.pending_replies--);
1412   GNUNET_free (pth);
1413 }
1414
1415
1416 /**
1417  * Report on receiving a reply; update the performance record of the given peer.
1418  *
1419  * @param cp responding peer (will be updated)
1420  * @param request_time time at which the original query was transmitted
1421  * @param request_priority priority of the original request
1422  */
1423 void
1424 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1425                               struct GNUNET_TIME_Absolute request_time,
1426                               uint32_t request_priority)
1427 {
1428   struct GNUNET_TIME_Relative delay;
1429
1430   delay = GNUNET_TIME_absolute_get_duration (request_time);
1431   cp->ppd.avg_reply_delay.rel_value_us =
1432       (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1433        delay.rel_value_us) / RUNAVG_DELAY_N;
1434   cp->ppd.avg_priority =
1435       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1436        request_priority) / RUNAVG_DELAY_N;
1437 }
1438
1439
1440 /**
1441  * Report on receiving a reply in response to an initiating client.
1442  * Remember that this peer is good for this client.
1443  *
1444  * @param cp responding peer (will be updated)
1445  * @param initiator_client local client on responsible for query
1446  */
1447 void
1448 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1449                                    struct GSF_LocalClient *initiator_client)
1450 {
1451   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1452                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1453 }
1454
1455
1456 /**
1457  * Report on receiving a reply in response to an initiating peer.
1458  * Remember that this peer is good for this initiating peer.
1459  *
1460  * @param cp responding peer (will be updated)
1461  * @param initiator_peer other peer responsible for query
1462  */
1463 void
1464 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1465                                  const struct GSF_ConnectedPeer *initiator_peer)
1466 {
1467   unsigned int woff;
1468
1469   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1470   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1471   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1472   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1473   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1474 }
1475
1476
1477 /**
1478  * A peer disconnected from us.  Tear down the connected peer
1479  * record.
1480  *
1481  * @param cls unused
1482  * @param peer identity of peer that connected
1483  */
1484 void
1485 GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
1486 {
1487   struct GSF_ConnectedPeer *cp;
1488   struct GSF_PeerTransmitHandle *pth;
1489   struct GSF_DelayedHandle *dh;
1490
1491   cp = GSF_peer_get_ (peer);
1492   if (NULL == cp)
1493     return;                     /* must have been disconnect from core with
1494                                  * 'peer' == my_id, ignore */
1495   GNUNET_assert (GNUNET_YES ==
1496                  GNUNET_CONTAINER_multipeermap_remove (cp_map,
1497                                                        peer,
1498                                                        cp));
1499   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1500                          GNUNET_CONTAINER_multipeermap_size (cp_map),
1501                          GNUNET_NO);
1502   if (NULL != cp->migration_pth)
1503   {
1504     GSF_peer_transmit_cancel_ (cp->migration_pth);
1505     cp->migration_pth = NULL;
1506   }
1507   if (NULL != cp->rc)
1508   {
1509     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1510     cp->rc = NULL;
1511   }
1512   if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1513   {
1514     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1515     cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1516   }
1517   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1518                                          &cancel_pending_request, cp);
1519   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1520   cp->request_map = NULL;
1521   GSF_plan_notify_peer_disconnect_ (cp);
1522   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1523   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1524   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1525   GSF_push_stop_ (cp);
1526   if (NULL != cp->cth)
1527   {
1528     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1529     cp->cth = NULL;
1530   }
1531   GNUNET_assert (0 == cp->cth_in_progress);
1532   while (NULL != (pth = cp->pth_head))
1533   {
1534     if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1535     {
1536       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1537       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1538     }
1539     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1540     pth->gmc (pth->gmc_cls, 0, NULL);
1541     GNUNET_free (pth);
1542   }
1543   while (NULL != (dh = cp->delayed_head))
1544   {
1545     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1546     GNUNET_SCHEDULER_cancel (dh->delay_task);
1547     GNUNET_free (dh->pm);
1548     GNUNET_free (dh);
1549   }
1550   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1551   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1552   {
1553     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1554     cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1555   }
1556   GNUNET_free (cp);
1557 }
1558
1559
1560 /**
1561  * Closure for 'call_iterator'.
1562  */
1563 struct IterationContext
1564 {
1565   /**
1566    * Function to call on each entry.
1567    */
1568   GSF_ConnectedPeerIterator it;
1569
1570   /**
1571    * Closure for 'it'.
1572    */
1573   void *it_cls;
1574 };
1575
1576
1577 /**
1578  * Function that calls the callback for each peer.
1579  *
1580  * @param cls the `struct IterationContext *`
1581  * @param key identity of the peer
1582  * @param value the `struct GSF_ConnectedPeer *`
1583  * @return #GNUNET_YES to continue iteration
1584  */
1585 static int
1586 call_iterator (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1587 {
1588   struct IterationContext *ic = cls;
1589   struct GSF_ConnectedPeer *cp = value;
1590
1591   ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
1592   return GNUNET_YES;
1593 }
1594
1595
1596 /**
1597  * Iterate over all connected peers.
1598  *
1599  * @param it function to call for each peer
1600  * @param it_cls closure for @a it
1601  */
1602 void
1603 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
1604 {
1605   struct IterationContext ic;
1606
1607   ic.it = it;
1608   ic.it_cls = it_cls;
1609   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &call_iterator, &ic);
1610 }
1611
1612
1613 /**
1614  * Obtain the identity of a connected peer.
1615  *
1616  * @param cp peer to get identity of
1617  * @param id identity to set (written to)
1618  */
1619 void
1620 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1621                                   struct GNUNET_PeerIdentity *id)
1622 {
1623   GNUNET_assert (0 != cp->ppd.pid);
1624   GNUNET_PEER_resolve (cp->ppd.pid, id);
1625 }
1626
1627
1628 /**
1629  * Obtain the identity of a connected peer.
1630  *
1631  * @param cp peer to get identity of
1632  * @return reference to peer identity, valid until peer disconnects (!)
1633  */
1634 const struct GNUNET_PeerIdentity *
1635 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1636 {
1637   GNUNET_assert (0 != cp->ppd.pid);
1638   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1639 }
1640
1641
1642 /**
1643  * Assemble a migration stop message for transmission.
1644  *
1645  * @param cls the 'struct GSF_ConnectedPeer' to use
1646  * @param size number of bytes we're allowed to write to buf
1647  * @param buf where to copy the message
1648  * @return number of bytes copied to buf
1649  */
1650 static size_t
1651 create_migration_stop_message (void *cls, size_t size, void *buf)
1652 {
1653   struct GSF_ConnectedPeer *cp = cls;
1654   struct MigrationStopMessage msm;
1655
1656   cp->migration_pth = NULL;
1657   if (NULL == buf)
1658     return 0;
1659   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1660   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1661   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1662   msm.reserved = htonl (0);
1663   msm.duration =
1664       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1665                                  (cp->last_migration_block));
1666   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1667   GNUNET_STATISTICS_update (GSF_stats,
1668                             gettext_noop ("# migration stop messages sent"),
1669                             1, GNUNET_NO);
1670   return sizeof (struct MigrationStopMessage);
1671 }
1672
1673
1674 /**
1675  * Ask a peer to stop migrating data to us until the given point
1676  * in time.
1677  *
1678  * @param cp peer to ask
1679  * @param block_time until when to block
1680  */
1681 void
1682 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1683                            struct GNUNET_TIME_Absolute block_time)
1684 {
1685   if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1686   {
1687     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                 "Migration already blocked for another %s\n",
1689                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1690                                                         (cp->last_migration_block), GNUNET_YES));
1691     return;                     /* already blocked */
1692   }
1693   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1694               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1695                                                       GNUNET_YES));
1696   cp->last_migration_block = block_time;
1697   if (NULL != cp->migration_pth)
1698     GSF_peer_transmit_cancel_ (cp->migration_pth);
1699   cp->migration_pth =
1700       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1701                           GNUNET_TIME_UNIT_FOREVER_REL,
1702                           sizeof (struct MigrationStopMessage),
1703                           &create_migration_stop_message, cp);
1704 }
1705
1706
1707 /**
1708  * Write peer-respect information to a file - flush the buffer entry!
1709  *
1710  * @param cls unused
1711  * @param key peer identity
1712  * @param value the 'struct GSF_ConnectedPeer' to flush
1713  * @return GNUNET_OK to continue iteration
1714  */
1715 static int
1716 flush_respect (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1717 {
1718   struct GSF_ConnectedPeer *cp = value;
1719   char *fn;
1720   uint32_t respect;
1721   struct GNUNET_PeerIdentity pid;
1722
1723   if (cp->ppd.respect == cp->disk_respect)
1724     return GNUNET_OK;           /* unchanged */
1725   GNUNET_assert (0 != cp->ppd.pid);
1726   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1727   fn = get_respect_filename (&pid);
1728   if (cp->ppd.respect == 0)
1729   {
1730     if ((0 != UNLINK (fn)) && (errno != ENOENT))
1731       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1732                                 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1733   }
1734   else
1735   {
1736     respect = htonl (cp->ppd.respect);
1737     if (sizeof (uint32_t) ==
1738         GNUNET_DISK_fn_write (fn, &respect, sizeof (uint32_t),
1739                               GNUNET_DISK_PERM_USER_READ |
1740                               GNUNET_DISK_PERM_USER_WRITE |
1741                               GNUNET_DISK_PERM_GROUP_READ |
1742                               GNUNET_DISK_PERM_OTHER_READ))
1743       cp->disk_respect = cp->ppd.respect;
1744   }
1745   GNUNET_free (fn);
1746   return GNUNET_OK;
1747 }
1748
1749
1750 /**
1751  * Notify core about a preference we have for the given peer
1752  * (to allocate more resources towards it).  The change will
1753  * be communicated the next time we reserve bandwidth with
1754  * core (not instantly).
1755  *
1756  * @param cp peer to reserve bandwidth from
1757  * @param pref preference change
1758  */
1759 void
1760 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1761                                        uint64_t pref)
1762 {
1763   cp->inc_preference += pref;
1764 }
1765
1766
1767 /**
1768  * Call this method periodically to flush respect information to disk.
1769  *
1770  * @param cls closure, not used
1771  * @param tc task context, not used
1772  */
1773 static void
1774 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1775 {
1776
1777   if (NULL == cp_map)
1778     return;
1779   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL);
1780   if (NULL == tc)
1781     return;
1782   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1783     return;
1784   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1785                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1786                                               &cron_flush_respect, NULL);
1787 }
1788
1789
1790 /**
1791  * Initialize peer management subsystem.
1792  */
1793 void
1794 GSF_connected_peer_init_ ()
1795 {
1796   cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1797   GNUNET_assert (GNUNET_OK ==
1798                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg, "fs",
1799                                                           "RESPECT",
1800                                                           &respectDirectory));
1801   GNUNET_break (GNUNET_OK == GNUNET_DISK_directory_create (respectDirectory));
1802   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1803                                       &cron_flush_respect, NULL);
1804 }
1805
1806
1807 /**
1808  * Iterator to free peer entries.
1809  *
1810  * @param cls closure, unused
1811  * @param key current key code
1812  * @param value value in the hash map (peer entry)
1813  * @return #GNUNET_YES (we should continue to iterate)
1814  */
1815 static int
1816 clean_peer (void *cls,
1817             const struct GNUNET_PeerIdentity *key,
1818             void *value)
1819 {
1820   GSF_peer_disconnect_handler_ (NULL, key);
1821   return GNUNET_YES;
1822 }
1823
1824
1825 /**
1826  * Shutdown peer management subsystem.
1827  */
1828 void
1829 GSF_connected_peer_done_ ()
1830 {
1831   cron_flush_respect (NULL, NULL);
1832   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_peer, NULL);
1833   GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1834   cp_map = NULL;
1835   GNUNET_free (respectDirectory);
1836   respectDirectory = NULL;
1837 }
1838
1839
1840 /**
1841  * Iterator to remove references to LC entry.
1842  *
1843  * @param cls the 'struct GSF_LocalClient*' to look for
1844  * @param key current key code
1845  * @param value value in the hash map (peer entry)
1846  * @return #GNUNET_YES (we should continue to iterate)
1847  */
1848 static int
1849 clean_local_client (void *cls,
1850                     const struct GNUNET_PeerIdentity *key,
1851                     void *value)
1852 {
1853   const struct GSF_LocalClient *lc = cls;
1854   struct GSF_ConnectedPeer *cp = value;
1855   unsigned int i;
1856
1857   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1858     if (cp->ppd.last_client_replies[i] == lc)
1859       cp->ppd.last_client_replies[i] = NULL;
1860   return GNUNET_YES;
1861 }
1862
1863
1864 /**
1865  * Notification that a local client disconnected.  Clean up all of our
1866  * references to the given handle.
1867  *
1868  * @param lc handle to the local client (henceforth invalid)
1869  */
1870 void
1871 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1872 {
1873   if (NULL == cp_map)
1874     return;                     /* already cleaned up */
1875   GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
1876                                          (void *) lc);
1877 }
1878
1879
1880 /* end of gnunet-service-fs_cp.c */