doxygen
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet_ats_service.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for 'gmc'.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * GNUNET_YES if this is a query, GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   GNUNET_SCHEDULER_TaskIdentifier delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request.
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Handle to specific peer.
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Respect rating for this peer on disk.
279    */
280   uint32_t disk_respect;
281
282   /**
283    * Which offset in "last_p2p_replies" will be updated next?
284    * (we go round-robin).
285    */
286   unsigned int last_p2p_replies_woff;
287
288   /**
289    * Which offset in "last_client_replies" will be updated next?
290    * (we go round-robin).
291    */
292   unsigned int last_client_replies_woff;
293
294   /**
295    * Current offset into 'last_request_times' ring buffer.
296    */
297   unsigned int last_request_times_off;
298
299   /**
300    * GNUNET_YES if we did successfully reserve 32k bandwidth,
301    * GNUNET_NO if not.
302    */
303   int did_reserve;
304
305 };
306
307
308 /**
309  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
310  */
311 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
312
313 /**
314  * Where do we store respect information?
315  */
316 static char *respectDirectory;
317
318 /**
319  * Handle to ATS service.
320  */
321 static struct GNUNET_ATS_PerformanceHandle *ats;
322
323
324 /**
325  * Get the filename under which we would store respect
326  * for the given peer.
327  *
328  * @param id peer to get the filename for
329  * @return filename of the form DIRECTORY/PEERID
330  */
331 static char *
332 get_respect_filename (const struct GNUNET_PeerIdentity *id)
333 {
334   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
335   char *fn;
336
337   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
338   GNUNET_asprintf (&fn, "%s%s%s", respectDirectory, DIR_SEPARATOR_STR, &fil);
339   return fn;
340 }
341
342
343 /**
344  * Find latency information in 'atsi'.
345  *
346  * @param atsi performance data
347  * @param atsi_count number of records in 'atsi'
348  * @return connection latency
349  */
350 static struct GNUNET_TIME_Relative
351 get_latency (const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
352 {
353   unsigned int i;
354
355   for (i = 0; i < atsi_count; i++)
356     if (ntohl (atsi->type) == GNUNET_ATS_QUALITY_NET_DELAY)
357       return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
358                                             ntohl (atsi->value));
359   return GNUNET_TIME_UNIT_SECONDS;
360 }
361
362
363 /**
364  * Update the performance information kept for the given peer.
365  *
366  * @param cp peer record to update
367  * @param atsi transport performance data
368  * @param atsi_count number of records in 'atsi'
369  */
370 static void
371 update_atsi (struct GSF_ConnectedPeer *cp,
372              const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
373 {
374   struct GNUNET_TIME_Relative latency;
375
376   latency = get_latency (atsi, atsi_count);
377   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
378   /* LATER: merge atsi into cp's performance data (if we ever care...) */
379 }
380
381
382 /**
383  * Return the performance data record for the given peer
384  *
385  * @param cp peer to query
386  * @return performance data record for the peer
387  */
388 struct GSF_PeerPerformanceData *
389 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
390 {
391   return &cp->ppd;
392 }
393
394
395 /**
396  * Core is ready to transmit to a peer, get the message.
397  *
398  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
399  * @param size number of bytes core is willing to take
400  * @param buf where to copy the message
401  * @return number of bytes copied to buf
402  */
403 static size_t
404 peer_transmit_ready_cb (void *cls, size_t size, void *buf);
405
406
407 /**
408  * Function called by core upon success or failure of our bandwidth reservation request.
409  *
410  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
411  * @param peer identifies the peer
412  * @param amount set to the amount that was actually reserved or unreserved;
413  *               either the full requested amount or zero (no partial reservations)
414  * @param res_delay if the reservation could not be satisfied (amount was 0), how
415  *        long should the client wait until re-trying?
416  */
417 static void
418 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
419                       int32_t amount, struct GNUNET_TIME_Relative res_delay);
420
421
422 /**
423  * If ready (bandwidth reserved), try to schedule transmission via
424  * core for the given handle.
425  *
426  * @param pth transmission handle to schedule
427  */
428 static void
429 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
430 {
431   struct GSF_ConnectedPeer *cp;
432   struct GNUNET_PeerIdentity target;
433
434   cp = pth->cp;
435   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
436     return;                     /* already done */
437   GNUNET_assert (0 != cp->ppd.pid);
438   GNUNET_PEER_resolve (cp->ppd.pid, &target);
439
440   if (0 != cp->inc_preference)
441   {
442     GNUNET_ATS_change_preference (ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
443                                   (double) cp->inc_preference,
444                                   GNUNET_ATS_PREFERENCE_END);
445     cp->inc_preference = 0;
446   }
447
448   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
449   {
450     /* query, need reservation */
451     if (GNUNET_YES != cp->did_reserve)
452       return;                   /* not ready */
453     cp->did_reserve = GNUNET_NO;
454     /* reservation already done! */
455     pth->was_reserved = GNUNET_YES;
456     cp->rc =
457         GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
458                                       &ats_reserve_callback, cp);
459     return;
460   }
461   GNUNET_assert (NULL == cp->cth);
462   cp->cth_in_progress++;
463   cp->cth =
464     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
465                                        GNUNET_TIME_absolute_get_remaining
466                                        (pth->timeout), &target, pth->size,
467                                        &peer_transmit_ready_cb, cp);
468   GNUNET_assert (NULL != cp->cth);
469   GNUNET_assert (0 < cp->cth_in_progress--);
470 }
471
472
473 /**
474  * Core is ready to transmit to a peer, get the message.
475  *
476  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
477  * @param size number of bytes core is willing to take
478  * @param buf where to copy the message
479  * @return number of bytes copied to buf
480  */
481 static size_t
482 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
483 {
484   struct GSF_ConnectedPeer *cp = cls;
485   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
486   struct GSF_PeerTransmitHandle *pos;
487   size_t ret;
488
489   cp->cth = NULL;
490   if (NULL == pth)
491     return 0;
492   if (pth->size > size)
493   {
494     schedule_transmission (pth);
495     return 0;
496   }
497   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
498   {
499     GNUNET_SCHEDULER_cancel (pth->timeout_task);
500     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
501   }
502   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
503   if (GNUNET_YES == pth->is_query)
504   {
505     cp->ppd.last_request_times[(cp->last_request_times_off++) %
506                                MAX_QUEUE_PER_PEER] =
507         GNUNET_TIME_absolute_get ();
508     GNUNET_assert (0 < cp->ppd.pending_queries--);
509   }
510   else if (GNUNET_NO == pth->is_query)
511   {
512     GNUNET_assert (0 < cp->ppd.pending_replies--);
513   }
514   GNUNET_LOAD_update (cp->ppd.transmission_delay,
515                       GNUNET_TIME_absolute_get_duration
516                       (pth->transmission_request_start_time).rel_value);
517   ret = pth->gmc (pth->gmc_cls, size, buf);
518   if (NULL != (pos = cp->pth_head))
519   {
520     GNUNET_assert (pos != pth);
521     schedule_transmission (pos);
522   }
523   GNUNET_free (pth);
524   return ret;
525 }
526
527
528 /**
529  * (re)try to reserve bandwidth from the given peer.
530  *
531  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
532  * @param tc scheduler context
533  */
534 static void
535 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
536 {
537   struct GSF_ConnectedPeer *cp = cls;
538   struct GNUNET_PeerIdentity target;
539
540   GNUNET_PEER_resolve (cp->ppd.pid, &target);
541   cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
542   cp->rc =
543       GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
544                                     &ats_reserve_callback, cp);
545 }
546
547
548 /**
549  * Function called by core upon success or failure of our bandwidth reservation request.
550  *
551  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
552  * @param peer identifies the peer
553  * @param amount set to the amount that was actually reserved or unreserved;
554  *               either the full requested amount or zero (no partial reservations)
555  * @param res_delay if the reservation could not be satisfied (amount was 0), how
556  *        long should the client wait until re-trying?
557  */
558 static void
559 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
560                       int32_t amount, struct GNUNET_TIME_Relative res_delay)
561 {
562   struct GSF_ConnectedPeer *cp = cls;
563   struct GSF_PeerTransmitHandle *pth;
564
565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566               "Reserved %d bytes / need to wait %s for reservation\n",
567               (int) amount, 
568               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
569   cp->rc = NULL;
570   if (0 == amount)
571   {
572     cp->rc_delay_task =
573         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
574     return;
575   }
576   cp->did_reserve = GNUNET_YES;
577   pth = cp->pth_head;
578   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
579   {
580     /* reservation success, try transmission now! */
581     cp->cth_in_progress++;
582     cp->cth =
583         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
584                                            GNUNET_TIME_absolute_get_remaining
585                                            (pth->timeout), peer, pth->size,
586                                            &peer_transmit_ready_cb, cp);
587     GNUNET_assert (NULL != cp->cth);
588     GNUNET_assert (0 < cp->cth_in_progress--);
589   }
590 }
591
592
593 /**
594  * A peer connected to us.  Setup the connected peer
595  * records.
596  *
597  * @param peer identity of peer that connected
598  * @param atsi performance data for the connection
599  * @param atsi_count number of records in 'atsi'
600  * @return handle to connected peer entry
601  */
602 struct GSF_ConnectedPeer *
603 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
604                            const struct GNUNET_ATS_Information *atsi,
605                            unsigned int atsi_count)
606 {
607   struct GSF_ConnectedPeer *cp;
608   char *fn;
609   uint32_t respect;
610
611   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
612               GNUNET_i2s (peer));
613   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
614   cp->ppd.pid = GNUNET_PEER_intern (peer);
615   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
616   cp->rc =
617       GNUNET_ATS_reserve_bandwidth (ats, peer, DBLOCK_SIZE,
618                                     &ats_reserve_callback, cp);
619   fn = get_respect_filename (peer);
620   if ((GNUNET_YES == GNUNET_DISK_file_test (fn)) &&
621       (sizeof (respect) == GNUNET_DISK_fn_read (fn, &respect, sizeof (respect))))
622     cp->disk_respect = cp->ppd.respect = ntohl (respect);
623   GNUNET_free (fn);
624   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
625   GNUNET_break (GNUNET_OK ==
626                 GNUNET_CONTAINER_multihashmap_put (cp_map, 
627                                                    &GSF_connected_peer_get_identity2_ (cp)->hashPubKey,
628                                                    cp,
629                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
630   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
631                          GNUNET_CONTAINER_multihashmap_size (cp_map),
632                          GNUNET_NO);
633   update_atsi (cp, atsi, atsi_count);
634   GSF_push_start_ (cp);
635   return cp;
636 }
637
638
639 /**
640  * It may be time to re-start migrating content to this
641  * peer.  Check, and if so, restart migration.
642  *
643  * @param cls the 'struct GSF_ConnectedPeer'
644  * @param tc scheduler context
645  */
646 static void
647 revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
648 {
649   struct GSF_ConnectedPeer *cp = cls;
650   struct GNUNET_TIME_Relative bt;
651
652   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
653   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
654   if (0 != bt.rel_value)
655   {
656     /* still time left... */
657     cp->mig_revive_task =
658         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
659     return;
660   }
661   GSF_push_start_ (cp);
662 }
663
664
665 /**
666  * Get a handle for a connected peer.
667  *
668  * @param peer peer's identity
669  * @return NULL if the peer is not currently connected
670  */
671 struct GSF_ConnectedPeer *
672 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
673 {
674   if (NULL == cp_map)
675     return NULL;
676   return GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey);
677 }
678
679
680 /**
681  * Handle P2P "MIGRATION_STOP" message.
682  *
683  * @param cls closure, always NULL
684  * @param other the other peer involved (sender or receiver, NULL
685  *        for loopback messages where we are both sender and receiver)
686  * @param message the actual message
687  * @return GNUNET_OK to keep the connection open,
688  *         GNUNET_SYSERR to close it (signal serious error)
689  */
690 int
691 GSF_handle_p2p_migration_stop_ (void *cls,
692                                 const struct GNUNET_PeerIdentity *other,
693                                 const struct GNUNET_MessageHeader *message)
694 {
695   struct GSF_ConnectedPeer *cp;
696   const struct MigrationStopMessage *msm;
697   struct GNUNET_TIME_Relative bt;
698
699   msm = (const struct MigrationStopMessage *) message;
700   cp = GSF_peer_get_ (other);
701   if (NULL == cp)
702   {
703     GNUNET_break (0);
704     return GNUNET_OK;
705   }
706   GNUNET_STATISTICS_update (GSF_stats,
707                             gettext_noop ("# migration stop messages received"),
708                             1, GNUNET_NO);
709   bt = GNUNET_TIME_relative_ntoh (msm->duration);
710   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
711               _("Migration of content to peer `%s' blocked for %s\n"),
712               GNUNET_i2s (other), 
713               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
714   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
715   if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
716   {
717     GSF_push_stop_ (cp);
718     cp->mig_revive_task =
719         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
720   }
721   fprintf (stderr, "FIX ATS DATA: %s:%u!\n", __FILE__, __LINE__);
722   update_atsi (cp, NULL, 0);
723   return GNUNET_OK;
724 }
725
726
727 /**
728  * Copy reply and free put message.
729  *
730  * @param cls the 'struct PutMessage'
731  * @param buf_size number of bytes available in buf
732  * @param buf where to copy the message, NULL on error (peer disconnect)
733  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
734  */
735 static size_t
736 copy_reply (void *cls, size_t buf_size, void *buf)
737 {
738   struct PutMessage *pm = cls;
739   size_t size;
740
741   if (NULL != buf)
742   {
743     GNUNET_assert (buf_size >= ntohs (pm->header.size));
744     size = ntohs (pm->header.size);
745     memcpy (buf, pm, size);
746     GNUNET_STATISTICS_update (GSF_stats,
747                               gettext_noop
748                               ("# replies transmitted to other peers"), 1,
749                               GNUNET_NO);
750   }
751   else
752   {
753     size = 0;
754     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
755                               GNUNET_NO);
756   }
757   GNUNET_free (pm);
758   return size;
759 }
760
761
762 /**
763  * Free resources associated with the given peer request.
764  *
765  * @param peerreq request to free
766  * @param query associated key for the request
767  */
768 static void
769 free_pending_request (struct PeerRequest *peerreq,
770                       const struct GNUNET_HashCode *query)
771 {
772   struct GSF_ConnectedPeer *cp = peerreq->cp;
773
774   if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
775   {
776     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
777     peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
778   }
779   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
780                             -1, GNUNET_NO);
781   GNUNET_break (GNUNET_YES ==
782                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
783                                                       query, peerreq));
784   GNUNET_free (peerreq);
785 }
786
787
788 /**
789  * Cancel all requests associated with the peer.
790  *
791  * @param cls unused
792  * @param query hash code of the request
793  * @param value the 'struct GSF_PendingRequest'
794  * @return GNUNET_YES (continue to iterate)
795  */
796 static int
797 cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
798 {
799   struct PeerRequest *peerreq = value;
800   struct GSF_PendingRequest *pr = peerreq->pr;
801   struct GSF_PendingRequestData *prd;
802
803   prd = GSF_pending_request_get_data_ (pr);
804   GSF_pending_request_cancel_ (pr, GNUNET_NO);
805   free_pending_request (peerreq, &prd->query);
806   return GNUNET_OK;
807 }
808
809
810 /**
811  * Free the given request.
812  *
813  * @param cls the request to free
814  * @param tc task context
815  */
816 static void
817 peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
818 {
819   struct PeerRequest *peerreq = cls;
820   struct GSF_PendingRequest *pr = peerreq->pr;
821   struct GSF_PendingRequestData *prd;
822
823   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
824   prd = GSF_pending_request_get_data_ (pr);
825   cancel_pending_request (NULL, &prd->query, peerreq);
826 }
827
828
829 /**
830  * The artificial delay is over, transmit the message now.
831  *
832  * @param cls the 'struct GSF_DelayedHandle' with the message
833  * @param tc scheduler context
834  */
835 static void
836 transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
837 {
838   struct GSF_DelayedHandle *dh = cls;
839   struct GSF_ConnectedPeer *cp = dh->cp;
840
841   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
842   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
843   {
844     GNUNET_free (dh->pm);
845     GNUNET_free (dh);
846     return;
847   }
848   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
849                              dh->msize, &copy_reply, dh->pm);
850   GNUNET_free (dh);
851 }
852
853
854 /**
855  * Get the randomized delay a response should be subjected to.
856  *
857  * @return desired delay
858  */
859 static struct GNUNET_TIME_Relative
860 get_randomized_delay ()
861 {
862   struct GNUNET_TIME_Relative ret;
863
864   ret =
865       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
866                                      GNUNET_CRYPTO_random_u32
867                                      (GNUNET_CRYPTO_QUALITY_WEAK,
868                                       2 * GSF_avg_latency.rel_value + 1));
869 #if INSANE_STATISTICS
870   GNUNET_STATISTICS_update (GSF_stats,
871                             gettext_noop
872                             ("# artificial delays introduced (ms)"),
873                             ret.rel_value, GNUNET_NO);
874 #endif
875   return ret;
876 }
877
878
879 /**
880  * Handle a reply to a pending request.  Also called if a request
881  * expires (then with data == NULL).  The handler may be called
882  * many times (depending on the request type), but will not be
883  * called during or after a call to GSF_pending_request_cancel
884  * and will also not be called anymore after a call signalling
885  * expiration.
886  *
887  * @param cls 'struct PeerRequest' this is an answer for
888  * @param eval evaluation of the result
889  * @param pr handle to the original pending request
890  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
891  * @param expiration when does 'data' expire?
892  * @param last_transmission when did we last transmit a request for this block
893  * @param type type of the block
894  * @param data response data, NULL on request expiration
895  * @param data_len number of bytes in data
896  */
897 static void
898 handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
899                   struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
900                   struct GNUNET_TIME_Absolute expiration,
901                   struct GNUNET_TIME_Absolute last_transmission,
902                   enum GNUNET_BLOCK_Type type, const void *data,
903                   size_t data_len)
904 {
905   struct PeerRequest *peerreq = cls;
906   struct GSF_ConnectedPeer *cp = peerreq->cp;
907   struct GSF_PendingRequestData *prd;
908   struct PutMessage *pm;
909   size_t msize;
910
911   GNUNET_assert (data_len + sizeof (struct PutMessage) <
912                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
913   GNUNET_assert (peerreq->pr == pr);
914   prd = GSF_pending_request_get_data_ (pr);
915   if (NULL == data)
916   {
917     free_pending_request (peerreq, &prd->query);
918     return;
919   }
920   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
921   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
922   {
923     GNUNET_STATISTICS_update (GSF_stats,
924                               gettext_noop
925                               ("# replies dropped due to type mismatch"),
926                                 1, GNUNET_NO);
927     return;
928   }
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "Transmitting result for query `%s' to peer\n",
931               GNUNET_h2s (&prd->query));
932   GNUNET_STATISTICS_update (GSF_stats,
933                             gettext_noop ("# replies received for other peers"),
934                             1, GNUNET_NO);
935   msize = sizeof (struct PutMessage) + data_len;
936   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
937   {
938     GNUNET_break (0);
939     return;
940   }
941   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
942   {
943     if (reply_anonymity_level - 1 > GSF_cover_content_count)
944     {
945       GNUNET_STATISTICS_update (GSF_stats,
946                                 gettext_noop
947                                 ("# replies dropped due to insufficient cover traffic"),
948                                 1, GNUNET_NO);
949       return;
950     }
951     GSF_cover_content_count -= (reply_anonymity_level - 1);
952   }
953
954   pm = GNUNET_malloc (msize);
955   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
956   pm->header.size = htons (msize);
957   pm->type = htonl (type);
958   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
959   memcpy (&pm[1], data, data_len);
960   if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
961       (GNUNET_YES == GSF_enable_randomized_delays))
962   {
963     struct GSF_DelayedHandle *dh;
964
965     dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle));
966     dh->cp = cp;
967     dh->pm = pm;
968     dh->msize = msize;
969     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
970     dh->delay_task =
971         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
972                                       &transmit_delayed_now, dh);
973   }
974   else
975   {
976     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
977                                &copy_reply, pm);
978   }
979   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
980     return;
981   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
982   {
983     GNUNET_STATISTICS_update (GSF_stats,
984                               gettext_noop
985                               ("# P2P searches destroyed due to ultimate reply"),
986                               1, GNUNET_NO);
987     peerreq->kill_task =
988         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
989   }
990 }
991
992
993 /**
994  * Increase the peer's respect by a value.
995  *
996  * @param cp which peer to change the respect value on
997  * @param value is the int value by which the
998  *  peer's credit is to be increased or decreased
999  * @returns the actual change in respect (positive or negative)
1000  */
1001 static int
1002 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1003 {
1004   if (0 == value)
1005     return 0;
1006   GNUNET_assert (NULL != cp);
1007   if (value > 0)
1008   {
1009     if (cp->ppd.respect + value < cp->ppd.respect)
1010     {
1011       value = UINT32_MAX - cp->ppd.respect;
1012       cp->ppd.respect = UINT32_MAX;
1013     }
1014     else
1015       cp->ppd.respect += value;
1016   }
1017   else
1018   {
1019     if (cp->ppd.respect < -value)
1020     {
1021       value = -cp->ppd.respect;
1022       cp->ppd.respect = 0;
1023     }
1024     else
1025       cp->ppd.respect += value;
1026   }
1027   return value;
1028 }
1029
1030
1031 /**
1032  * We've received a request with the specified priority.  Bound it
1033  * according to how much we respect the given peer.
1034  *
1035  * @param prio_in requested priority
1036  * @param cp the peer making the request
1037  * @return effective priority
1038  */
1039 static int32_t
1040 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1041 {
1042 #define N ((double)128.0)
1043   uint32_t ret;
1044   double rret;
1045   int ld;
1046
1047   ld = GSF_test_get_load_too_high_ (0);
1048   if (GNUNET_SYSERR == ld)
1049   {
1050 #if INSANE_STATISTICS
1051     GNUNET_STATISTICS_update (GSF_stats,
1052                               gettext_noop
1053                               ("# requests done for free (low load)"), 1,
1054                               GNUNET_NO);
1055 #endif
1056     return 0;                   /* excess resources */
1057   }
1058   if (prio_in > INT32_MAX)
1059     prio_in = INT32_MAX;
1060   ret = -change_peer_respect (cp, -(int) prio_in);
1061   if (ret > 0)
1062   {
1063     if (ret > GSF_current_priorities + N)
1064       rret = GSF_current_priorities + N;
1065     else
1066       rret = ret;
1067     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1068   }
1069   if ((GNUNET_YES == ld) && (ret > 0))
1070   {
1071     /* try with charging */
1072     ld = GSF_test_get_load_too_high_ (ret);
1073   }
1074   if (GNUNET_YES == ld)
1075   {
1076     GNUNET_STATISTICS_update (GSF_stats,
1077                               gettext_noop
1078                               ("# request dropped, priority insufficient"), 1,
1079                               GNUNET_NO);
1080     /* undo charge */
1081     change_peer_respect (cp, (int) ret);
1082     return -1;                  /* not enough resources */
1083   }
1084   else
1085   {
1086     GNUNET_STATISTICS_update (GSF_stats,
1087                               gettext_noop
1088                               ("# requests done for a price (normal load)"), 1,
1089                               GNUNET_NO);
1090   }
1091 #undef N
1092   return ret;
1093 }
1094
1095
1096 /**
1097  * The priority level imposes a bound on the maximum
1098  * value for the ttl that can be requested.
1099  *
1100  * @param ttl_in requested ttl
1101  * @param prio given priority
1102  * @return ttl_in if ttl_in is below the limit,
1103  *         otherwise the ttl-limit for the given priority
1104  */
1105 static int32_t
1106 bound_ttl (int32_t ttl_in, uint32_t prio)
1107 {
1108   unsigned long long allowed;
1109
1110   if (ttl_in <= 0)
1111     return ttl_in;
1112   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1113   if (ttl_in > allowed)
1114   {
1115     if (allowed >= (1 << 30))
1116       return 1 << 30;
1117     return allowed;
1118   }
1119   return ttl_in;
1120 }
1121
1122
1123 /**
1124  * Handle P2P "QUERY" message.  Creates the pending request entry
1125  * and sets up all of the data structures to that we will
1126  * process replies properly.  Does not initiate forwarding or
1127  * local database lookups.
1128  *
1129  * @param other the other peer involved (sender or receiver, NULL
1130  *        for loopback messages where we are both sender and receiver)
1131  * @param message the actual message
1132  * @return pending request handle, NULL on error
1133  */
1134 struct GSF_PendingRequest *
1135 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1136                        const struct GNUNET_MessageHeader *message)
1137 {
1138   struct PeerRequest *peerreq;
1139   struct GSF_PendingRequest *pr;
1140   struct GSF_PendingRequestData *prd;
1141   struct GSF_ConnectedPeer *cp;
1142   struct GSF_ConnectedPeer *cps;
1143   const struct GNUNET_PeerIdentity *target;
1144   enum GSF_PendingRequestOptions options;
1145   uint16_t msize;
1146   const struct GetMessage *gm;
1147   unsigned int bits;
1148   const struct GNUNET_HashCode *opt;
1149   uint32_t bm;
1150   size_t bfsize;
1151   uint32_t ttl_decrement;
1152   int32_t priority;
1153   int32_t ttl;
1154   enum GNUNET_BLOCK_Type type;
1155   GNUNET_PEER_Id spid;
1156
1157   GNUNET_assert (other != NULL);
1158   msize = ntohs (message->size);
1159   if (msize < sizeof (struct GetMessage))
1160   {
1161     GNUNET_break_op (0);
1162     return NULL;
1163   }
1164   GNUNET_STATISTICS_update (GSF_stats,
1165                             gettext_noop
1166                             ("# GET requests received (from other peers)"), 1,
1167                             GNUNET_NO);
1168   gm = (const struct GetMessage *) message;
1169   type = ntohl (gm->type);
1170   bm = ntohl (gm->hash_bitmap);
1171   bits = 0;
1172   while (bm > 0)
1173   {
1174     if (1 == (bm & 1))
1175       bits++;
1176     bm >>= 1;
1177   }
1178   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_HashCode))
1179   {
1180     GNUNET_break_op (0);
1181     return NULL;
1182   }
1183   opt = (const struct GNUNET_HashCode *) &gm[1];
1184   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_HashCode);
1185   /* bfsize must be power of 2, check! */
1186   if (0 != ((bfsize - 1) & bfsize))
1187   {
1188     GNUNET_break_op (0);
1189     return NULL;
1190   }
1191   GSF_cover_query_count++;
1192   bm = ntohl (gm->hash_bitmap);
1193   bits = 0;
1194   cps = GSF_peer_get_ (other);
1195   if (NULL == cps)
1196   {
1197     /* peer must have just disconnected */
1198     GNUNET_STATISTICS_update (GSF_stats,
1199                               gettext_noop
1200                               ("# requests dropped due to initiator not being connected"),
1201                               1, GNUNET_NO);
1202     return NULL;
1203   }
1204   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1205     cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity *) &opt[bits++]);
1206   else
1207     cp = cps;
1208   if (NULL == cp)
1209   {
1210     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1211       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1213                   GNUNET_i2s ((const struct GNUNET_PeerIdentity *)
1214                               &opt[bits - 1]));
1215
1216     else
1217       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1219                   GNUNET_i2s (other));
1220 #if INSANE_STATISTICS
1221     GNUNET_STATISTICS_update (GSF_stats,
1222                               gettext_noop
1223                               ("# requests dropped due to missing reverse route"),
1224                               1, GNUNET_NO);
1225 #endif
1226     return NULL;
1227   }
1228   /* note that we can really only check load here since otherwise
1229    * peers could find out that we are overloaded by not being
1230    * disconnected after sending us a malformed query... */
1231   priority = bound_priority (ntohl (gm->priority), cps);
1232   if (priority < 0)
1233   {
1234     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235                 "Dropping query from `%s', this peer is too busy.\n",
1236                 GNUNET_i2s (other));
1237     return NULL;
1238   }
1239   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1241               GNUNET_h2s (&gm->query), (unsigned int) type, GNUNET_i2s (other),
1242               (unsigned int) bm);
1243   target =
1244       (0 !=
1245        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity
1246                                                *) &opt[bits++]) : NULL;
1247   options = GSF_PRO_DEFAULTS;
1248   spid = 0;
1249   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1250       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1251           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 +
1252           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1253   {
1254     /* don't have BW to send to peer, or would likely take longer than we have for it,
1255      * so at best indirect the query */
1256     priority = 0;
1257     options |= GSF_PRO_FORWARD_ONLY;
1258     spid = GNUNET_PEER_intern (other);
1259     GNUNET_assert (0 != spid);
1260   }
1261   ttl = bound_ttl (ntohl (gm->ttl), priority);
1262   /* decrement ttl (always) */
1263   ttl_decrement =
1264       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1265                                                     TTL_DECREMENT);
1266   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1267   {
1268     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1270                 GNUNET_i2s (other), ttl, ttl_decrement);
1271     GNUNET_STATISTICS_update (GSF_stats,
1272                               gettext_noop
1273                               ("# requests dropped due TTL underflow"), 1,
1274                               GNUNET_NO);
1275     /* integer underflow => drop (should be very rare)! */
1276     return NULL;
1277   }
1278   ttl -= ttl_decrement;
1279
1280   /* test if the request already exists */
1281   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1282   if (peerreq != NULL)
1283   {
1284     pr = peerreq->pr;
1285     prd = GSF_pending_request_get_data_ (pr);
1286     if (prd->type == type) 
1287     {
1288       if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get ().abs_value + ttl)
1289       {
1290         /* existing request has higher TTL, drop new one! */
1291         prd->priority += priority;
1292         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1293                     "Have existing request with higher TTL, dropping new request.\n",
1294                     GNUNET_i2s (other));
1295         GNUNET_STATISTICS_update (GSF_stats,
1296                                   gettext_noop
1297                                   ("# requests dropped due to higher-TTL request"),
1298                                   1, GNUNET_NO);
1299         return NULL;
1300       }
1301       /* existing request has lower TTL, drop old one! */
1302       priority += prd->priority;
1303       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1304       free_pending_request (peerreq, &gm->query);
1305     }
1306   }
1307
1308   peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1309   peerreq->cp = cp;
1310   pr = GSF_pending_request_create_ (options, type, &gm->query, 
1311                                     target,
1312                                     (bfsize >
1313                                      0) ? (const char *) &opt[bits] : NULL,
1314                                     bfsize, ntohl (gm->filter_mutator),
1315                                     1 /* anonymity */ ,
1316                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1317                                     &handle_p2p_reply, peerreq);
1318   GNUNET_assert (NULL != pr);
1319   peerreq->pr = pr;
1320   GNUNET_break (GNUNET_OK ==
1321                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1322                                                    peerreq,
1323                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1324   GNUNET_STATISTICS_update (GSF_stats,
1325                             gettext_noop
1326                             ("# P2P query messages received and processed"), 1,
1327                             GNUNET_NO);
1328   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1329                             1, GNUNET_NO);
1330   return pr;
1331 }
1332
1333
1334 /**
1335  * Function called if there has been a timeout trying to satisfy
1336  * a transmission request.
1337  *
1338  * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1339  * @param tc scheduler context
1340  */
1341 static void
1342 peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1343 {
1344   struct GSF_PeerTransmitHandle *pth = cls;
1345   struct GSF_ConnectedPeer *cp;
1346
1347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1348               "Timeout trying to transmit to other peer\n");
1349   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1350   cp = pth->cp;
1351   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1352   if (GNUNET_YES == pth->is_query)
1353     GNUNET_assert (0 < cp->ppd.pending_queries--);
1354   else if (GNUNET_NO == pth->is_query)
1355     GNUNET_assert (0 < cp->ppd.pending_replies--);
1356   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1357   if (NULL != cp->cth)
1358   {
1359     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1360     cp->cth = NULL;
1361   }
1362   pth->gmc (pth->gmc_cls, 0, NULL);
1363   GNUNET_assert (0 == cp->cth_in_progress);
1364   GNUNET_free (pth);
1365 }
1366
1367
1368 /**
1369  * Transmit a message to the given peer as soon as possible.
1370  * If the peer disconnects before the transmission can happen,
1371  * the callback is invoked with a 'NULL' buffer.
1372  *
1373  * @param cp target peer
1374  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1375  * @param priority how important is this request?
1376  * @param timeout when does this request timeout (call gmc with error)
1377  * @param size number of bytes we would like to send to the peer
1378  * @param gmc function to call to get the message
1379  * @param gmc_cls closure for gmc
1380  * @return handle to cancel request
1381  */
1382 struct GSF_PeerTransmitHandle *
1383 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
1384                     uint32_t priority, struct GNUNET_TIME_Relative timeout,
1385                     size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
1386 {
1387   struct GSF_PeerTransmitHandle *pth;
1388   struct GSF_PeerTransmitHandle *pos;
1389   struct GSF_PeerTransmitHandle *prev;
1390
1391   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1392   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1393   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1394   pth->gmc = gmc;
1395   pth->gmc_cls = gmc_cls;
1396   pth->size = size;
1397   pth->is_query = is_query;
1398   pth->priority = priority;
1399   pth->cp = cp;
1400   /* insertion sort (by priority, descending) */
1401   prev = NULL;
1402   pos = cp->pth_head;
1403   while ((NULL != pos) && (pos->priority > priority))
1404   {
1405     prev = pos;
1406     pos = pos->next;
1407   }
1408   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1409   if (GNUNET_YES == is_query)
1410     cp->ppd.pending_queries++;
1411   else if (GNUNET_NO == is_query)
1412     cp->ppd.pending_replies++;
1413   pth->timeout_task =
1414       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1415   schedule_transmission (pth);
1416   return pth;
1417 }
1418
1419
1420 /**
1421  * Cancel an earlier request for transmission.
1422  *
1423  * @param pth request to cancel
1424  */
1425 void
1426 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1427 {
1428   struct GSF_ConnectedPeer *cp;
1429
1430   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
1431   {
1432     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1433     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1434   }
1435   cp = pth->cp;
1436   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1437   if (GNUNET_YES == pth->is_query)
1438     GNUNET_assert (0 < cp->ppd.pending_queries--);
1439   else if (GNUNET_NO == pth->is_query)
1440     GNUNET_assert (0 < cp->ppd.pending_replies--);
1441   GNUNET_free (pth);
1442 }
1443
1444
1445 /**
1446  * Report on receiving a reply; update the performance record of the given peer.
1447  *
1448  * @param cp responding peer (will be updated)
1449  * @param request_time time at which the original query was transmitted
1450  * @param request_priority priority of the original request
1451  */
1452 void
1453 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1454                               struct GNUNET_TIME_Absolute request_time,
1455                               uint32_t request_priority)
1456 {
1457   struct GNUNET_TIME_Relative delay;
1458
1459   delay = GNUNET_TIME_absolute_get_duration (request_time);
1460   cp->ppd.avg_reply_delay.rel_value =
1461       (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N - 1) +
1462        delay.rel_value) / RUNAVG_DELAY_N;
1463   cp->ppd.avg_priority =
1464       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1465        request_priority) / RUNAVG_DELAY_N;
1466 }
1467
1468
1469 /**
1470  * Report on receiving a reply in response to an initiating client.
1471  * Remember that this peer is good for this client.
1472  *
1473  * @param cp responding peer (will be updated)
1474  * @param initiator_client local client on responsible for query
1475  */
1476 void
1477 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1478                                    struct GSF_LocalClient *initiator_client)
1479 {
1480   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1481                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1482 }
1483
1484
1485 /**
1486  * Report on receiving a reply in response to an initiating peer.
1487  * Remember that this peer is good for this initiating peer.
1488  *
1489  * @param cp responding peer (will be updated)
1490  * @param initiator_peer other peer responsible for query
1491  */
1492 void
1493 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1494                                  const struct GSF_ConnectedPeer *initiator_peer)
1495 {
1496   unsigned int woff;
1497
1498   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1499   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1500   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1501   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1502   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1503 }
1504
1505
1506 /**
1507  * A peer disconnected from us.  Tear down the connected peer
1508  * record.
1509  *
1510  * @param cls unused
1511  * @param peer identity of peer that connected
1512  */
1513 void
1514 GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
1515 {
1516   struct GSF_ConnectedPeer *cp;
1517   struct GSF_PeerTransmitHandle *pth;
1518   struct GSF_DelayedHandle *dh;
1519
1520   cp = GSF_peer_get_ (peer);
1521   if (NULL == cp)
1522     return;                     /* must have been disconnect from core with
1523                                  * 'peer' == my_id, ignore */
1524   GNUNET_assert (GNUNET_YES ==
1525                  GNUNET_CONTAINER_multihashmap_remove (cp_map,
1526                                                        &peer->hashPubKey, cp));
1527   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1528                          GNUNET_CONTAINER_multihashmap_size (cp_map),
1529                          GNUNET_NO);
1530   if (NULL != cp->migration_pth)
1531   {
1532     GSF_peer_transmit_cancel_ (cp->migration_pth);
1533     cp->migration_pth = NULL;
1534   }
1535   if (NULL != cp->rc)
1536   {
1537     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1538     cp->rc = NULL;
1539   }
1540   if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1541   {
1542     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1543     cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1544   }
1545   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1546                                          &cancel_pending_request, cp);
1547   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1548   cp->request_map = NULL;
1549   GSF_plan_notify_peer_disconnect_ (cp);
1550   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1551   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1552   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1553   GSF_push_stop_ (cp);
1554   if (NULL != cp->cth)
1555   {
1556     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1557     cp->cth = NULL;
1558   }
1559   GNUNET_assert (0 == cp->cth_in_progress);
1560   while (NULL != (pth = cp->pth_head))
1561   {
1562     if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1563     {
1564       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1565       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1566     }
1567     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1568     pth->gmc (pth->gmc_cls, 0, NULL);
1569     GNUNET_free (pth);
1570   }
1571   while (NULL != (dh = cp->delayed_head))
1572   {
1573     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1574     GNUNET_SCHEDULER_cancel (dh->delay_task);
1575     GNUNET_free (dh->pm);
1576     GNUNET_free (dh);
1577   }
1578   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1579   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1580   {
1581     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1582     cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1583   }
1584   GNUNET_free (cp);
1585 }
1586
1587
1588 /**
1589  * Closure for 'call_iterator'.
1590  */
1591 struct IterationContext
1592 {
1593   /**
1594    * Function to call on each entry.
1595    */
1596   GSF_ConnectedPeerIterator it;
1597
1598   /**
1599    * Closure for 'it'.
1600    */
1601   void *it_cls;
1602 };
1603
1604
1605 /**
1606  * Function that calls the callback for each peer.
1607  *
1608  * @param cls the 'struct IterationContext*'
1609  * @param key identity of the peer
1610  * @param value the 'struct GSF_ConnectedPeer*'
1611  * @return GNUNET_YES to continue iteration
1612  */
1613 static int
1614 call_iterator (void *cls, const struct GNUNET_HashCode * key, void *value)
1615 {
1616   struct IterationContext *ic = cls;
1617   struct GSF_ConnectedPeer *cp = value;
1618
1619   ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
1620   return GNUNET_YES;
1621 }
1622
1623
1624 /**
1625  * Iterate over all connected peers.
1626  *
1627  * @param it function to call for each peer
1628  * @param it_cls closure for it
1629  */
1630 void
1631 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
1632 {
1633   struct IterationContext ic;
1634
1635   ic.it = it;
1636   ic.it_cls = it_cls;
1637   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &call_iterator, &ic);
1638 }
1639
1640
1641 /**
1642  * Obtain the identity of a connected peer.
1643  *
1644  * @param cp peer to get identity of
1645  * @param id identity to set (written to)
1646  */
1647 void
1648 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1649                                   struct GNUNET_PeerIdentity *id)
1650 {
1651   GNUNET_assert (0 != cp->ppd.pid);
1652   GNUNET_PEER_resolve (cp->ppd.pid, id);
1653 }
1654
1655
1656 /**
1657  * Obtain the identity of a connected peer.
1658  *
1659  * @param cp peer to get identity of
1660  * @return reference to peer identity, valid until peer disconnects (!)
1661  */
1662 const struct GNUNET_PeerIdentity *
1663 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1664 {
1665   GNUNET_assert (0 != cp->ppd.pid);
1666   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1667 }
1668
1669
1670 /**
1671  * Assemble a migration stop message for transmission.
1672  *
1673  * @param cls the 'struct GSF_ConnectedPeer' to use
1674  * @param size number of bytes we're allowed to write to buf
1675  * @param buf where to copy the message
1676  * @return number of bytes copied to buf
1677  */
1678 static size_t
1679 create_migration_stop_message (void *cls, size_t size, void *buf)
1680 {
1681   struct GSF_ConnectedPeer *cp = cls;
1682   struct MigrationStopMessage msm;
1683
1684   cp->migration_pth = NULL;
1685   if (NULL == buf)
1686     return 0;
1687   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1688   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1689   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1690   msm.reserved = htonl (0);
1691   msm.duration =
1692       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1693                                  (cp->last_migration_block));
1694   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1695   GNUNET_STATISTICS_update (GSF_stats,
1696                             gettext_noop ("# migration stop messages sent"),
1697                             1, GNUNET_NO);
1698   return sizeof (struct MigrationStopMessage);
1699 }
1700
1701
1702 /**
1703  * Ask a peer to stop migrating data to us until the given point
1704  * in time.
1705  *
1706  * @param cp peer to ask
1707  * @param block_time until when to block
1708  */
1709 void
1710 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1711                            struct GNUNET_TIME_Absolute block_time)
1712 {
1713   if (cp->last_migration_block.abs_value > block_time.abs_value)
1714   {
1715     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716                 "Migration already blocked for another %s\n",
1717                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1718                                                         (cp->last_migration_block), GNUNET_YES));
1719     return;                     /* already blocked */
1720   }
1721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %llu ms\n",
1722               (unsigned long long) GNUNET_TIME_absolute_get_remaining (block_time).rel_value);
1723   cp->last_migration_block = block_time;
1724   if (NULL != cp->migration_pth)
1725     GSF_peer_transmit_cancel_ (cp->migration_pth);
1726   cp->migration_pth =
1727       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1728                           GNUNET_TIME_UNIT_FOREVER_REL,
1729                           sizeof (struct MigrationStopMessage),
1730                           &create_migration_stop_message, cp);
1731 }
1732
1733
1734 /**
1735  * Write peer-respect information to a file - flush the buffer entry!
1736  *
1737  * @param cls unused
1738  * @param key peer identity
1739  * @param value the 'struct GSF_ConnectedPeer' to flush
1740  * @return GNUNET_OK to continue iteration
1741  */
1742 static int
1743 flush_respect (void *cls, const struct GNUNET_HashCode * key, void *value)
1744 {
1745   struct GSF_ConnectedPeer *cp = value;
1746   char *fn;
1747   uint32_t respect;
1748   struct GNUNET_PeerIdentity pid;
1749
1750   if (cp->ppd.respect == cp->disk_respect)
1751     return GNUNET_OK;           /* unchanged */
1752   GNUNET_assert (0 != cp->ppd.pid);
1753   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1754   fn = get_respect_filename (&pid);
1755   if (cp->ppd.respect == 0)
1756   {
1757     if ((0 != UNLINK (fn)) && (errno != ENOENT))
1758       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1759                                 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1760   }
1761   else
1762   {
1763     respect = htonl (cp->ppd.respect);
1764     if (sizeof (uint32_t) ==
1765         GNUNET_DISK_fn_write (fn, &respect, sizeof (uint32_t),
1766                               GNUNET_DISK_PERM_USER_READ |
1767                               GNUNET_DISK_PERM_USER_WRITE |
1768                               GNUNET_DISK_PERM_GROUP_READ |
1769                               GNUNET_DISK_PERM_OTHER_READ))
1770       cp->disk_respect = cp->ppd.respect;
1771   }
1772   GNUNET_free (fn);
1773   return GNUNET_OK;
1774 }
1775
1776
1777 /**
1778  * Notify core about a preference we have for the given peer
1779  * (to allocate more resources towards it).  The change will
1780  * be communicated the next time we reserve bandwidth with
1781  * core (not instantly).
1782  *
1783  * @param cp peer to reserve bandwidth from
1784  * @param pref preference change
1785  */
1786 void
1787 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1788                                        uint64_t pref)
1789 {
1790   cp->inc_preference += pref;
1791 }
1792
1793
1794 /**
1795  * Call this method periodically to flush respect information to disk.
1796  *
1797  * @param cls closure, not used
1798  * @param tc task context, not used
1799  */
1800 static void
1801 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1802 {
1803
1804   if (NULL == cp_map)
1805     return;
1806   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &flush_respect, NULL);
1807   if (NULL == tc)
1808     return;
1809   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1810     return;
1811   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1812                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1813                                               &cron_flush_respect, NULL);
1814 }
1815
1816
1817 /**
1818  * Initialize peer management subsystem.
1819  */
1820 void
1821 GSF_connected_peer_init_ ()
1822 {
1823   cp_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_YES);
1824   ats = GNUNET_ATS_performance_init (GSF_cfg, NULL, NULL);
1825   GNUNET_assert (GNUNET_OK ==
1826                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg, "fs",
1827                                                           "RESPECT",
1828                                                           &respectDirectory));
1829   GNUNET_DISK_directory_create (respectDirectory);
1830   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1831                                       &cron_flush_respect, NULL);
1832 }
1833
1834
1835 /**
1836  * Iterator to free peer entries.
1837  *
1838  * @param cls closure, unused
1839  * @param key current key code
1840  * @param value value in the hash map (peer entry)
1841  * @return GNUNET_YES (we should continue to iterate)
1842  */
1843 static int
1844 clean_peer (void *cls, const struct GNUNET_HashCode * key, void *value)
1845 {
1846   GSF_peer_disconnect_handler_ (NULL, (const struct GNUNET_PeerIdentity *) key);
1847   return GNUNET_YES;
1848 }
1849
1850
1851 /**
1852  * Shutdown peer management subsystem.
1853  */
1854 void
1855 GSF_connected_peer_done_ ()
1856 {
1857   cron_flush_respect (NULL, NULL);
1858   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_peer, NULL);
1859   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1860   cp_map = NULL;
1861   GNUNET_free (respectDirectory);
1862   respectDirectory = NULL;
1863   GNUNET_ATS_performance_done (ats);
1864   ats = NULL;
1865 }
1866
1867
1868 /**
1869  * Iterator to remove references to LC entry.
1870  *
1871  * @param cls the 'struct GSF_LocalClient*' to look for
1872  * @param key current key code
1873  * @param value value in the hash map (peer entry)
1874  * @return GNUNET_YES (we should continue to iterate)
1875  */
1876 static int
1877 clean_local_client (void *cls, const struct GNUNET_HashCode * key, void *value)
1878 {
1879   const struct GSF_LocalClient *lc = cls;
1880   struct GSF_ConnectedPeer *cp = value;
1881   unsigned int i;
1882
1883   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1884     if (cp->ppd.last_client_replies[i] == lc)
1885       cp->ppd.last_client_replies[i] = NULL;
1886   return GNUNET_YES;
1887 }
1888
1889
1890 /**
1891  * Notification that a local client disconnected.  Clean up all of our
1892  * references to the given handle.
1893  *
1894  * @param lc handle to the local client (henceforth invalid)
1895  */
1896 void
1897 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1898 {
1899   if (NULL == cp_map)
1900     return;                     /* already cleaned up */
1901   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_local_client,
1902                                          (void *) lc);
1903 }
1904
1905
1906 /* end of gnunet-service-fs_cp.c */