fixes
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33
34 /**
35  * How often do we flush trust values to disk?
36  */
37 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
38
39 /**
40  * After how long do we discard a reply?
41  */
42 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43
44
45 /**
46  * Handle to cancel a transmission request.
47  */
48 struct GSF_PeerTransmitHandle
49 {
50
51   /**
52    * Kept in a doubly-linked list.
53    */
54   struct GSF_PeerTransmitHandle *next;
55
56   /**
57    * Kept in a doubly-linked list.
58    */
59   struct GSF_PeerTransmitHandle *prev;
60
61   /**
62    * Handle for an active request for transmission to this
63    * peer, or NULL (if core queue was full).
64    */
65   struct GNUNET_CORE_TransmitHandle *cth;
66
67   /**
68    * Time when this transmission request was issued.
69    */
70   struct GNUNET_TIME_Absolute transmission_request_start_time;
71
72   /**
73    * Timeout for this request.
74    */
75   struct GNUNET_TIME_Absolute timeout;
76
77   /**
78    * Task called on timeout, or 0 for none.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
81
82   /**
83    * Function to call to get the actual message.
84    */
85   GSF_GetMessageCallback gmc;
86
87   /**
88    * Peer this request targets.
89    */
90   struct GSF_ConnectedPeer *cp;
91
92   /**
93    * Closure for 'gmc'.
94    */
95   void *gmc_cls;
96
97   /**
98    * Size of the message to be transmitted.
99    */
100   size_t size;
101
102   /**
103    * GNUNET_YES if this is a query, GNUNET_NO for content.
104    */
105   int is_query;
106
107   /**
108    * Priority of this request.
109    */
110   uint32_t priority;
111
112 };
113
114
115 /**
116  * A connected peer.
117  */
118 struct GSF_ConnectedPeer 
119 {
120
121   /**
122    * Performance data for this peer.
123    */
124   struct GSF_PeerPerformanceData ppd;
125
126   /**
127    * Time until when we blocked this peer from migrating
128    * data to us.
129    */
130   struct GNUNET_TIME_Absolute last_migration_block;
131
132   /**
133    * Task scheduled to revive migration to this peer.
134    */
135   GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
136
137   /**
138    * Messages (replies, queries, content migration) we would like to
139    * send to this peer in the near future.  Sorted by priority, head.
140    */
141   struct GSF_PeerTransmitHandle *pth_head;
142
143   /**
144    * Messages (replies, queries, content migration) we would like to
145    * send to this peer in the near future.  Sorted by priority, tail.
146    */
147   struct GSF_PeerTransmitHandle *pth_tail;
148
149   /**
150    * Migration stop message in our queue, or NULL if we have none pending.
151    */
152   struct GSF_PeerTransmitHandle *migration_pth;
153
154   /**
155    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
156    * NULL if we have successfully reserved 32k, otherwise non-NULL.
157    */
158   struct GNUNET_CORE_InformationRequestContext *irc;
159
160   /**
161    * Active requests from this neighbour.
162    */
163   struct GNUNET_CONTAINER_MultiHashMap *request_map;
164
165   /**
166    * Increase in traffic preference still to be submitted
167    * to the core service for this peer.
168    */
169   uint64_t inc_preference;
170
171   /**
172    * Trust rating for this peer on disk.
173    */
174   uint32_t disk_trust;
175
176   /**
177    * Which offset in "last_p2p_replies" will be updated next?
178    * (we go round-robin).
179    */
180   unsigned int last_p2p_replies_woff;
181
182   /**
183    * Which offset in "last_client_replies" will be updated next?
184    * (we go round-robin).
185    */
186   unsigned int last_client_replies_woff;
187
188   /**
189    * Current offset into 'last_request_times' ring buffer.
190    */
191   unsigned int last_request_times_off;
192
193 };
194
195
196 /**
197  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
198  */
199 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
200
201 /**
202  * Where do we store trust information?
203  */
204 static char *trustDirectory;
205
206
207 /**
208  * Get the filename under which we would store the GNUNET_HELLO_Message
209  * for the given host and protocol.
210  * @return filename of the form DIRECTORY/HOSTID
211  */
212 static char *
213 get_trust_filename (const struct GNUNET_PeerIdentity *id)
214 {
215   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
216   char *fn;
217
218   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
219   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
220   return fn;
221 }
222
223
224 /**
225  * Find latency information in 'atsi'.
226  *
227  * @param atsi performance data
228  * @return connection latency
229  */
230 static struct GNUNET_TIME_Relative
231 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
232 {
233   if (atsi == NULL)
234     return GNUNET_TIME_UNIT_SECONDS;
235   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
236           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
237     atsi++;
238   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
239     {
240       GNUNET_break (0);
241       /* how can we not have latency data? */
242       return GNUNET_TIME_UNIT_SECONDS;
243     }
244   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
245                                         ntohl (atsi->value));
246 }
247
248
249 /**
250  * Update the performance information kept for the given peer.
251  *
252  * @param cp peer record to update
253  * @param atsi transport performance data
254  */
255 static void
256 update_atsi (struct GSF_ConnectedPeer *cp,
257              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
258 {
259   struct GNUNET_TIME_Relative latency;
260
261   latency = get_latency (atsi);
262   GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
263                                  latency);
264   /* LATER: merge atsi into cp's performance data (if we ever care...) */
265 }
266
267
268 /**
269  * Return the performance data record for the given peer
270  * 
271  * @param cp peer to query
272  * @return performance data record for the peer
273  */
274 struct GSF_PeerPerformanceData *
275 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
276 {
277   return &cp->ppd;
278 }
279
280
281 /**
282  * Core is ready to transmit to a peer, get the message.
283  *
284  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
285  * @param size number of bytes core is willing to take
286  * @param buf where to copy the message
287  * @return number of bytes copied to buf
288  */
289 static size_t
290 peer_transmit_ready_cb (void *cls,
291                         size_t size,
292                         void *buf)
293 {
294   struct GSF_PeerTransmitHandle *pth = cls;
295   struct GSF_ConnectedPeer *cp;
296   size_t ret;
297
298   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
299     {
300       GNUNET_SCHEDULER_cancel (pth->timeout_task);
301       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
302     }
303   cp = pth->cp;
304   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
305                                cp->pth_tail,
306                                pth);
307   if (GNUNET_YES == pth->is_query)
308     {
309       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
310       GNUNET_assert (0 < cp->ppd.pending_queries--);    
311     }
312   else if (GNUNET_NO == pth->is_query)
313     {
314       GNUNET_assert (0 < cp->ppd.pending_replies--);
315     }
316   GNUNET_LOAD_update (cp->ppd.transmission_delay,
317                       GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);  
318   ret = pth->gmc (pth->gmc_cls, 
319                   0, NULL);
320   GNUNET_free (pth);  
321   return ret;
322 }
323
324
325 /**
326  * Function called by core upon success or failure of our bandwidth reservation request.
327  *
328  * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
329  * @param peer identifies the peer
330  * @param bandwidth_out available amount of outbound bandwidth
331  * @param amount set to the amount that was actually reserved or unreserved;
332  *               either the full requested amount or zero (no partial reservations)
333  * @param preference current traffic preference for the given peer
334  */
335 static void
336 core_reserve_callback (void *cls,
337                        const struct GNUNET_PeerIdentity *peer,
338                        struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
339                        int amount,
340                        uint64_t preference)
341 {
342   struct GSF_ConnectedPeer *cp = cls;
343   struct GSF_PeerTransmitHandle *pth;
344   uint64_t ip;
345
346   cp->irc = NULL;
347   if (0 == amount)
348     {
349       /* failed; retry! (how did we get here!?) */
350       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
351                   _("Failed to reserve bandwidth to peer `%s'\n"),
352                   GNUNET_i2s (peer));
353       ip = cp->inc_preference;
354       cp->inc_preference = 0;
355       cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
356                                                     peer,
357                                                     GNUNET_TIME_UNIT_FOREVER_REL,
358                                                     GNUNET_BANDWIDTH_VALUE_MAX,
359                                                     DBLOCK_SIZE,
360                                                     ip,
361                                                     &core_reserve_callback,
362                                                     cp);
363       return;
364     }
365   pth = cp->pth_head;
366   if ( (NULL != pth) &&
367        (NULL == pth->cth) )
368     {
369       /* reservation success, try transmission now! */
370       pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
371                                                     GNUNET_YES,
372                                                     pth->priority,
373                                                     GNUNET_TIME_absolute_get_remaining (pth->timeout),
374                                                     peer,
375                                                     pth->size,
376                                                     &peer_transmit_ready_cb,
377                                                     pth);
378     }
379 }
380
381
382 /**
383  * A peer connected to us.  Setup the connected peer
384  * records.
385  *
386  * @param peer identity of peer that connected
387  * @param atsi performance data for the connection
388  * @return handle to connected peer entry
389  */
390 struct GSF_ConnectedPeer *
391 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
392                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
393 {
394   struct GSF_ConnectedPeer *cp;
395   char *fn;
396   uint32_t trust;
397
398   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
399   cp->ppd.pid = GNUNET_PEER_intern (peer);
400   cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
401   cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
402                                                 peer,
403                                                 GNUNET_TIME_UNIT_FOREVER_REL,
404                                                 GNUNET_BANDWIDTH_VALUE_MAX,
405                                                 DBLOCK_SIZE,
406                                                 0,
407                                                 &core_reserve_callback,
408                                                 cp);
409   fn = get_trust_filename (peer);
410   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
411       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
412     cp->disk_trust = cp->ppd.trust = ntohl (trust);
413   GNUNET_free (fn);
414   cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
415   GNUNET_break (GNUNET_OK ==
416                 GNUNET_CONTAINER_multihashmap_put (cp_map,
417                                                    &peer->hashPubKey,
418                                                    cp,
419                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
420   update_atsi (cp, atsi);
421   GSF_push_start_ (cp);
422   return cp;
423 }
424
425
426 /**
427  * It may be time to re-start migrating content to this
428  * peer.  Check, and if so, restart migration.
429  *
430  * @param cls the 'struct GSF_ConnectedPeer'
431  * @param tc scheduler context
432  */
433 static void
434 revive_migration (void *cls,
435                   const struct GNUNET_SCHEDULER_TaskContext *tc)
436 {
437   struct GSF_ConnectedPeer *cp = cls;
438   struct GNUNET_TIME_Relative bt;
439   
440   cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
441   bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
442   if (0 != bt.rel_value)
443     {
444       /* still time left... */
445       cp->mig_revive_task 
446         = GNUNET_SCHEDULER_add_delayed (bt,
447                                         &revive_migration,
448                                         cp);
449       return;
450     }
451   GSF_push_start_ (cp);
452 }
453
454
455 /**
456  * Get a handle for a connected peer.
457  *
458  * @param peer peer's identity
459  */
460 struct GSF_ConnectedPeer *
461 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
462 {
463   return GNUNET_CONTAINER_multihashmap_get (cp_map,
464                                             &peer->hashPubKey);
465 }
466
467
468 /**
469  * Handle P2P "MIGRATION_STOP" message.
470  *
471  * @param cls closure, always NULL
472  * @param other the other peer involved (sender or receiver, NULL
473  *        for loopback messages where we are both sender and receiver)
474  * @param message the actual message
475  * @param atsi performance information
476  * @return GNUNET_OK to keep the connection open,
477  *         GNUNET_SYSERR to close it (signal serious error)
478  */
479 int
480 GSF_handle_p2p_migration_stop_ (void *cls,
481                                 const struct GNUNET_PeerIdentity *other,
482                                 const struct GNUNET_MessageHeader *message,
483                                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
484 {
485   struct GSF_ConnectedPeer *cp; 
486   const struct MigrationStopMessage *msm;
487   struct GNUNET_TIME_Relative bt;
488
489   msm = (const struct MigrationStopMessage*) message;
490   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
491                                           &other->hashPubKey);
492   if (cp == NULL)
493     {
494       GNUNET_break (0);
495       return GNUNET_OK;
496     }
497   bt = GNUNET_TIME_relative_ntoh (msm->duration);
498   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
499   if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
500     {
501       GSF_push_stop_ (cp);
502       cp->mig_revive_task 
503         = GNUNET_SCHEDULER_add_delayed (bt,
504                                         &revive_migration,
505                                         cp);
506     }
507   update_atsi (cp, atsi);
508   return GNUNET_OK;
509 }
510
511
512 /**
513  * Copy reply and free put message.
514  *
515  * @param cls the 'struct PutMessage'
516  * @param buf_size number of bytes available in buf
517  * @param buf where to copy the message, NULL on error (peer disconnect)
518  * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
519  */
520 static size_t 
521 copy_reply (void *cls,
522             size_t buf_size,
523             void *buf)
524 {
525   struct PutMessage *pm = cls;
526   size_t size;
527
528   if (buf != NULL)
529     {
530       GNUNET_assert (buf_size >= ntohs (pm->header.size));
531       size = ntohs (pm->header.size);
532       memcpy (buf, pm, size); 
533       GNUNET_STATISTICS_update (GSF_stats,
534                                 gettext_noop ("# replies transmitted to other peers"),
535                                 1,
536                                 GNUNET_NO); 
537     }
538   else
539     {
540       size = 0;
541       GNUNET_STATISTICS_update (GSF_stats,
542                                 gettext_noop ("# replies dropped"),
543                                 1,
544                                 GNUNET_NO); 
545     }
546   GNUNET_free (pm);
547   return size;
548 }
549
550
551 /**
552  * Handle a reply to a pending request.  Also called if a request
553  * expires (then with data == NULL).  The handler may be called
554  * many times (depending on the request type), but will not be
555  * called during or after a call to GSF_pending_request_cancel 
556  * and will also not be called anymore after a call signalling
557  * expiration.
558  *
559  * @param cls 'struct GSF_ConnectedPeer' of the peer that would
560  *            have liked an answer to the request
561  * @param pr handle to the original pending request
562  * @param expiration when does 'data' expire?
563  * @param data response data, NULL on request expiration
564  * @param data_len number of bytes in data
565  * @param more GNUNET_YES if the request remains active (may call
566  *             this function again), GNUNET_NO if the request is
567  *             finished (client must not call GSF_pending_request_cancel_)
568  */
569 static void
570 handle_p2p_reply (void *cls,
571                   struct GSF_PendingRequest *pr,
572                   struct GNUNET_TIME_Absolute expiration,
573                   const void *data,
574                   size_t data_len,
575                   int more)
576 {
577   struct GSF_ConnectedPeer *cp = cls;
578   struct GSF_PendingRequestData *prd;
579   struct PutMessage *pm;
580   size_t msize;
581
582   prd = GSF_pending_request_get_data_ (pr);
583   if (NULL == data)
584     {
585       GNUNET_assert (GNUNET_NO == more);
586       GNUNET_STATISTICS_update (GSF_stats,
587                                 gettext_noop ("# P2P searches active"),
588                                 -1,
589                                 GNUNET_NO);
590       GNUNET_break (GNUNET_OK ==
591                     GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
592                                                           &prd->query,
593                                                           pr));
594       return;
595     }  
596 #if DEBUG_FS
597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598               "Transmitting result for query `%s'\n",
599               GNUNET_h2s (&prd->query));
600 #endif  
601   GNUNET_STATISTICS_update (GSF_stats,
602                             gettext_noop ("# replies received for other peers"),
603                             1,
604                             GNUNET_NO); 
605   msize = sizeof (struct PutMessage) + data_len;
606   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
607     {
608       GNUNET_break (0);
609       return;
610     }
611   pm = GNUNET_malloc (sizeof (msize));
612   pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
613   pm->header.size = htons (msize);
614   pm->type = htonl (prd->type);
615   pm->expiration = GNUNET_TIME_absolute_hton (expiration);
616   memcpy (&pm[1], data, data_len);
617   (void) GSF_peer_transmit_ (cp, GNUNET_NO,
618                              UINT32_MAX,
619                              REPLY_TIMEOUT,
620                              msize,
621                              &copy_reply,
622                              pm);
623 }
624
625
626 /**
627  * Increase the host credit by a value.
628  *
629  * @param cp which peer to change the trust value on
630  * @param value is the int value by which the
631  *  host credit is to be increased or decreased
632  * @returns the actual change in trust (positive or negative)
633  */
634 static int
635 change_host_trust (struct GSF_ConnectedPeer *cp, int value)
636 {
637   if (value == 0)
638     return 0;
639   GNUNET_assert (cp != NULL);
640   if (value > 0)
641     {
642       if (cp->ppd.trust + value < cp->ppd.trust)
643         {
644           value = UINT32_MAX - cp->ppd.trust;
645           cp->ppd.trust = UINT32_MAX;
646         }
647       else
648         cp->ppd.trust += value;
649     }
650   else
651     {
652       if (cp->ppd.trust < -value)
653         {
654           value = -cp->ppd.trust;
655           cp->ppd.trust = 0;
656         }
657       else
658         cp->ppd.trust += value;
659     }
660   return value;
661 }
662
663
664 /**
665  * We've received a request with the specified priority.  Bound it
666  * according to how much we trust the given peer.
667  * 
668  * @param prio_in requested priority
669  * @param cp the peer making the request
670  * @return effective priority
671  */
672 static int32_t
673 bound_priority (uint32_t prio_in,
674                 struct GSF_ConnectedPeer *cp)
675 {
676 #define N ((double)128.0)
677   uint32_t ret;
678   double rret;
679   int ld;
680
681   ld = GSF_test_get_load_too_high_ (0);
682   if (ld == GNUNET_SYSERR)
683     {
684       GNUNET_STATISTICS_update (GSF_stats,
685                                 gettext_noop ("# requests done for free (low load)"),
686                                 1,
687                                 GNUNET_NO);
688       return 0; /* excess resources */
689     }
690   if (prio_in > INT32_MAX)
691     prio_in = INT32_MAX;
692   ret = - change_host_trust (cp, - (int) prio_in);
693   if (ret > 0)
694     {
695       if (ret > GSF_current_priorities + N)
696         rret = GSF_current_priorities + N;
697       else
698         rret = ret;
699       GSF_current_priorities 
700         = (GSF_current_priorities * (N-1) + rret)/N;
701     }
702   if ( (ld == GNUNET_YES) && (ret > 0) )
703     {
704       /* try with charging */
705       ld = GSF_test_get_load_too_high_ (ret);
706     }
707   if (ld == GNUNET_YES)
708     {
709       GNUNET_STATISTICS_update (GSF_stats,
710                                 gettext_noop ("# request dropped, priority insufficient"),
711                                 1,
712                                 GNUNET_NO);
713       /* undo charge */
714       change_host_trust (cp, (int) ret);
715       return -1; /* not enough resources */
716     }
717   else
718     {
719       GNUNET_STATISTICS_update (GSF_stats,
720                                 gettext_noop ("# requests done for a price (normal load)"),
721                                 1,
722                                 GNUNET_NO);
723     }
724 #undef N
725   return ret;
726 }
727
728
729 /**
730  * The priority level imposes a bound on the maximum
731  * value for the ttl that can be requested.
732  *
733  * @param ttl_in requested ttl
734  * @param prio given priority
735  * @return ttl_in if ttl_in is below the limit,
736  *         otherwise the ttl-limit for the given priority
737  */
738 static int32_t
739 bound_ttl (int32_t ttl_in, uint32_t prio)
740 {
741   unsigned long long allowed;
742
743   if (ttl_in <= 0)
744     return ttl_in;
745   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
746   if (ttl_in > allowed)      
747     {
748       if (allowed >= (1 << 30))
749         return 1 << 30;
750       return allowed;
751     }
752   return ttl_in;
753 }
754
755
756 /**
757  * Handle P2P "QUERY" message.  Creates the pending request entry
758  * and sets up all of the data structures to that we will
759  * process replies properly.  Does not initiate forwarding or
760  * local database lookups.
761  *
762  * @param other the other peer involved (sender or receiver, NULL
763  *        for loopback messages where we are both sender and receiver)
764  * @param message the actual message
765  * @return pending request handle, NULL on error
766  */
767 struct GSF_PendingRequest *
768 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
769                        const struct GNUNET_MessageHeader *message)
770 {
771   struct GSF_PendingRequest *pr;
772   struct GSF_PendingRequestData *prd;
773   struct GSF_ConnectedPeer *cp;
774   struct GSF_ConnectedPeer *cps;
775   const GNUNET_HashCode *namespace;
776   const struct GNUNET_PeerIdentity *target;
777   enum GSF_PendingRequestOptions options;                            
778   uint16_t msize;
779   const struct GetMessage *gm;
780   unsigned int bits;
781   const GNUNET_HashCode *opt;
782   uint32_t bm;
783   size_t bfsize;
784   uint32_t ttl_decrement;
785   int32_t priority;
786   int32_t ttl;
787   enum GNUNET_BLOCK_Type type;
788   GNUNET_PEER_Id spid;
789
790   msize = ntohs(message->size);
791   if (msize < sizeof (struct GetMessage))
792     {
793       GNUNET_break_op (0);
794       return NULL;
795     }
796   gm = (const struct GetMessage*) message;
797 #if DEBUG_FS
798   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799               "Received request for `%s'\n",
800               GNUNET_h2s (&gm->query));
801 #endif
802   type = ntohl (gm->type);
803   bm = ntohl (gm->hash_bitmap);
804   bits = 0;
805   while (bm > 0)
806     {
807       if (1 == (bm & 1))
808         bits++;
809       bm >>= 1;
810     }
811   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
812     {
813       GNUNET_break_op (0);
814       return NULL;
815     }  
816   opt = (const GNUNET_HashCode*) &gm[1];
817   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
818   /* bfsize must be power of 2, check! */
819   if (0 != ( (bfsize - 1) & bfsize))
820     {
821       GNUNET_break_op (0);
822       return NULL;
823     }
824   GSF_cover_query_count++;
825   bm = ntohl (gm->hash_bitmap);
826   bits = 0;
827   cps = GNUNET_CONTAINER_multihashmap_get (cp_map,
828                                            &other->hashPubKey);
829   if (NULL == cps)
830     {
831       /* peer must have just disconnected */
832       GNUNET_STATISTICS_update (GSF_stats,
833                                 gettext_noop ("# requests dropped due to initiator not being connected"),
834                                 1,
835                                 GNUNET_NO);
836       return NULL;
837     }
838   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
839     cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
840                                             &opt[bits++]);
841   else
842     cp = cps;
843   if (cp == NULL)
844     {
845 #if DEBUG_FS
846       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
847         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
849                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
850       
851       else
852         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
854                     GNUNET_i2s (other));
855 #endif
856       GNUNET_STATISTICS_update (GSF_stats,
857                                 gettext_noop ("# requests dropped due to missing reverse route"),
858                                 1,
859                                 GNUNET_NO);
860       return NULL;
861     }
862   /* note that we can really only check load here since otherwise
863      peers could find out that we are overloaded by not being
864      disconnected after sending us a malformed query... */
865   priority = bound_priority (ntohl (gm->priority), cps);
866   if (priority < 0)
867     {
868 #if DEBUG_FS
869       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870                   "Dropping query from `%s', this peer is too busy.\n",
871                   GNUNET_i2s (other));
872 #endif
873       return NULL;
874     }
875 #if DEBUG_FS 
876   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
878               GNUNET_h2s (&gm->query),
879               (unsigned int) type,
880               GNUNET_i2s (other),
881               (unsigned int) bm);
882 #endif
883   namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
884   target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
885   options = 0;
886   spid = 0;
887   if ( (GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority)) ||
888        (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) > 
889         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)) )
890     {
891       /* don't have BW to send to peer, or would likely take longer than we have for it,
892          so at best indirect the query */
893       priority = 0;
894       options |= GSF_PRO_FORWARD_ONLY;
895       spid = GNUNET_PEER_intern (other);
896     }
897   ttl = bound_ttl (ntohl (gm->ttl), priority);
898   /* decrement ttl (always) */
899   ttl_decrement = 2 * TTL_DECREMENT +
900     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
901                               TTL_DECREMENT);
902   if ( (ttl < 0) &&
903        (((int32_t)(ttl - ttl_decrement)) > 0) )
904     {
905 #if DEBUG_FS
906       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
908                   GNUNET_i2s (other),
909                   ttl,
910                   ttl_decrement);
911 #endif
912       GNUNET_STATISTICS_update (GSF_stats,
913                                 gettext_noop ("# requests dropped due TTL underflow"),
914                                 1,
915                                 GNUNET_NO);
916       /* integer underflow => drop (should be very rare)! */      
917       return NULL;
918     } 
919   ttl -= ttl_decrement;
920
921   /* test if the request already exists */
922   pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
923                                           &gm->query);
924   if (pr != NULL) 
925     {      
926       prd = GSF_pending_request_get_data_ (pr);
927       if ( (prd->type == type) &&
928            ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
929              (0 == memcmp (&prd->namespace,
930                            namespace,
931                            sizeof (GNUNET_HashCode))) ) )
932         {
933           if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
934             {
935               /* existing request has higher TTL, drop new one! */
936               prd->priority += priority;
937 #if DEBUG_FS
938               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939                           "Have existing request with higher TTL, dropping new request.\n",
940                           GNUNET_i2s (other));
941 #endif
942               GNUNET_STATISTICS_update (GSF_stats,
943                                         gettext_noop ("# requests dropped due to higher-TTL request"),
944                                         1,
945                                         GNUNET_NO);
946               return NULL;
947             }
948           /* existing request has lower TTL, drop old one! */
949           priority += prd->priority;
950           GSF_pending_request_cancel_ (pr);
951           GNUNET_assert (GNUNET_YES ==
952                          GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
953                                                                &gm->query,
954                                                                pr));
955         }
956     }
957   
958   pr = GSF_pending_request_create_ (options,
959                                     type,
960                                     &gm->query,
961                                     namespace,
962                                     target,
963                                     (bfsize > 0) ? (const char*)&opt[bits] : NULL,
964                                     bfsize,
965                                     ntohl (gm->filter_mutator),
966                                     1 /* anonymity */,
967                                     (uint32_t) priority,
968                                     ttl,
969                                     spid,
970                                     NULL, 0, /* replies_seen */
971                                     &handle_p2p_reply,
972                                     cp);
973   GNUNET_break (GNUNET_OK ==
974                 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
975                                                    &gm->query,
976                                                    pr,
977                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
978   GNUNET_STATISTICS_update (GSF_stats,
979                             gettext_noop ("# P2P searches received"),
980                             1,
981                             GNUNET_NO);
982   GNUNET_STATISTICS_update (GSF_stats,
983                             gettext_noop ("# P2P searches active"),
984                             1,
985                             GNUNET_NO);
986   return pr;
987 }
988
989
990 /**
991  * Function called if there has been a timeout trying to satisfy
992  * a transmission request.
993  *
994  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
995  * @param tc scheduler context
996  */
997 static void
998 peer_transmit_timeout (void *cls,
999                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1000 {
1001   struct GSF_PeerTransmitHandle *pth = cls;
1002   struct GSF_ConnectedPeer *cp;
1003   
1004   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1005   cp = pth->cp;
1006   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1007                                cp->pth_tail,
1008                                pth);
1009   if (GNUNET_YES == pth->is_query)
1010     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1011   else if (GNUNET_NO == pth->is_query)
1012     GNUNET_assert (0 < cp->ppd.pending_replies--);
1013   GNUNET_LOAD_update (cp->ppd.transmission_delay,
1014                       UINT64_MAX);
1015   pth->gmc (pth->gmc_cls, 
1016             0, NULL);
1017   GNUNET_free (pth);
1018 }
1019
1020
1021 /**
1022  * Transmit a message to the given peer as soon as possible.
1023  * If the peer disconnects before the transmission can happen,
1024  * the callback is invoked with a 'NULL' buffer.
1025  *
1026  * @param cp target peer
1027  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1028  * @param priority how important is this request?
1029  * @param timeout when does this request timeout (call gmc with error)
1030  * @param size number of bytes we would like to send to the peer
1031  * @param gmc function to call to get the message
1032  * @param gmc_cls closure for gmc
1033  * @return handle to cancel request
1034  */
1035 struct GSF_PeerTransmitHandle *
1036 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1037                     int is_query,
1038                     uint32_t priority,
1039                     struct GNUNET_TIME_Relative timeout,
1040                     size_t size,
1041                     GSF_GetMessageCallback gmc,
1042                     void *gmc_cls)
1043 {
1044   struct GSF_PeerTransmitHandle *pth;
1045   struct GSF_PeerTransmitHandle *pos;
1046   struct GSF_PeerTransmitHandle *prev;
1047   struct GNUNET_PeerIdentity target;
1048   uint64_t ip;
1049   int is_ready;
1050
1051   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1052   pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1053   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1054   pth->gmc = gmc;
1055   pth->gmc_cls = gmc_cls;
1056   pth->size = size;
1057   pth->is_query = is_query;
1058   pth->priority = priority;
1059   pth->cp = cp;
1060   /* insertion sort (by priority, descending) */
1061   prev = NULL;
1062   pos = cp->pth_head;
1063   while ( (pos != NULL) &&
1064           (pos->priority > priority) )
1065     {
1066       prev = pos;
1067       pos = pos->next;
1068     }
1069   if (prev == NULL)
1070     GNUNET_CONTAINER_DLL_insert (cp->pth_head,
1071                                  cp->pth_tail,
1072                                  pth);
1073   else
1074     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1075                                        cp->pth_tail,
1076                                        prev,
1077                                        pth);
1078   GNUNET_PEER_resolve (cp->ppd.pid,
1079                        &target);
1080   if (GNUNET_YES == is_query)
1081     {
1082       /* query, need reservation */
1083       cp->ppd.pending_queries++;
1084       if (NULL == cp->irc)
1085         {
1086           /* reservation already done! */
1087           is_ready = GNUNET_YES;
1088           ip = cp->inc_preference;
1089           cp->inc_preference = 0;
1090           cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
1091                                                         &target,
1092                                                         GNUNET_TIME_UNIT_FOREVER_REL,
1093                                                         GNUNET_BANDWIDTH_VALUE_MAX,
1094                                                         DBLOCK_SIZE,
1095                                                         ip,
1096                                                         &core_reserve_callback,
1097                                                         cp);      
1098         }
1099       else
1100         {
1101           /* still waiting for reservation */
1102           is_ready = GNUNET_NO;
1103         }
1104     }
1105   else if (GNUNET_NO == is_query)
1106     {
1107       /* no reservation needed for content */
1108       cp->ppd.pending_replies++;
1109       is_ready = GNUNET_YES;
1110     }
1111   else
1112     {
1113       /* not a query or content, no reservation needed */
1114       is_ready = GNUNET_YES;
1115     }
1116   if (is_ready)
1117     {
1118       pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
1119                                                     GNUNET_YES,
1120                                                     priority,
1121                                                     timeout,
1122                                                     &target,
1123                                                     size,
1124                                                     &peer_transmit_ready_cb,
1125                                                     pth);
1126       /* pth->cth could be NULL here, that's OK, we'll try again
1127          later... */
1128     }
1129   if (pth->cth == NULL)
1130     {
1131       /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
1132          install a timeout task to be on the safe side */
1133       pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1134                                                         &peer_transmit_timeout,
1135                                                         pth);
1136     }
1137   return pth;
1138 }
1139
1140
1141 /**
1142  * Cancel an earlier request for transmission.
1143  *
1144  * @param pth request to cancel
1145  */
1146 void
1147 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1148 {
1149   struct GSF_ConnectedPeer *cp;
1150
1151   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1152     {
1153       GNUNET_SCHEDULER_cancel (pth->timeout_task);
1154       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1155     }
1156   if (NULL != pth->cth)
1157     {
1158       GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1159       pth->cth = NULL;
1160     }
1161   cp = pth->cp;
1162   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1163                                cp->pth_tail,
1164                                pth);
1165   if (GNUNET_YES == pth->is_query)
1166     GNUNET_assert (0 < cp->ppd.pending_queries--);    
1167   else if (GNUNET_NO == pth->is_query)
1168     GNUNET_assert (0 < cp->ppd.pending_replies--);
1169   GNUNET_free (pth);
1170 }
1171
1172
1173 /**
1174  * Report on receiving a reply; update the performance record of the given peer.
1175  *
1176  * @param cp responding peer (will be updated)
1177  * @param request_time time at which the original query was transmitted
1178  * @param request_priority priority of the original request
1179  */
1180 void
1181 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1182                               struct GNUNET_TIME_Absolute request_time,
1183                               uint32_t request_priority)
1184 {
1185   struct GNUNET_TIME_Relative delay;
1186
1187   delay = GNUNET_TIME_absolute_get_duration (request_time);  
1188   cp->ppd.avg_reply_delay.rel_value = (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
1189   cp->ppd.avg_priority = (cp->ppd.avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
1190 }
1191
1192
1193 /**
1194  * Report on receiving a reply in response to an initiating client.
1195  * Remember that this peer is good for this client.
1196  *
1197  * @param cp responding peer (will be updated)
1198  * @param initiator_client local client on responsible for query
1199  */
1200 void
1201 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1202                                    struct GSF_LocalClient *initiator_client)
1203 {
1204   cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1205 }
1206
1207
1208 /**
1209  * Report on receiving a reply in response to an initiating peer.
1210  * Remember that this peer is good for this initiating peer.
1211  *
1212  * @param cp responding peer (will be updated)
1213  * @param initiator_peer other peer responsible for query
1214  */
1215 void
1216 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1217                                  const struct GSF_ConnectedPeer *initiator_peer)
1218 {
1219   GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
1220   cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->ppd.pid;
1221   GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1222 }
1223
1224
1225 /**
1226  * Method called whenever a given peer has a status change.
1227  *
1228  * @param cls closure
1229  * @param peer peer identity this notification is about
1230  * @param bandwidth_in available amount of inbound bandwidth
1231  * @param bandwidth_out available amount of outbound bandwidth
1232  * @param timeout absolute time when this peer will time out
1233  *        unless we see some further activity from it
1234  * @param atsi status information
1235  */
1236 void
1237 GSF_peer_status_handler_ (void *cls,
1238                           const struct GNUNET_PeerIdentity *peer,
1239                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1240                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1241                           struct GNUNET_TIME_Absolute timeout,
1242                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1243 {
1244   struct GSF_ConnectedPeer *cp;
1245
1246   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1247                                           &peer->hashPubKey);
1248   GNUNET_assert (NULL != cp);
1249   update_atsi (cp, atsi);
1250 }
1251
1252
1253 /**
1254  * Cancel all requests associated with the peer.
1255  *
1256  * @param cls unused
1257  * @param query hash code of the request
1258  * @param value the 'struct GSF_PendingRequest'
1259  * @return GNUNET_YES (continue to iterate)
1260  */
1261 static int
1262 cancel_pending_request (void *cls,
1263                         const GNUNET_HashCode *query,
1264                         void *value)
1265 {
1266   struct GSF_PendingRequest *pr = value;
1267
1268   GSF_pending_request_cancel_ (pr);
1269   return GNUNET_OK;
1270 }
1271
1272
1273 /**
1274  * A peer disconnected from us.  Tear down the connected peer
1275  * record.
1276  *
1277  * @param cls unused
1278  * @param peer identity of peer that connected
1279  */
1280 void
1281 GSF_peer_disconnect_handler_ (void *cls,
1282                               const struct GNUNET_PeerIdentity *peer)
1283 {
1284   struct GSF_ConnectedPeer *cp;
1285   struct GSF_PeerTransmitHandle *pth;
1286
1287   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
1288                                           &peer->hashPubKey);
1289   if (NULL == cp)
1290     return; /* must have been disconnect from core with
1291                'peer' == my_id, ignore */
1292   GNUNET_CONTAINER_multihashmap_remove (cp_map,
1293                                         &peer->hashPubKey,
1294                                         cp);
1295   if (NULL != cp->migration_pth)
1296     {
1297       GSF_peer_transmit_cancel_ (cp->migration_pth);
1298       cp->migration_pth = NULL;
1299     }
1300   if (NULL != cp->irc)
1301     {
1302       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1303       cp->irc = NULL;
1304     }
1305   GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1306                                          &cancel_pending_request,
1307                                          cp);
1308   GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1309   cp->request_map = NULL;
1310   GSF_plan_notify_peer_disconnect_ (cp);
1311   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1312   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1313   while (NULL != (pth = cp->pth_head))
1314     {
1315       if (NULL != pth->cth)
1316         {
1317           GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1318           pth->cth = NULL;
1319         }
1320       GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1321                                    cp->pth_tail,
1322                                    pth);
1323       GNUNET_free (pth);
1324     }
1325   GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1326   if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1327     {
1328       GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1329       cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1330     }
1331   GNUNET_free (cp);
1332 }
1333
1334
1335 /**
1336  * Closure for 'call_iterator'.
1337  */
1338 struct IterationContext
1339 {
1340   /**
1341    * Function to call on each entry.
1342    */
1343   GSF_ConnectedPeerIterator it;
1344
1345   /**
1346    * Closure for 'it'.
1347    */
1348   void *it_cls;
1349 };
1350
1351
1352 /**
1353  * Function that calls the callback for each peer.
1354  *
1355  * @param cls the 'struct IterationContext*'
1356  * @param key identity of the peer
1357  * @param value the 'struct GSF_ConnectedPeer*'
1358  * @return GNUNET_YES to continue iteration
1359  */
1360 static int
1361 call_iterator (void *cls,
1362                const GNUNET_HashCode *key,
1363                void *value)
1364 {
1365   struct IterationContext *ic = cls;
1366   struct GSF_ConnectedPeer *cp = value;
1367   
1368   ic->it (ic->it_cls,
1369           (const struct GNUNET_PeerIdentity*) key,
1370           cp,
1371           &cp->ppd);
1372   return GNUNET_YES;
1373 }
1374
1375
1376 /**
1377  * Iterate over all connected peers.
1378  *
1379  * @param it function to call for each peer
1380  * @param it_cls closure for it
1381  */
1382 void
1383 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1384                               void *it_cls)
1385 {
1386   struct IterationContext ic;
1387
1388   ic.it = it;
1389   ic.it_cls = it_cls;
1390   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1391                                          &call_iterator,
1392                                          &ic);
1393 }
1394
1395
1396 /**
1397  * Obtain the identity of a connected peer.
1398  *
1399  * @param cp peer to reserve bandwidth from
1400  * @param id identity to set (written to)
1401  */
1402 void
1403 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1404                                   struct GNUNET_PeerIdentity *id)
1405 {
1406   GNUNET_PEER_resolve (cp->ppd.pid,
1407                        id);
1408 }
1409
1410
1411 /**
1412  * Assemble a migration stop message for transmission.
1413  *
1414  * @param cls the 'struct GSF_ConnectedPeer' to use
1415  * @param size number of bytes we're allowed to write to buf
1416  * @param buf where to copy the message
1417  * @return number of bytes copied to buf
1418  */
1419 static size_t
1420 create_migration_stop_message (void *cls,
1421                                size_t size,
1422                                void *buf)
1423 {
1424   struct GSF_ConnectedPeer *cp = cls;
1425   struct MigrationStopMessage msm;
1426
1427   cp->migration_pth = NULL;
1428   if (NULL == buf)
1429     return 0;
1430   GNUNET_assert (size > sizeof (struct MigrationStopMessage));
1431   msm.header.size = htons (sizeof (struct MigrationStopMessage));
1432   msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1433   msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1434   memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1435   return sizeof (struct MigrationStopMessage);
1436 }
1437
1438
1439 /**
1440  * Ask a peer to stop migrating data to us until the given point
1441  * in time.
1442  * 
1443  * @param cp peer to ask
1444  * @param block_time until when to block
1445  */
1446 void
1447 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1448                            struct GNUNET_TIME_Relative block_time)
1449 {
1450   if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1451     return; /* already blocked */
1452   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1453   if (cp->migration_pth != NULL)
1454     GSF_peer_transmit_cancel_ (cp->migration_pth);
1455   cp->migration_pth 
1456     = GSF_peer_transmit_ (cp,
1457                           GNUNET_SYSERR,
1458                           UINT32_MAX,
1459                           GNUNET_TIME_UNIT_FOREVER_REL,
1460                           sizeof (struct MigrationStopMessage),
1461                           &create_migration_stop_message,
1462                           cp);
1463 }
1464
1465
1466 /**
1467  * Write host-trust information to a file - flush the buffer entry!
1468  *
1469  * @param cls closure, not used
1470  * @param key host identity
1471  * @param value the 'struct GSF_ConnectedPeer' to flush
1472  * @return GNUNET_OK to continue iteration
1473  */
1474 static int
1475 flush_trust (void *cls,
1476              const GNUNET_HashCode *key,
1477              void *value)
1478 {
1479   struct GSF_ConnectedPeer *cp = value;
1480   char *fn;
1481   uint32_t trust;
1482   struct GNUNET_PeerIdentity pid;
1483
1484   if (cp->ppd.trust == cp->disk_trust)
1485     return GNUNET_OK;                     /* unchanged */
1486   GNUNET_PEER_resolve (cp->ppd.pid,
1487                        &pid);
1488   fn = get_trust_filename (&pid);
1489   if (cp->ppd.trust == 0)
1490     {
1491       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1492         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1493                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1494     }
1495   else
1496     {
1497       trust = htonl (cp->ppd.trust);
1498       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1499                                                     sizeof(uint32_t),
1500                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1501                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1502         cp->disk_trust = cp->ppd.trust;
1503     }
1504   GNUNET_free (fn);
1505   return GNUNET_OK;
1506 }
1507
1508
1509 /**
1510  * Notify core about a preference we have for the given peer
1511  * (to allocate more resources towards it).  The change will
1512  * be communicated the next time we reserve bandwidth with
1513  * core (not instantly).
1514  *
1515  * @param cp peer to reserve bandwidth from
1516  * @param pref preference change
1517  */
1518 void
1519 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1520                                        uint64_t pref)
1521 {
1522   cp->inc_preference += pref;
1523 }
1524
1525
1526 /**
1527  * Call this method periodically to flush trust information to disk.
1528  *
1529  * @param cls closure, not used
1530  * @param tc task context, not used
1531  */
1532 static void
1533 cron_flush_trust (void *cls,
1534                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1535 {
1536
1537   if (NULL == cp_map)
1538     return;
1539   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1540                                          &flush_trust,
1541                                          NULL);
1542   if (NULL == tc)
1543     return;
1544   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1545     return;
1546   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
1547                                 &cron_flush_trust, 
1548                                 NULL);
1549 }
1550
1551
1552 /**
1553  * Initialize peer management subsystem.
1554  */
1555 void
1556 GSF_connected_peer_init_ ()
1557 {
1558   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
1559   GNUNET_assert (GNUNET_OK ==
1560                  GNUNET_CONFIGURATION_get_value_filename (GSF_cfg,
1561                                                           "fs",
1562                                                           "TRUST",
1563                                                           &trustDirectory));
1564   GNUNET_DISK_directory_create (trustDirectory);
1565   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1566                                       &cron_flush_trust, NULL);
1567 }
1568
1569
1570 /**
1571  * Iterator to free peer entries.
1572  *
1573  * @param cls closure, unused
1574  * @param key current key code
1575  * @param value value in the hash map (peer entry)
1576  * @return GNUNET_YES (we should continue to iterate)
1577  */
1578 static int 
1579 clean_peer (void *cls,
1580             const GNUNET_HashCode * key,
1581             void *value)
1582 {
1583   GSF_peer_disconnect_handler_ (NULL, 
1584                                 (const struct GNUNET_PeerIdentity*) key);
1585   return GNUNET_YES;
1586 }
1587
1588
1589 /**
1590  * Shutdown peer management subsystem.
1591  */
1592 void
1593 GSF_connected_peer_done_ ()
1594 {
1595   cron_flush_trust (NULL, NULL);
1596   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1597                                          &clean_peer,
1598                                          NULL);
1599   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1600   cp_map = NULL;
1601   GNUNET_free (trustDirectory);
1602   trustDirectory = NULL;
1603 }
1604
1605
1606 /**
1607  * Iterator to remove references to LC entry.
1608  *
1609  * @param cls the 'struct GSF_LocalClient*' to look for
1610  * @param key current key code
1611  * @param value value in the hash map (peer entry)
1612  * @return GNUNET_YES (we should continue to iterate)
1613  */
1614 static int 
1615 clean_local_client (void *cls,
1616                     const GNUNET_HashCode * key,
1617                     void *value)
1618 {
1619   const struct GSF_LocalClient *lc = cls;
1620   struct GSF_ConnectedPeer *cp = value;
1621   unsigned int i;
1622
1623   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1624     if (cp->ppd.last_client_replies[i] == lc)
1625       cp->ppd.last_client_replies[i] = NULL;
1626   return GNUNET_YES;
1627 }
1628
1629
1630 /**
1631  * Notification that a local client disconnected.  Clean up all of our
1632  * references to the given handle.
1633  *
1634  * @param lc handle to the local client (henceforth invalid)
1635  */
1636 void
1637 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1638 {
1639   if (NULL == cp_map)
1640     return; /* already cleaned up */
1641   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
1642                                          &clean_local_client,
1643                                          (void*) lc);
1644 }
1645
1646
1647 /* end of gnunet-service-fs_cp.c */