merging configs
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet_ats_service.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33 #include "gnunet-service-fs_push.h"
34
35
36 /**
37  * Ratio for moving average delay calculation.  The previous
38  * average goes in with a factor of (n-1) into the calculation.
39  * Must be > 0.
40  */
41 #define RUNAVG_DELAY_N 16
42
43 /**
44  * How often do we flush respect values to disk?
45  */
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
47
48 /**
49  * After how long do we discard a reply?
50  */
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
52
53 /**
54  * Collect an instane number of statistics?  May cause excessive IPC.
55  */
56 #define INSANE_STATISTICS GNUNET_NO
57
58
59 /**
60  * Handle to cancel a transmission request.
61  */
62 struct GSF_PeerTransmitHandle
63 {
64
65   /**
66    * Kept in a doubly-linked list.
67    */
68   struct GSF_PeerTransmitHandle *next;
69
70   /**
71    * Kept in a doubly-linked list.
72    */
73   struct GSF_PeerTransmitHandle *prev;
74
75   /**
76    * Time when this transmission request was issued.
77    */
78   struct GNUNET_TIME_Absolute transmission_request_start_time;
79
80   /**
81    * Timeout for this request.
82    */
83   struct GNUNET_TIME_Absolute timeout;
84
85   /**
86    * Task called on timeout, or 0 for none.
87    */
88   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
89
90   /**
91    * Function to call to get the actual message.
92    */
93   GSF_GetMessageCallback gmc;
94
95   /**
96    * Peer this request targets.
97    */
98   struct GSF_ConnectedPeer *cp;
99
100   /**
101    * Closure for 'gmc'.
102    */
103   void *gmc_cls;
104
105   /**
106    * Size of the message to be transmitted.
107    */
108   size_t size;
109
110   /**
111    * GNUNET_YES if this is a query, GNUNET_NO for content.
112    */
113   int is_query;
114
115   /**
116    * Did we get a reservation already?
117    */
118   int was_reserved;
119
120   /**
121    * Priority of this request.
122    */
123   uint32_t priority;
124
125 };
126
127
128 /**
129  * Handle for an entry in our delay list.
130  */
131 struct GSF_DelayedHandle
132 {
133
134   /**
135    * Kept in a doubly-linked list.
136    */
137   struct GSF_DelayedHandle *next;
138
139   /**
140    * Kept in a doubly-linked list.
141    */
142   struct GSF_DelayedHandle *prev;
143
144   /**
145    * Peer this transmission belongs to.
146    */
147   struct GSF_ConnectedPeer *cp;
148
149   /**
150    * The PUT that was delayed.
151    */
152   struct PutMessage *pm;
153
154   /**
155    * Task for the delay.
156    */
157   GNUNET_SCHEDULER_TaskIdentifier delay_task;
158
159   /**
160    * Size of the message.
161    */
162   size_t msize;
163
164 };
165
166
167 /**
168  * Information per peer and request.
169  */
170 struct PeerRequest
171 {
172
173   /**
174    * Handle to generic request.
175    */
176   struct GSF_PendingRequest *pr;
177
178   /**
179    * Handle to specific peer.
180    */
181   struct GSF_ConnectedPeer *cp;
182
183   /**
184    * Task for asynchronous stopping of this request.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier kill_task;
187
188 };
189
190
191 /**
192  * A connected peer.
193  */
194 struct GSF_ConnectedPeer
195 {
196
197   /**
198    * Performance data for this peer.
199    */
200   struct GSF_PeerPerformanceData ppd;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Task scheduled to revive migration to this peer.
210    */
211   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, head.
216    */
217   struct GSF_PeerTransmitHandle *pth_head;
218
219   /**
220    * Messages (replies, queries, content migration) we would like to
221    * send to this peer in the near future.  Sorted by priority, tail.
222    */
223   struct GSF_PeerTransmitHandle *pth_tail;
224
225   /**
226    * Messages (replies, queries, content migration) we would like to
227    * send to this peer in the near future.  Sorted by priority, head.
228    */
229   struct GSF_DelayedHandle *delayed_head;
230
231   /**
232    * Messages (replies, queries, content migration) we would like to
233    * send to this peer in the near future.  Sorted by priority, tail.
234    */
235   struct GSF_DelayedHandle *delayed_tail;
236
237   /**
238    * Migration stop message in our queue, or NULL if we have none pending.
239    */
240   struct GSF_PeerTransmitHandle *migration_pth;
241
242   /**
243    * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244    */
245   struct GNUNET_ATS_ReservationContext *rc;
246
247   /**
248    * Task scheduled if we need to retry bandwidth reservation later.
249    */
250   GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
251
252   /**
253    * Active requests from this neighbour, map of query to 'struct PeerRequest'.
254    */
255   struct GNUNET_CONTAINER_MultiHashMap *request_map;
256
257   /**
258    * Handle for an active request for transmission to this
259    * peer, or NULL (if core queue was full).
260    */
261   struct GNUNET_CORE_TransmitHandle *cth;
262
263   /**
264    * Increase in traffic preference still to be submitted
265    * to the core service for this peer.
266    */
267   uint64_t inc_preference;
268
269   /**
270    * Set to 1 if we're currently in the process of calling
271    * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
272    * NULL, we should not call notify_transmit_ready for this
273    * handle right now).
274    */
275   unsigned int cth_in_progress;
276
277   /**
278    * Respect rating for this peer on disk.
279    */
280   uint32_t disk_respect;
281
282   /**
283    * Which offset in "last_p2p_replies" will be updated next?
284    * (we go round-robin).
285    */
286   unsigned int last_p2p_replies_woff;
287
288   /**
289    * Which offset in "last_client_replies" will be updated next?
290    * (we go round-robin).
291    */
292   unsigned int last_client_replies_woff;
293
294   /**
295    * Current offset into 'last_request_times' ring buffer.
296    */
297   unsigned int last_request_times_off;
298
299   /**
300    * GNUNET_YES if we did successfully reserve 32k bandwidth,
301    * GNUNET_NO if not.
302    */
303   int did_reserve;
304
305 };
306
307
308 /**
309  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
310  */
311 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
312
313 /**
314  * Where do we store respect information?
315  */
316 static char *respectDirectory;
317
318 /**
319  * Handle to ATS service.
320  */
321 static struct GNUNET_ATS_PerformanceHandle *ats;
322
323
324 /**
325  * Get the filename under which we would store respect
326  * for the given peer.
327  *
328  * @param id peer to get the filename for
329  * @return filename of the form DIRECTORY/PEERID
330  */
331 static char *
332 get_respect_filename (const struct GNUNET_PeerIdentity *id)
333 {
334   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
335   char *fn;
336
337   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
338   GNUNET_asprintf (&fn, "%s%s%s", respectDirectory, DIR_SEPARATOR_STR, &fil);
339   return fn;
340 }
341
342
343 /**
344  * Find latency information in 'atsi'.
345  *
346  * @param atsi performance data
347  * @param atsi_count number of records in 'atsi'
348  * @return connection latency
349  */
350 static struct GNUNET_TIME_Relative
351 get_latency (const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
352 {
353   unsigned int i;
354
355   for (i = 0; i < atsi_count; i++)
356     if (ntohl (atsi->type) == GNUNET_ATS_QUALITY_NET_DELAY)
357       return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
358                                             ntohl (atsi->value));
359   return GNUNET_TIME_UNIT_SECONDS;
360 }
361
362
363 /**
364  * Update the performance information kept for the given peer.
365  *
366  * @param cp peer record to update
367  * @param atsi transport performance data
368  * @param atsi_count number of records in 'atsi'
369  */
370 static void
371 update_atsi (struct GSF_ConnectedPeer *cp,
372              const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
373 {
374   struct GNUNET_TIME_Relative latency;
375
376   latency = get_latency (atsi, atsi_count);
377   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
378   /* LATER: merge atsi into cp's performance data (if we ever care...) */
379 }
380
381
382 /**
383  * Return the performance data record for the given peer
384  *
385  * @param cp peer to query
386  * @return performance data record for the peer
387  */
388 struct GSF_PeerPerformanceData *
389 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
390 {
391   return &cp->ppd;
392 }
393
394
395 /**
396  * Core is ready to transmit to a peer, get the message.
397  *
398  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
399  * @param size number of bytes core is willing to take
400  * @param buf where to copy the message
401  * @return number of bytes copied to buf
402  */
403 static size_t
404 peer_transmit_ready_cb (void *cls, size_t size, void *buf);
405
406
407 /**
408  * Function called by core upon success or failure of our bandwidth reservation request.
409  *
410  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
411  * @param peer identifies the peer
412  * @param amount set to the amount that was actually reserved or unreserved;
413  *               either the full requested amount or zero (no partial reservations)
414  * @param res_delay if the reservation could not be satisfied (amount was 0), how
415  *        long should the client wait until re-trying?
416  */
417 static void
418 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
419                       int32_t amount, struct GNUNET_TIME_Relative res_delay);
420
421
422 /**
423  * If ready (bandwidth reserved), try to schedule transmission via
424  * core for the given handle.
425  *
426  * @param pth transmission handle to schedule
427  */
428 static void
429 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
430 {
431   struct GSF_ConnectedPeer *cp;
432   struct GNUNET_PeerIdentity target;
433
434   cp = pth->cp;
435   if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
436     return;                     /* already done */
437   GNUNET_assert (0 != cp->ppd.pid);
438   GNUNET_PEER_resolve (cp->ppd.pid, &target);
439
440   if (0 != cp->inc_preference)
441   {
442     GNUNET_ATS_change_preference (ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
443                                   (double) cp->inc_preference,
444                                   GNUNET_ATS_PREFERENCE_END);
445     cp->inc_preference = 0;
446   }
447
448   if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
449   {
450     /* query, need reservation */
451     if (GNUNET_YES != cp->did_reserve)
452       return;                   /* not ready */
453     cp->did_reserve = GNUNET_NO;
454     /* reservation already done! */
455     pth->was_reserved = GNUNET_YES;
456     cp->rc =
457         GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
458                                       &ats_reserve_callback, cp);
459     return;
460   }
461   GNUNET_assert (NULL == cp->cth);
462   cp->cth_in_progress++;
463   cp->cth =
464     GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
465                                        GNUNET_TIME_absolute_get_remaining
466                                        (pth->timeout), &target, pth->size,
467                                        &peer_transmit_ready_cb, cp);
468   GNUNET_assert (NULL != cp->cth);
469   GNUNET_assert (0 < cp->cth_in_progress--);
470 }
471
472
473 /**
474  * Core is ready to transmit to a peer, get the message.
475  *
476  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
477  * @param size number of bytes core is willing to take
478  * @param buf where to copy the message
479  * @return number of bytes copied to buf
480  */
481 static size_t
482 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
483 {
484   struct GSF_ConnectedPeer *cp = cls;
485   struct GSF_PeerTransmitHandle *pth = cp->pth_head;
486   struct GSF_PeerTransmitHandle *pos;
487   size_t ret;
488
489   cp->cth = NULL;
490   if (NULL == pth)
491     return 0;
492   if (pth->size > size)
493   {
494     schedule_transmission (pth);
495     return 0;
496   }
497   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
498   {
499     GNUNET_SCHEDULER_cancel (pth->timeout_task);
500     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
501   }
502   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
503   if (GNUNET_YES == pth->is_query)
504   {
505     cp->ppd.last_request_times[(cp->last_request_times_off++) %
506                                MAX_QUEUE_PER_PEER] =
507         GNUNET_TIME_absolute_get ();
508     GNUNET_assert (0 < cp->ppd.pending_queries--);
509   }
510   else if (GNUNET_NO == pth->is_query)
511   {
512     GNUNET_assert (0 < cp->ppd.pending_replies--);
513   }
514   GNUNET_LOAD_update (cp->ppd.transmission_delay,
515                       GNUNET_TIME_absolute_get_duration
516                       (pth->transmission_request_start_time).rel_value);
517   ret = pth->gmc (pth->gmc_cls, size, buf);
518   if (NULL != (pos = cp->pth_head))
519   {
520     GNUNET_assert (pos != pth);
521     schedule_transmission (pos);
522   }
523   GNUNET_free (pth);
524   return ret;
525 }
526
527
528 /**
529  * (re)try to reserve bandwidth from the given peer.
530  *
531  * @param cls the 'struct GSF_ConnectedPeer' to reserve from
532  * @param tc scheduler context
533  */
534 static void
535 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
536 {
537   struct GSF_ConnectedPeer *cp = cls;
538   struct GNUNET_PeerIdentity target;
539
540   GNUNET_PEER_resolve (cp->ppd.pid, &target);
541   cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
542   cp->rc =
543       GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
544                                     &ats_reserve_callback, cp);
545 }
546
547
548 /**
549  * Function called by core upon success or failure of our bandwidth reservation request.
550  *
551  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
552  * @param peer identifies the peer
553  * @param amount set to the amount that was actually reserved or unreserved;
554  *               either the full requested amount or zero (no partial reservations)
555  * @param res_delay if the reservation could not be satisfied (amount was 0), how
556  *        long should the client wait until re-trying?
557  */
558 static void
559 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
560                       int32_t amount, struct GNUNET_TIME_Relative res_delay)
561 {
562   struct GSF_ConnectedPeer *cp = cls;
563   struct GSF_PeerTransmitHandle *pth;
564
565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566               "Reserved %d bytes / need to wait %s for reservation\n",
567               (int) amount, 
568               GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
569   cp->rc = NULL;
570   if (0 == amount)
571   {
572     cp->rc_delay_task =
573         GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
574     return;
575   }
576   cp->did_reserve = GNUNET_YES;
577   pth = cp->pth_head;
578   if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
579   {
580     /* reservation success, try transmission now! */
581     cp->cth_in_progress++;
582     cp->cth =
583         GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
584                                            GNUNET_TIME_absolute_get_remaining
585                                            (pth->timeout), peer, pth->size,
586                                            &peer_transmit_ready_cb, cp);
587     GNUNET_assert (NULL != cp->cth);
588     GNUNET_assert (0 < cp->cth_in_progress--);
589   }
590 }
591
592
593 /**
594  * A peer connected to us.  Setup the connected peer
595  * records.
596  *
597  * @param peer identity of peer that connected
598  * @param atsi performance data for the connection
599  * @param atsi_count number of records in 'atsi'
600  * @return handle to connected peer entry
601  */
602 struct GSF_ConnectedPeer *
603 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
604                            const struct GNUNET_ATS_Information *atsi,
605                            unsigned int atsi_count)
606 {
607   struct GSF_ConnectedPeer *cp;
608   char *fn;
609   uint32_t respect;
610
611   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
612               GNUNET_i2s (peer));
613   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
614   cp->ppd.pid = GNUNET_PEER_intern (peer);
615   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
616   cp->rc =
617       GNUNET_ATS_reserve_bandwidth (ats, peer, DBLOCK_SIZE,
618                                     &ats_reserve_callback, cp);
619   fn = get_respect_filename (peer);
620   if ((GNUNET_YES == GNUNET_DISK_file_test (fn)) &&
621       (sizeof (respect) == GNUNET_DISK_fn_read (fn, &respect, sizeof (respect))))
622     cp->disk_respect = cp->ppd.respect = ntohl (respect);
623   GNUNET_free (fn);
624   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
625   GNUNET_break (GNUNET_OK ==
626                 GNUNET_CONTAINER_multihashmap_put (cp_map, 
627                                                    &GSF_connected_peer_get_identity2_ (cp)->hashPubKey,
628                                                    cp,
629                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
630   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
631                          GNUNET_CONTAINER_multihashmap_size (cp_map),
632                          GNUNET_NO);
633   update_atsi (cp, atsi, atsi_count);
634   GSF_push_start_ (cp);
635   return cp;
636 }
637
638
639 /**
640  * It may be time to re-start migrating content to this
641  * peer.  Check, and if so, restart migration.
642  *
643  * @param cls the 'struct GSF_ConnectedPeer'
644  * @param tc scheduler context
645  */
646 static void
647 revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
648 {
649   struct GSF_ConnectedPeer *cp = cls;
650   struct GNUNET_TIME_Relative bt;
651
652   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
653   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
654   if (0 != bt.rel_value)
655   {
656     /* still time left... */
657     cp->mig_revive_task =
658         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
659     return;
660   }
661   GSF_push_start_ (cp);
662 }
663
664
665 /**
666  * Get a handle for a connected peer.
667  *
668  * @param peer peer's identity
669  * @return NULL if the peer is not currently connected
670  */
671 struct GSF_ConnectedPeer *
672 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
673 {
674   if (NULL == cp_map)
675     return NULL;
676   return GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey);
677 }
678
679
680 /**
681  * Handle P2P "MIGRATION_STOP" message.
682  *
683  * @param cls closure, always NULL
684  * @param other the other peer involved (sender or receiver, NULL
685  *        for loopback messages where we are both sender and receiver)
686  * @param message the actual message
687  * @param atsi performance information
688  * @param atsi_count number of records in 'atsi'
689  * @return GNUNET_OK to keep the connection open,
690  *         GNUNET_SYSERR to close it (signal serious error)
691  */
692 int
693 GSF_handle_p2p_migration_stop_ (void *cls,
694                                 const struct GNUNET_PeerIdentity *other,
695                                 const struct GNUNET_MessageHeader *message,
696                                 const struct GNUNET_ATS_Information *atsi,
697                                 unsigned int atsi_count)
698 {
699   struct GSF_ConnectedPeer *cp;
700   const struct MigrationStopMessage *msm;
701   struct GNUNET_TIME_Relative bt;
702
703   msm = (const struct MigrationStopMessage *) message;
704   cp = GSF_peer_get_ (other);
705   if (NULL == cp)
706   {
707     GNUNET_break (0);
708     return GNUNET_OK;
709   }
710   GNUNET_STATISTICS_update (GSF_stats,
711                             gettext_noop ("# migration stop messages received"),
712                             1, GNUNET_NO);
713   bt = GNUNET_TIME_relative_ntoh (msm->duration);
714   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
715               _("Migration of content to peer `%s' blocked for %s\n"),
716               GNUNET_i2s (other), 
717               GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
718   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
719   if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
720   {
721     GSF_push_stop_ (cp);
722     cp->mig_revive_task =
723         GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
724   }
725   update_atsi (cp, atsi, atsi_count);
726   return GNUNET_OK;
727 }
728
729
730 /**
731  * Copy reply and free put message.
732  *
733  * @param cls the 'struct PutMessage'
734  * @param buf_size number of bytes available in buf
735  * @param buf where to copy the message, NULL on error (peer disconnect)
736  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
737  */
738 static size_t
739 copy_reply (void *cls, size_t buf_size, void *buf)
740 {
741   struct PutMessage *pm = cls;
742   size_t size;
743
744   if (NULL != buf)
745   {
746     GNUNET_assert (buf_size >= ntohs (pm->header.size));
747     size = ntohs (pm->header.size);
748     memcpy (buf, pm, size);
749     GNUNET_STATISTICS_update (GSF_stats,
750                               gettext_noop
751                               ("# replies transmitted to other peers"), 1,
752                               GNUNET_NO);
753   }
754   else
755   {
756     size = 0;
757     GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
758                               GNUNET_NO);
759   }
760   GNUNET_free (pm);
761   return size;
762 }
763
764
765 /**
766  * Free resources associated with the given peer request.
767  *
768  * @param peerreq request to free
769  * @param query associated key for the request
770  */
771 static void
772 free_pending_request (struct PeerRequest *peerreq,
773                       const struct GNUNET_HashCode *query)
774 {
775   struct GSF_ConnectedPeer *cp = peerreq->cp;
776
777   if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
778   {
779     GNUNET_SCHEDULER_cancel (peerreq->kill_task);
780     peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
781   }
782   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
783                             -1, GNUNET_NO);
784   GNUNET_break (GNUNET_YES ==
785                 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
786                                                       query, peerreq));
787   GNUNET_free (peerreq);
788 }
789
790
791 /**
792  * Cancel all requests associated with the peer.
793  *
794  * @param cls unused
795  * @param query hash code of the request
796  * @param value the 'struct GSF_PendingRequest'
797  * @return GNUNET_YES (continue to iterate)
798  */
799 static int
800 cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
801 {
802   struct PeerRequest *peerreq = value;
803   struct GSF_PendingRequest *pr = peerreq->pr;
804   struct GSF_PendingRequestData *prd;
805
806   prd = GSF_pending_request_get_data_ (pr);
807   GSF_pending_request_cancel_ (pr, GNUNET_NO);
808   free_pending_request (peerreq, &prd->query);
809   return GNUNET_OK;
810 }
811
812
813 /**
814  * Free the given request.
815  *
816  * @param cls the request to free
817  * @param tc task context
818  */
819 static void
820 peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
821 {
822   struct PeerRequest *peerreq = cls;
823   struct GSF_PendingRequest *pr = peerreq->pr;
824   struct GSF_PendingRequestData *prd;
825
826   peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
827   prd = GSF_pending_request_get_data_ (pr);
828   cancel_pending_request (NULL, &prd->query, peerreq);
829 }
830
831
832 /**
833  * The artificial delay is over, transmit the message now.
834  *
835  * @param cls the 'struct GSF_DelayedHandle' with the message
836  * @param tc scheduler context
837  */
838 static void
839 transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
840 {
841   struct GSF_DelayedHandle *dh = cls;
842   struct GSF_ConnectedPeer *cp = dh->cp;
843
844   GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
845   if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
846   {
847     GNUNET_free (dh->pm);
848     GNUNET_free (dh);
849     return;
850   }
851   (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
852                              dh->msize, &copy_reply, dh->pm);
853   GNUNET_free (dh);
854 }
855
856
857 /**
858  * Get the randomized delay a response should be subjected to.
859  *
860  * @return desired delay
861  */
862 static struct GNUNET_TIME_Relative
863 get_randomized_delay ()
864 {
865   struct GNUNET_TIME_Relative ret;
866
867   ret =
868       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
869                                      GNUNET_CRYPTO_random_u32
870                                      (GNUNET_CRYPTO_QUALITY_WEAK,
871                                       2 * GSF_avg_latency.rel_value + 1));
872 #if INSANE_STATISTICS
873   GNUNET_STATISTICS_update (GSF_stats,
874                             gettext_noop
875                             ("# artificial delays introduced (ms)"),
876                             ret.rel_value, GNUNET_NO);
877 #endif
878   return ret;
879 }
880
881
882 /**
883  * Handle a reply to a pending request.  Also called if a request
884  * expires (then with data == NULL).  The handler may be called
885  * many times (depending on the request type), but will not be
886  * called during or after a call to GSF_pending_request_cancel
887  * and will also not be called anymore after a call signalling
888  * expiration.
889  *
890  * @param cls 'struct PeerRequest' this is an answer for
891  * @param eval evaluation of the result
892  * @param pr handle to the original pending request
893  * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
894  * @param expiration when does 'data' expire?
895  * @param last_transmission when did we last transmit a request for this block
896  * @param type type of the block
897  * @param data response data, NULL on request expiration
898  * @param data_len number of bytes in data
899  */
900 static void
901 handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
902                   struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
903                   struct GNUNET_TIME_Absolute expiration,
904                   struct GNUNET_TIME_Absolute last_transmission,
905                   enum GNUNET_BLOCK_Type type, const void *data,
906                   size_t data_len)
907 {
908   struct PeerRequest *peerreq = cls;
909   struct GSF_ConnectedPeer *cp = peerreq->cp;
910   struct GSF_PendingRequestData *prd;
911   struct PutMessage *pm;
912   size_t msize;
913
914   GNUNET_assert (data_len + sizeof (struct PutMessage) <
915                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
916   GNUNET_assert (peerreq->pr == pr);
917   prd = GSF_pending_request_get_data_ (pr);
918   if (NULL == data)
919   {
920     free_pending_request (peerreq, &prd->query);
921     return;
922   }
923   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
924   if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
925   {
926     GNUNET_STATISTICS_update (GSF_stats,
927                               gettext_noop
928                               ("# replies dropped due to type mismatch"),
929                                 1, GNUNET_NO);
930     return;
931   }
932   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933               "Transmitting result for query `%s' to peer\n",
934               GNUNET_h2s (&prd->query));
935   GNUNET_STATISTICS_update (GSF_stats,
936                             gettext_noop ("# replies received for other peers"),
937                             1, GNUNET_NO);
938   msize = sizeof (struct PutMessage) + data_len;
939   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
940   {
941     GNUNET_break (0);
942     return;
943   }
944   if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
945   {
946     if (reply_anonymity_level - 1 > GSF_cover_content_count)
947     {
948       GNUNET_STATISTICS_update (GSF_stats,
949                                 gettext_noop
950                                 ("# replies dropped due to insufficient cover traffic"),
951                                 1, GNUNET_NO);
952       return;
953     }
954     GSF_cover_content_count -= (reply_anonymity_level - 1);
955   }
956
957   pm = GNUNET_malloc (msize);
958   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
959   pm->header.size = htons (msize);
960   pm->type = htonl (type);
961   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
962   memcpy (&pm[1], data, data_len);
963   if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
964       (GNUNET_YES == GSF_enable_randomized_delays))
965   {
966     struct GSF_DelayedHandle *dh;
967
968     dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle));
969     dh->cp = cp;
970     dh->pm = pm;
971     dh->msize = msize;
972     GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
973     dh->delay_task =
974         GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
975                                       &transmit_delayed_now, dh);
976   }
977   else
978   {
979     (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
980                                &copy_reply, pm);
981   }
982   if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
983     return;
984   if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
985   {
986     GNUNET_STATISTICS_update (GSF_stats,
987                               gettext_noop
988                               ("# P2P searches destroyed due to ultimate reply"),
989                               1, GNUNET_NO);
990     peerreq->kill_task =
991         GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
992   }
993 }
994
995
996 /**
997  * Increase the peer's respect by a value.
998  *
999  * @param cp which peer to change the respect value on
1000  * @param value is the int value by which the
1001  *  peer's credit is to be increased or decreased
1002  * @returns the actual change in respect (positive or negative)
1003  */
1004 static int
1005 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
1006 {
1007   if (0 == value)
1008     return 0;
1009   GNUNET_assert (NULL != cp);
1010   if (value > 0)
1011   {
1012     if (cp->ppd.respect + value < cp->ppd.respect)
1013     {
1014       value = UINT32_MAX - cp->ppd.respect;
1015       cp->ppd.respect = UINT32_MAX;
1016     }
1017     else
1018       cp->ppd.respect += value;
1019   }
1020   else
1021   {
1022     if (cp->ppd.respect < -value)
1023     {
1024       value = -cp->ppd.respect;
1025       cp->ppd.respect = 0;
1026     }
1027     else
1028       cp->ppd.respect += value;
1029   }
1030   return value;
1031 }
1032
1033
1034 /**
1035  * We've received a request with the specified priority.  Bound it
1036  * according to how much we respect the given peer.
1037  *
1038  * @param prio_in requested priority
1039  * @param cp the peer making the request
1040  * @return effective priority
1041  */
1042 static int32_t
1043 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1044 {
1045 #define N ((double)128.0)
1046   uint32_t ret;
1047   double rret;
1048   int ld;
1049
1050   ld = GSF_test_get_load_too_high_ (0);
1051   if (GNUNET_SYSERR == ld)
1052   {
1053 #if INSANE_STATISTICS
1054     GNUNET_STATISTICS_update (GSF_stats,
1055                               gettext_noop
1056                               ("# requests done for free (low load)"), 1,
1057                               GNUNET_NO);
1058 #endif
1059     return 0;                   /* excess resources */
1060   }
1061   if (prio_in > INT32_MAX)
1062     prio_in = INT32_MAX;
1063   ret = -change_peer_respect (cp, -(int) prio_in);
1064   if (ret > 0)
1065   {
1066     if (ret > GSF_current_priorities + N)
1067       rret = GSF_current_priorities + N;
1068     else
1069       rret = ret;
1070     GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1071   }
1072   if ((GNUNET_YES == ld) && (ret > 0))
1073   {
1074     /* try with charging */
1075     ld = GSF_test_get_load_too_high_ (ret);
1076   }
1077   if (GNUNET_YES == ld)
1078   {
1079     GNUNET_STATISTICS_update (GSF_stats,
1080                               gettext_noop
1081                               ("# request dropped, priority insufficient"), 1,
1082                               GNUNET_NO);
1083     /* undo charge */
1084     change_peer_respect (cp, (int) ret);
1085     return -1;                  /* not enough resources */
1086   }
1087   else
1088   {
1089     GNUNET_STATISTICS_update (GSF_stats,
1090                               gettext_noop
1091                               ("# requests done for a price (normal load)"), 1,
1092                               GNUNET_NO);
1093   }
1094 #undef N
1095   return ret;
1096 }
1097
1098
1099 /**
1100  * The priority level imposes a bound on the maximum
1101  * value for the ttl that can be requested.
1102  *
1103  * @param ttl_in requested ttl
1104  * @param prio given priority
1105  * @return ttl_in if ttl_in is below the limit,
1106  *         otherwise the ttl-limit for the given priority
1107  */
1108 static int32_t
1109 bound_ttl (int32_t ttl_in, uint32_t prio)
1110 {
1111   unsigned long long allowed;
1112
1113   if (ttl_in <= 0)
1114     return ttl_in;
1115   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1116   if (ttl_in > allowed)
1117   {
1118     if (allowed >= (1 << 30))
1119       return 1 << 30;
1120     return allowed;
1121   }
1122   return ttl_in;
1123 }
1124
1125
1126 /**
1127  * Handle P2P "QUERY" message.  Creates the pending request entry
1128  * and sets up all of the data structures to that we will
1129  * process replies properly.  Does not initiate forwarding or
1130  * local database lookups.
1131  *
1132  * @param other the other peer involved (sender or receiver, NULL
1133  *        for loopback messages where we are both sender and receiver)
1134  * @param message the actual message
1135  * @return pending request handle, NULL on error
1136  */
1137 struct GSF_PendingRequest *
1138 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1139                        const struct GNUNET_MessageHeader *message)
1140 {
1141   struct PeerRequest *peerreq;
1142   struct GSF_PendingRequest *pr;
1143   struct GSF_PendingRequestData *prd;
1144   struct GSF_ConnectedPeer *cp;
1145   struct GSF_ConnectedPeer *cps;
1146   const struct GNUNET_HashCode *namespace;
1147   const struct GNUNET_PeerIdentity *target;
1148   enum GSF_PendingRequestOptions options;
1149   uint16_t msize;
1150   const struct GetMessage *gm;
1151   unsigned int bits;
1152   const struct GNUNET_HashCode *opt;
1153   uint32_t bm;
1154   size_t bfsize;
1155   uint32_t ttl_decrement;
1156   int32_t priority;
1157   int32_t ttl;
1158   enum GNUNET_BLOCK_Type type;
1159   GNUNET_PEER_Id spid;
1160
1161   GNUNET_assert (other != NULL);
1162   msize = ntohs (message->size);
1163   if (msize < sizeof (struct GetMessage))
1164   {
1165     GNUNET_break_op (0);
1166     return NULL;
1167   }
1168   GNUNET_STATISTICS_update (GSF_stats,
1169                             gettext_noop
1170                             ("# GET requests received (from other peers)"), 1,
1171                             GNUNET_NO);
1172   gm = (const struct GetMessage *) message;
1173   type = ntohl (gm->type);
1174   bm = ntohl (gm->hash_bitmap);
1175   bits = 0;
1176   while (bm > 0)
1177   {
1178     if (1 == (bm & 1))
1179       bits++;
1180     bm >>= 1;
1181   }
1182   if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_HashCode))
1183   {
1184     GNUNET_break_op (0);
1185     return NULL;
1186   }
1187   opt = (const struct GNUNET_HashCode *) &gm[1];
1188   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_HashCode);
1189   /* bfsize must be power of 2, check! */
1190   if (0 != ((bfsize - 1) & bfsize))
1191   {
1192     GNUNET_break_op (0);
1193     return NULL;
1194   }
1195   GSF_cover_query_count++;
1196   bm = ntohl (gm->hash_bitmap);
1197   bits = 0;
1198   cps = GSF_peer_get_ (other);
1199   if (NULL == cps)
1200   {
1201     /* peer must have just disconnected */
1202     GNUNET_STATISTICS_update (GSF_stats,
1203                               gettext_noop
1204                               ("# requests dropped due to initiator not being connected"),
1205                               1, GNUNET_NO);
1206     return NULL;
1207   }
1208   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1209     cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity *) &opt[bits++]);
1210   else
1211     cp = cps;
1212   if (NULL == cp)
1213   {
1214     if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1215       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1216                   "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1217                   GNUNET_i2s ((const struct GNUNET_PeerIdentity *)
1218                               &opt[bits - 1]));
1219
1220     else
1221       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222                   "Failed to find peer `%4s' in connection set. Dropping query.\n",
1223                   GNUNET_i2s (other));
1224 #if INSANE_STATISTICS
1225     GNUNET_STATISTICS_update (GSF_stats,
1226                               gettext_noop
1227                               ("# requests dropped due to missing reverse route"),
1228                               1, GNUNET_NO);
1229 #endif
1230     return NULL;
1231   }
1232   /* note that we can really only check load here since otherwise
1233    * peers could find out that we are overloaded by not being
1234    * disconnected after sending us a malformed query... */
1235   priority = bound_priority (ntohl (gm->priority), cps);
1236   if (priority < 0)
1237   {
1238     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239                 "Dropping query from `%s', this peer is too busy.\n",
1240                 GNUNET_i2s (other));
1241     return NULL;
1242   }
1243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1245               GNUNET_h2s (&gm->query), (unsigned int) type, GNUNET_i2s (other),
1246               (unsigned int) bm);
1247   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
1248   if ((GNUNET_BLOCK_TYPE_FS_SBLOCK == type) && (NULL == namespace))
1249   {
1250     GNUNET_break_op (0);
1251     return NULL;
1252   }
1253   if ((GNUNET_BLOCK_TYPE_FS_SBLOCK != type) && (NULL != namespace))
1254   {
1255     GNUNET_break_op (0);
1256     return NULL;
1257   }
1258   target =
1259       (0 !=
1260        (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity
1261                                                *) &opt[bits++]) : NULL;
1262   options = GSF_PRO_DEFAULTS;
1263   spid = 0;
1264   if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1265       || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1266           GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 +
1267           GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1268   {
1269     /* don't have BW to send to peer, or would likely take longer than we have for it,
1270      * so at best indirect the query */
1271     priority = 0;
1272     options |= GSF_PRO_FORWARD_ONLY;
1273     spid = GNUNET_PEER_intern (other);
1274     GNUNET_assert (0 != spid);
1275   }
1276   ttl = bound_ttl (ntohl (gm->ttl), priority);
1277   /* decrement ttl (always) */
1278   ttl_decrement =
1279       2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1280                                                     TTL_DECREMENT);
1281   if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1282   {
1283     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1285                 GNUNET_i2s (other), ttl, ttl_decrement);
1286     GNUNET_STATISTICS_update (GSF_stats,
1287                               gettext_noop
1288                               ("# requests dropped due TTL underflow"), 1,
1289                               GNUNET_NO);
1290     /* integer underflow => drop (should be very rare)! */
1291     return NULL;
1292   }
1293   ttl -= ttl_decrement;
1294
1295   /* test if the request already exists */
1296   peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1297   if (peerreq != NULL)
1298   {
1299     pr = peerreq->pr;
1300     prd = GSF_pending_request_get_data_ (pr);
1301     if ((prd->type == type) &&
1302         ((type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
1303          (0 == memcmp (&prd->namespace, namespace, sizeof (struct GNUNET_HashCode)))))
1304     {
1305       if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get ().abs_value + ttl)
1306       {
1307         /* existing request has higher TTL, drop new one! */
1308         prd->priority += priority;
1309         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310                     "Have existing request with higher TTL, dropping new request.\n",
1311                     GNUNET_i2s (other));
1312         GNUNET_STATISTICS_update (GSF_stats,
1313                                   gettext_noop
1314                                   ("# requests dropped due to higher-TTL request"),
1315                                   1, GNUNET_NO);
1316         return NULL;
1317       }
1318       /* existing request has lower TTL, drop old one! */
1319       priority += prd->priority;
1320       GSF_pending_request_cancel_ (pr, GNUNET_YES);
1321       free_pending_request (peerreq, &gm->query);
1322     }
1323   }
1324
1325   peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1326   peerreq->cp = cp;
1327   pr = GSF_pending_request_create_ (options, type, &gm->query, namespace,
1328                                     target,
1329                                     (bfsize >
1330                                      0) ? (const char *) &opt[bits] : NULL,
1331                                     bfsize, ntohl (gm->filter_mutator),
1332                                     1 /* anonymity */ ,
1333                                     (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0,        /* replies_seen */
1334                                     &handle_p2p_reply, peerreq);
1335   GNUNET_assert (NULL != pr);
1336   peerreq->pr = pr;
1337   GNUNET_break (GNUNET_OK ==
1338                 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1339                                                    peerreq,
1340                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1341   GNUNET_STATISTICS_update (GSF_stats,
1342                             gettext_noop
1343                             ("# P2P query messages received and processed"), 1,
1344                             GNUNET_NO);
1345   GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1346                             1, GNUNET_NO);
1347   return pr;
1348 }
1349
1350
1351 /**
1352  * Function called if there has been a timeout trying to satisfy
1353  * a transmission request.
1354  *
1355  * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1356  * @param tc scheduler context
1357  */
1358 static void
1359 peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1360 {
1361   struct GSF_PeerTransmitHandle *pth = cls;
1362   struct GSF_ConnectedPeer *cp;
1363
1364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1365               "Timeout trying to transmit to other peer\n");
1366   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1367   cp = pth->cp;
1368   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1369   if (GNUNET_YES == pth->is_query)
1370     GNUNET_assert (0 < cp->ppd.pending_queries--);
1371   else if (GNUNET_NO == pth->is_query)
1372     GNUNET_assert (0 < cp->ppd.pending_replies--);
1373   GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1374   if (NULL != cp->cth)
1375   {
1376     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1377     cp->cth = NULL;
1378   }
1379   pth->gmc (pth->gmc_cls, 0, NULL);
1380   GNUNET_assert (0 == cp->cth_in_progress);
1381   GNUNET_free (pth);
1382 }
1383
1384
1385 /**
1386  * Transmit a message to the given peer as soon as possible.
1387  * If the peer disconnects before the transmission can happen,
1388  * the callback is invoked with a 'NULL' buffer.
1389  *
1390  * @param cp target peer
1391  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1392  * @param priority how important is this request?
1393  * @param timeout when does this request timeout (call gmc with error)
1394  * @param size number of bytes we would like to send to the peer
1395  * @param gmc function to call to get the message
1396  * @param gmc_cls closure for gmc
1397  * @return handle to cancel request
1398  */
1399 struct GSF_PeerTransmitHandle *
1400 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
1401                     uint32_t priority, struct GNUNET_TIME_Relative timeout,
1402                     size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
1403 {
1404   struct GSF_PeerTransmitHandle *pth;
1405   struct GSF_PeerTransmitHandle *pos;
1406   struct GSF_PeerTransmitHandle *prev;
1407
1408   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1409   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1410   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1411   pth->gmc = gmc;
1412   pth->gmc_cls = gmc_cls;
1413   pth->size = size;
1414   pth->is_query = is_query;
1415   pth->priority = priority;
1416   pth->cp = cp;
1417   /* insertion sort (by priority, descending) */
1418   prev = NULL;
1419   pos = cp->pth_head;
1420   while ((NULL != pos) && (pos->priority > priority))
1421   {
1422     prev = pos;
1423     pos = pos->next;
1424   }
1425   GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1426   if (GNUNET_YES == is_query)
1427     cp->ppd.pending_queries++;
1428   else if (GNUNET_NO == is_query)
1429     cp->ppd.pending_replies++;
1430   pth->timeout_task =
1431       GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1432   schedule_transmission (pth);
1433   return pth;
1434 }
1435
1436
1437 /**
1438  * Cancel an earlier request for transmission.
1439  *
1440  * @param pth request to cancel
1441  */
1442 void
1443 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1444 {
1445   struct GSF_ConnectedPeer *cp;
1446
1447   if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
1448   {
1449     GNUNET_SCHEDULER_cancel (pth->timeout_task);
1450     pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1451   }
1452   cp = pth->cp;
1453   GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1454   if (GNUNET_YES == pth->is_query)
1455     GNUNET_assert (0 < cp->ppd.pending_queries--);
1456   else if (GNUNET_NO == pth->is_query)
1457     GNUNET_assert (0 < cp->ppd.pending_replies--);
1458   GNUNET_free (pth);
1459 }
1460
1461
1462 /**
1463  * Report on receiving a reply; update the performance record of the given peer.
1464  *
1465  * @param cp responding peer (will be updated)
1466  * @param request_time time at which the original query was transmitted
1467  * @param request_priority priority of the original request
1468  */
1469 void
1470 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1471                               struct GNUNET_TIME_Absolute request_time,
1472                               uint32_t request_priority)
1473 {
1474   struct GNUNET_TIME_Relative delay;
1475
1476   delay = GNUNET_TIME_absolute_get_duration (request_time);
1477   cp->ppd.avg_reply_delay.rel_value =
1478       (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N - 1) +
1479        delay.rel_value) / RUNAVG_DELAY_N;
1480   cp->ppd.avg_priority =
1481       (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1482        request_priority) / RUNAVG_DELAY_N;
1483 }
1484
1485
1486 /**
1487  * Report on receiving a reply in response to an initiating client.
1488  * Remember that this peer is good for this client.
1489  *
1490  * @param cp responding peer (will be updated)
1491  * @param initiator_client local client on responsible for query
1492  */
1493 void
1494 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1495                                    struct GSF_LocalClient *initiator_client)
1496 {
1497   cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1498                               CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1499 }
1500
1501
1502 /**
1503  * Report on receiving a reply in response to an initiating peer.
1504  * Remember that this peer is good for this initiating peer.
1505  *
1506  * @param cp responding peer (will be updated)
1507  * @param initiator_peer other peer responsible for query
1508  */
1509 void
1510 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1511                                  const struct GSF_ConnectedPeer *initiator_peer)
1512 {
1513   unsigned int woff;
1514
1515   woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1516   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1517   cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1518   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1519   cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1520 }
1521
1522
1523 /**
1524  * A peer disconnected from us.  Tear down the connected peer
1525  * record.
1526  *
1527  * @param cls unused
1528  * @param peer identity of peer that connected
1529  */
1530 void
1531 GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
1532 {
1533   struct GSF_ConnectedPeer *cp;
1534   struct GSF_PeerTransmitHandle *pth;
1535   struct GSF_DelayedHandle *dh;
1536
1537   cp = GSF_peer_get_ (peer);
1538   if (NULL == cp)
1539     return;                     /* must have been disconnect from core with
1540                                  * 'peer' == my_id, ignore */
1541   GNUNET_assert (GNUNET_YES ==
1542                  GNUNET_CONTAINER_multihashmap_remove (cp_map,
1543                                                        &peer->hashPubKey, cp));
1544   GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1545                          GNUNET_CONTAINER_multihashmap_size (cp_map),
1546                          GNUNET_NO);
1547   if (NULL != cp->migration_pth)
1548   {
1549     GSF_peer_transmit_cancel_ (cp->migration_pth);
1550     cp->migration_pth = NULL;
1551   }
1552   if (NULL != cp->rc)
1553   {
1554     GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1555     cp->rc = NULL;
1556   }
1557   if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1558   {
1559     GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1560     cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1561   }
1562   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1563                                          &cancel_pending_request, cp);
1564   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1565   cp->request_map = NULL;
1566   GSF_plan_notify_peer_disconnect_ (cp);
1567   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1568   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1569   memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1570   GSF_push_stop_ (cp);
1571   if (NULL != cp->cth)
1572   {
1573     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1574     cp->cth = NULL;
1575   }
1576   GNUNET_assert (0 == cp->cth_in_progress);
1577   while (NULL != (pth = cp->pth_head))
1578   {
1579     if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1580     {
1581       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1582       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1583     }
1584     GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1585     pth->gmc (pth->gmc_cls, 0, NULL);
1586     GNUNET_free (pth);
1587   }
1588   while (NULL != (dh = cp->delayed_head))
1589   {
1590     GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1591     GNUNET_SCHEDULER_cancel (dh->delay_task);
1592     GNUNET_free (dh->pm);
1593     GNUNET_free (dh);
1594   }
1595   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1596   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1597   {
1598     GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1599     cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1600   }
1601   GNUNET_free (cp);
1602 }
1603
1604
1605 /**
1606  * Closure for 'call_iterator'.
1607  */
1608 struct IterationContext
1609 {
1610   /**
1611    * Function to call on each entry.
1612    */
1613   GSF_ConnectedPeerIterator it;
1614
1615   /**
1616    * Closure for 'it'.
1617    */
1618   void *it_cls;
1619 };
1620
1621
1622 /**
1623  * Function that calls the callback for each peer.
1624  *
1625  * @param cls the 'struct IterationContext*'
1626  * @param key identity of the peer
1627  * @param value the 'struct GSF_ConnectedPeer*'
1628  * @return GNUNET_YES to continue iteration
1629  */
1630 static int
1631 call_iterator (void *cls, const struct GNUNET_HashCode * key, void *value)
1632 {
1633   struct IterationContext *ic = cls;
1634   struct GSF_ConnectedPeer *cp = value;
1635
1636   ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
1637   return GNUNET_YES;
1638 }
1639
1640
1641 /**
1642  * Iterate over all connected peers.
1643  *
1644  * @param it function to call for each peer
1645  * @param it_cls closure for it
1646  */
1647 void
1648 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
1649 {
1650   struct IterationContext ic;
1651
1652   ic.it = it;
1653   ic.it_cls = it_cls;
1654   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &call_iterator, &ic);
1655 }
1656
1657
1658 /**
1659  * Obtain the identity of a connected peer.
1660  *
1661  * @param cp peer to get identity of
1662  * @param id identity to set (written to)
1663  */
1664 void
1665 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1666                                   struct GNUNET_PeerIdentity *id)
1667 {
1668   GNUNET_assert (0 != cp->ppd.pid);
1669   GNUNET_PEER_resolve (cp->ppd.pid, id);
1670 }
1671
1672
1673 /**
1674  * Obtain the identity of a connected peer.
1675  *
1676  * @param cp peer to get identity of
1677  * @return reference to peer identity, valid until peer disconnects (!)
1678  */
1679 const struct GNUNET_PeerIdentity *
1680 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1681 {
1682   GNUNET_assert (0 != cp->ppd.pid);
1683   return GNUNET_PEER_resolve2 (cp->ppd.pid);
1684 }
1685
1686
1687 /**
1688  * Assemble a migration stop message for transmission.
1689  *
1690  * @param cls the 'struct GSF_ConnectedPeer' to use
1691  * @param size number of bytes we're allowed to write to buf
1692  * @param buf where to copy the message
1693  * @return number of bytes copied to buf
1694  */
1695 static size_t
1696 create_migration_stop_message (void *cls, size_t size, void *buf)
1697 {
1698   struct GSF_ConnectedPeer *cp = cls;
1699   struct MigrationStopMessage msm;
1700
1701   cp->migration_pth = NULL;
1702   if (NULL == buf)
1703     return 0;
1704   GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1705   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1706   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1707   msm.reserved = htonl (0);
1708   msm.duration =
1709       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1710                                  (cp->last_migration_block));
1711   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1712   GNUNET_STATISTICS_update (GSF_stats,
1713                             gettext_noop ("# migration stop messages sent"),
1714                             1, GNUNET_NO);
1715   return sizeof (struct MigrationStopMessage);
1716 }
1717
1718
1719 /**
1720  * Ask a peer to stop migrating data to us until the given point
1721  * in time.
1722  *
1723  * @param cp peer to ask
1724  * @param block_time until when to block
1725  */
1726 void
1727 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1728                            struct GNUNET_TIME_Absolute block_time)
1729 {
1730   if (cp->last_migration_block.abs_value > block_time.abs_value)
1731   {
1732     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1733                 "Migration already blocked for another %s\n",
1734                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1735                                                         (cp->last_migration_block), GNUNET_YES));
1736     return;                     /* already blocked */
1737   }
1738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %llu ms\n",
1739               (unsigned long long) GNUNET_TIME_absolute_get_remaining (block_time).rel_value);
1740   cp->last_migration_block = block_time;
1741   if (NULL != cp->migration_pth)
1742     GSF_peer_transmit_cancel_ (cp->migration_pth);
1743   cp->migration_pth =
1744       GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1745                           GNUNET_TIME_UNIT_FOREVER_REL,
1746                           sizeof (struct MigrationStopMessage),
1747                           &create_migration_stop_message, cp);
1748 }
1749
1750
1751 /**
1752  * Write peer-respect information to a file - flush the buffer entry!
1753  *
1754  * @param cls unused
1755  * @param key peer identity
1756  * @param value the 'struct GSF_ConnectedPeer' to flush
1757  * @return GNUNET_OK to continue iteration
1758  */
1759 static int
1760 flush_respect (void *cls, const struct GNUNET_HashCode * key, void *value)
1761 {
1762   struct GSF_ConnectedPeer *cp = value;
1763   char *fn;
1764   uint32_t respect;
1765   struct GNUNET_PeerIdentity pid;
1766
1767   if (cp->ppd.respect == cp->disk_respect)
1768     return GNUNET_OK;           /* unchanged */
1769   GNUNET_assert (0 != cp->ppd.pid);
1770   GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1771   fn = get_respect_filename (&pid);
1772   if (cp->ppd.respect == 0)
1773   {
1774     if ((0 != UNLINK (fn)) && (errno != ENOENT))
1775       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1776                                 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1777   }
1778   else
1779   {
1780     respect = htonl (cp->ppd.respect);
1781     if (sizeof (uint32_t) ==
1782         GNUNET_DISK_fn_write (fn, &respect, sizeof (uint32_t),
1783                               GNUNET_DISK_PERM_USER_READ |
1784                               GNUNET_DISK_PERM_USER_WRITE |
1785                               GNUNET_DISK_PERM_GROUP_READ |
1786                               GNUNET_DISK_PERM_OTHER_READ))
1787       cp->disk_respect = cp->ppd.respect;
1788   }
1789   GNUNET_free (fn);
1790   return GNUNET_OK;
1791 }
1792
1793
1794 /**
1795  * Notify core about a preference we have for the given peer
1796  * (to allocate more resources towards it).  The change will
1797  * be communicated the next time we reserve bandwidth with
1798  * core (not instantly).
1799  *
1800  * @param cp peer to reserve bandwidth from
1801  * @param pref preference change
1802  */
1803 void
1804 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1805                                        uint64_t pref)
1806 {
1807   cp->inc_preference += pref;
1808 }
1809
1810
1811 /**
1812  * Call this method periodically to flush respect information to disk.
1813  *
1814  * @param cls closure, not used
1815  * @param tc task context, not used
1816  */
1817 static void
1818 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1819 {
1820
1821   if (NULL == cp_map)
1822     return;
1823   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &flush_respect, NULL);
1824   if (NULL == tc)
1825     return;
1826   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1827     return;
1828   GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1829                                               GNUNET_SCHEDULER_PRIORITY_HIGH,
1830                                               &cron_flush_respect, NULL);
1831 }
1832
1833
1834 /**
1835  * Initialize peer management subsystem.
1836  */
1837 void
1838 GSF_connected_peer_init_ ()
1839 {
1840   cp_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_YES);
1841   ats = GNUNET_ATS_performance_init (GSF_cfg, NULL, NULL);
1842   GNUNET_assert (GNUNET_OK ==
1843                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg, "fs",
1844                                                           "RESPECT",
1845                                                           &respectDirectory));
1846   GNUNET_DISK_directory_create (respectDirectory);
1847   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1848                                       &cron_flush_respect, NULL);
1849 }
1850
1851
1852 /**
1853  * Iterator to free peer entries.
1854  *
1855  * @param cls closure, unused
1856  * @param key current key code
1857  * @param value value in the hash map (peer entry)
1858  * @return GNUNET_YES (we should continue to iterate)
1859  */
1860 static int
1861 clean_peer (void *cls, const struct GNUNET_HashCode * key, void *value)
1862 {
1863   GSF_peer_disconnect_handler_ (NULL, (const struct GNUNET_PeerIdentity *) key);
1864   return GNUNET_YES;
1865 }
1866
1867
1868 /**
1869  * Shutdown peer management subsystem.
1870  */
1871 void
1872 GSF_connected_peer_done_ ()
1873 {
1874   cron_flush_respect (NULL, NULL);
1875   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_peer, NULL);
1876   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1877   cp_map = NULL;
1878   GNUNET_free (respectDirectory);
1879   respectDirectory = NULL;
1880   GNUNET_ATS_performance_done (ats);
1881   ats = NULL;
1882 }
1883
1884
1885 /**
1886  * Iterator to remove references to LC entry.
1887  *
1888  * @param cls the 'struct GSF_LocalClient*' to look for
1889  * @param key current key code
1890  * @param value value in the hash map (peer entry)
1891  * @return GNUNET_YES (we should continue to iterate)
1892  */
1893 static int
1894 clean_local_client (void *cls, const struct GNUNET_HashCode * key, void *value)
1895 {
1896   const struct GSF_LocalClient *lc = cls;
1897   struct GSF_ConnectedPeer *cp = value;
1898   unsigned int i;
1899
1900   for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1901     if (cp->ppd.last_client_replies[i] == lc)
1902       cp->ppd.last_client_replies[i] = NULL;
1903   return GNUNET_YES;
1904 }
1905
1906
1907 /**
1908  * Notification that a local client disconnected.  Clean up all of our
1909  * references to the given handle.
1910  *
1911  * @param lc handle to the local client (henceforth invalid)
1912  */
1913 void
1914 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1915 {
1916   if (NULL == cp_map)
1917     return;                     /* already cleaned up */
1918   GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_local_client,
1919                                          (void *) lc);
1920 }
1921
1922
1923 /* end of gnunet-service-fs_cp.c */