2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_cp.c
23 * @brief API to handle 'connected peers'
24 * @author Christian Grothoff
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
36 * Ratio for moving average delay calculation. The previous
37 * average goes in with a factor of (n-1) into the calculation.
40 #define RUNAVG_DELAY_N 16
43 * How often do we flush respect values to disk?
45 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
48 * After how long do we discard a reply?
50 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
53 * Collect an instane number of statistics? May cause excessive IPC.
55 #define INSANE_STATISTICS GNUNET_NO
59 * Handle to cancel a transmission request.
61 struct GSF_PeerTransmitHandle
65 * Kept in a doubly-linked list.
67 struct GSF_PeerTransmitHandle *next;
70 * Kept in a doubly-linked list.
72 struct GSF_PeerTransmitHandle *prev;
75 * Time when this transmission request was issued.
77 struct GNUNET_TIME_Absolute transmission_request_start_time;
80 * Timeout for this request.
82 struct GNUNET_TIME_Absolute timeout;
85 * Task called on timeout, or 0 for none.
87 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
90 * Function to call to get the actual message.
92 GSF_GetMessageCallback gmc;
95 * Peer this request targets.
97 struct GSF_ConnectedPeer *cp;
105 * Size of the message to be transmitted.
110 * GNUNET_YES if this is a query, GNUNET_NO for content.
115 * Did we get a reservation already?
120 * Priority of this request.
128 * Handle for an entry in our delay list.
130 struct GSF_DelayedHandle
134 * Kept in a doubly-linked list.
136 struct GSF_DelayedHandle *next;
139 * Kept in a doubly-linked list.
141 struct GSF_DelayedHandle *prev;
144 * Peer this transmission belongs to.
146 struct GSF_ConnectedPeer *cp;
149 * The PUT that was delayed.
151 struct PutMessage *pm;
154 * Task for the delay.
156 GNUNET_SCHEDULER_TaskIdentifier delay_task;
159 * Size of the message.
167 * Information per peer and request.
173 * Handle to generic request.
175 struct GSF_PendingRequest *pr;
178 * Handle to specific peer.
180 struct GSF_ConnectedPeer *cp;
183 * Task for asynchronous stopping of this request.
185 GNUNET_SCHEDULER_TaskIdentifier kill_task;
193 struct GSF_ConnectedPeer
197 * Performance data for this peer.
199 struct GSF_PeerPerformanceData ppd;
202 * Time until when we blocked this peer from migrating
205 struct GNUNET_TIME_Absolute last_migration_block;
208 * Task scheduled to revive migration to this peer.
210 GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
213 * Messages (replies, queries, content migration) we would like to
214 * send to this peer in the near future. Sorted by priority, head.
216 struct GSF_PeerTransmitHandle *pth_head;
219 * Messages (replies, queries, content migration) we would like to
220 * send to this peer in the near future. Sorted by priority, tail.
222 struct GSF_PeerTransmitHandle *pth_tail;
225 * Messages (replies, queries, content migration) we would like to
226 * send to this peer in the near future. Sorted by priority, head.
228 struct GSF_DelayedHandle *delayed_head;
231 * Messages (replies, queries, content migration) we would like to
232 * send to this peer in the near future. Sorted by priority, tail.
234 struct GSF_DelayedHandle *delayed_tail;
237 * Migration stop message in our queue, or NULL if we have none pending.
239 struct GSF_PeerTransmitHandle *migration_pth;
242 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244 struct GNUNET_ATS_ReservationContext *rc;
247 * Task scheduled if we need to retry bandwidth reservation later.
249 GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
252 * Active requests from this neighbour, map of query to 'struct PeerRequest'.
254 struct GNUNET_CONTAINER_MultiHashMap *request_map;
257 * Handle for an active request for transmission to this
258 * peer, or NULL (if core queue was full).
260 struct GNUNET_CORE_TransmitHandle *cth;
263 * Increase in traffic preference still to be submitted
264 * to the core service for this peer.
266 uint64_t inc_preference;
269 * Set to 1 if we're currently in the process of calling
270 * 'GNUNET_CORE_notify_transmit_ready' (so while cth is
271 * NULL, we should not call notify_transmit_ready for this
274 unsigned int cth_in_progress;
277 * Respect rating for this peer on disk.
279 uint32_t disk_respect;
282 * Which offset in "last_p2p_replies" will be updated next?
283 * (we go round-robin).
285 unsigned int last_p2p_replies_woff;
288 * Which offset in "last_client_replies" will be updated next?
289 * (we go round-robin).
291 unsigned int last_client_replies_woff;
294 * Current offset into 'last_request_times' ring buffer.
296 unsigned int last_request_times_off;
299 * GNUNET_YES if we did successfully reserve 32k bandwidth,
308 * Map from peer identities to 'struct GSF_ConnectPeer' entries.
310 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
313 * Where do we store respect information?
315 static char *respectDirectory;
319 * Get the filename under which we would store respect
320 * for the given peer.
322 * @param id peer to get the filename for
323 * @return filename of the form DIRECTORY/PEERID
326 get_respect_filename (const struct GNUNET_PeerIdentity *id)
328 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
331 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
332 GNUNET_asprintf (&fn, "%s%s%s", respectDirectory, DIR_SEPARATOR_STR, &fil);
338 * Update the latency information kept for the given peer.
340 * @param id peer record to update
341 * @param latency current latency value
344 GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
345 struct GNUNET_TIME_Relative latency)
347 struct GSF_ConnectedPeer *cp;
349 cp = GSF_peer_get_ (id);
350 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
351 /* LATER: merge atsi into cp's performance data (if we ever care...) */
356 * Return the performance data record for the given peer
358 * @param cp peer to query
359 * @return performance data record for the peer
361 struct GSF_PeerPerformanceData *
362 GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
369 * Core is ready to transmit to a peer, get the message.
371 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
372 * @param size number of bytes core is willing to take
373 * @param buf where to copy the message
374 * @return number of bytes copied to buf
377 peer_transmit_ready_cb (void *cls, size_t size, void *buf);
381 * Function called by core upon success or failure of our bandwidth reservation request.
383 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
384 * @param peer identifies the peer
385 * @param amount set to the amount that was actually reserved or unreserved;
386 * either the full requested amount or zero (no partial reservations)
387 * @param res_delay if the reservation could not be satisfied (amount was 0), how
388 * long should the client wait until re-trying?
391 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
392 int32_t amount, struct GNUNET_TIME_Relative res_delay);
396 * If ready (bandwidth reserved), try to schedule transmission via
397 * core for the given handle.
399 * @param pth transmission handle to schedule
402 schedule_transmission (struct GSF_PeerTransmitHandle *pth)
404 struct GSF_ConnectedPeer *cp;
405 struct GNUNET_PeerIdentity target;
408 if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
409 return; /* already done */
410 GNUNET_assert (0 != cp->ppd.pid);
411 GNUNET_PEER_resolve (cp->ppd.pid, &target);
413 if (0 != cp->inc_preference)
415 GNUNET_ATS_change_preference (GSF_ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
416 (double) cp->inc_preference,
417 GNUNET_ATS_PREFERENCE_END);
418 cp->inc_preference = 0;
421 if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
423 /* query, need reservation */
424 if (GNUNET_YES != cp->did_reserve)
425 return; /* not ready */
426 cp->did_reserve = GNUNET_NO;
427 /* reservation already done! */
428 pth->was_reserved = GNUNET_YES;
430 GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
431 &ats_reserve_callback, cp);
434 GNUNET_assert (NULL == cp->cth);
435 cp->cth_in_progress++;
437 GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
438 GNUNET_TIME_absolute_get_remaining
439 (pth->timeout), &target, pth->size,
440 &peer_transmit_ready_cb, cp);
441 GNUNET_assert (NULL != cp->cth);
442 GNUNET_assert (0 < cp->cth_in_progress--);
447 * Core is ready to transmit to a peer, get the message.
449 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
450 * @param size number of bytes core is willing to take
451 * @param buf where to copy the message
452 * @return number of bytes copied to buf
455 peer_transmit_ready_cb (void *cls, size_t size, void *buf)
457 struct GSF_ConnectedPeer *cp = cls;
458 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
459 struct GSF_PeerTransmitHandle *pos;
465 if (pth->size > size)
467 schedule_transmission (pth);
470 if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
472 GNUNET_SCHEDULER_cancel (pth->timeout_task);
473 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
475 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
476 if (GNUNET_YES == pth->is_query)
478 cp->ppd.last_request_times[(cp->last_request_times_off++) %
479 MAX_QUEUE_PER_PEER] =
480 GNUNET_TIME_absolute_get ();
481 GNUNET_assert (0 < cp->ppd.pending_queries--);
483 else if (GNUNET_NO == pth->is_query)
485 GNUNET_assert (0 < cp->ppd.pending_replies--);
487 GNUNET_LOAD_update (cp->ppd.transmission_delay,
488 GNUNET_TIME_absolute_get_duration
489 (pth->transmission_request_start_time).rel_value);
490 ret = pth->gmc (pth->gmc_cls, size, buf);
491 if (NULL != (pos = cp->pth_head))
493 GNUNET_assert (pos != pth);
494 schedule_transmission (pos);
502 * (re)try to reserve bandwidth from the given peer.
504 * @param cls the 'struct GSF_ConnectedPeer' to reserve from
505 * @param tc scheduler context
508 retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
510 struct GSF_ConnectedPeer *cp = cls;
511 struct GNUNET_PeerIdentity target;
513 GNUNET_PEER_resolve (cp->ppd.pid, &target);
514 cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
516 GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
517 &ats_reserve_callback, cp);
522 * Function called by core upon success or failure of our bandwidth reservation request.
524 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
525 * @param peer identifies the peer
526 * @param amount set to the amount that was actually reserved or unreserved;
527 * either the full requested amount or zero (no partial reservations)
528 * @param res_delay if the reservation could not be satisfied (amount was 0), how
529 * long should the client wait until re-trying?
532 ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
533 int32_t amount, struct GNUNET_TIME_Relative res_delay)
535 struct GSF_ConnectedPeer *cp = cls;
536 struct GSF_PeerTransmitHandle *pth;
538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539 "Reserved %d bytes / need to wait %s for reservation\n",
541 GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
546 GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
549 cp->did_reserve = GNUNET_YES;
551 if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
553 /* reservation success, try transmission now! */
554 cp->cth_in_progress++;
556 GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
557 GNUNET_TIME_absolute_get_remaining
558 (pth->timeout), peer, pth->size,
559 &peer_transmit_ready_cb, cp);
560 GNUNET_assert (NULL != cp->cth);
561 GNUNET_assert (0 < cp->cth_in_progress--);
567 * A peer connected to us. Setup the connected peer
570 * @param peer identity of peer that connected
571 * @return handle to connected peer entry
573 struct GSF_ConnectedPeer *
574 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer)
576 struct GSF_ConnectedPeer *cp;
580 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
582 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
583 cp->ppd.pid = GNUNET_PEER_intern (peer);
584 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
586 GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
587 &ats_reserve_callback, cp);
588 fn = get_respect_filename (peer);
589 if ((GNUNET_YES == GNUNET_DISK_file_test (fn)) &&
590 (sizeof (respect) == GNUNET_DISK_fn_read (fn, &respect, sizeof (respect))))
591 cp->disk_respect = cp->ppd.respect = ntohl (respect);
593 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
594 GNUNET_break (GNUNET_OK ==
595 GNUNET_CONTAINER_multihashmap_put (cp_map,
596 &GSF_connected_peer_get_identity2_ (cp)->hashPubKey,
598 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
599 GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
600 GNUNET_CONTAINER_multihashmap_size (cp_map),
602 GSF_push_start_ (cp);
608 * It may be time to re-start migrating content to this
609 * peer. Check, and if so, restart migration.
611 * @param cls the 'struct GSF_ConnectedPeer'
612 * @param tc scheduler context
615 revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
617 struct GSF_ConnectedPeer *cp = cls;
618 struct GNUNET_TIME_Relative bt;
620 cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
621 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
622 if (0 != bt.rel_value)
624 /* still time left... */
625 cp->mig_revive_task =
626 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
629 GSF_push_start_ (cp);
634 * Get a handle for a connected peer.
636 * @param peer peer's identity
637 * @return NULL if the peer is not currently connected
639 struct GSF_ConnectedPeer *
640 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
644 return GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey);
649 * Handle P2P "MIGRATION_STOP" message.
651 * @param cls closure, always NULL
652 * @param other the other peer involved (sender or receiver, NULL
653 * for loopback messages where we are both sender and receiver)
654 * @param message the actual message
655 * @return GNUNET_OK to keep the connection open,
656 * GNUNET_SYSERR to close it (signal serious error)
659 GSF_handle_p2p_migration_stop_ (void *cls,
660 const struct GNUNET_PeerIdentity *other,
661 const struct GNUNET_MessageHeader *message)
663 struct GSF_ConnectedPeer *cp;
664 const struct MigrationStopMessage *msm;
665 struct GNUNET_TIME_Relative bt;
667 msm = (const struct MigrationStopMessage *) message;
668 cp = GSF_peer_get_ (other);
674 GNUNET_STATISTICS_update (GSF_stats,
675 gettext_noop ("# migration stop messages received"),
677 bt = GNUNET_TIME_relative_ntoh (msm->duration);
678 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
679 _("Migration of content to peer `%s' blocked for %s\n"),
681 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
682 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
683 if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
686 cp->mig_revive_task =
687 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
694 * Copy reply and free put message.
696 * @param cls the 'struct PutMessage'
697 * @param buf_size number of bytes available in buf
698 * @param buf where to copy the message, NULL on error (peer disconnect)
699 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
702 copy_reply (void *cls, size_t buf_size, void *buf)
704 struct PutMessage *pm = cls;
709 GNUNET_assert (buf_size >= ntohs (pm->header.size));
710 size = ntohs (pm->header.size);
711 memcpy (buf, pm, size);
712 GNUNET_STATISTICS_update (GSF_stats,
714 ("# replies transmitted to other peers"), 1,
720 GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
729 * Free resources associated with the given peer request.
731 * @param peerreq request to free
732 * @param query associated key for the request
735 free_pending_request (struct PeerRequest *peerreq,
736 const struct GNUNET_HashCode *query)
738 struct GSF_ConnectedPeer *cp = peerreq->cp;
740 if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
742 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
743 peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
745 GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
747 GNUNET_break (GNUNET_YES ==
748 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
750 GNUNET_free (peerreq);
755 * Cancel all requests associated with the peer.
758 * @param query hash code of the request
759 * @param value the 'struct GSF_PendingRequest'
760 * @return GNUNET_YES (continue to iterate)
763 cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
765 struct PeerRequest *peerreq = value;
766 struct GSF_PendingRequest *pr = peerreq->pr;
767 struct GSF_PendingRequestData *prd;
769 prd = GSF_pending_request_get_data_ (pr);
770 GSF_pending_request_cancel_ (pr, GNUNET_NO);
771 free_pending_request (peerreq, &prd->query);
777 * Free the given request.
779 * @param cls the request to free
780 * @param tc task context
783 peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
785 struct PeerRequest *peerreq = cls;
786 struct GSF_PendingRequest *pr = peerreq->pr;
787 struct GSF_PendingRequestData *prd;
789 peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
790 prd = GSF_pending_request_get_data_ (pr);
791 cancel_pending_request (NULL, &prd->query, peerreq);
796 * The artificial delay is over, transmit the message now.
798 * @param cls the 'struct GSF_DelayedHandle' with the message
799 * @param tc scheduler context
802 transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
804 struct GSF_DelayedHandle *dh = cls;
805 struct GSF_ConnectedPeer *cp = dh->cp;
807 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
808 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
810 GNUNET_free (dh->pm);
814 (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
815 dh->msize, ©_reply, dh->pm);
821 * Get the randomized delay a response should be subjected to.
823 * @return desired delay
825 static struct GNUNET_TIME_Relative
826 get_randomized_delay ()
828 struct GNUNET_TIME_Relative ret;
831 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
832 GNUNET_CRYPTO_random_u32
833 (GNUNET_CRYPTO_QUALITY_WEAK,
834 2 * GSF_avg_latency.rel_value + 1));
835 #if INSANE_STATISTICS
836 GNUNET_STATISTICS_update (GSF_stats,
838 ("# artificial delays introduced (ms)"),
839 ret.rel_value, GNUNET_NO);
846 * Handle a reply to a pending request. Also called if a request
847 * expires (then with data == NULL). The handler may be called
848 * many times (depending on the request type), but will not be
849 * called during or after a call to GSF_pending_request_cancel
850 * and will also not be called anymore after a call signalling
853 * @param cls 'struct PeerRequest' this is an answer for
854 * @param eval evaluation of the result
855 * @param pr handle to the original pending request
856 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
857 * @param expiration when does 'data' expire?
858 * @param last_transmission when did we last transmit a request for this block
859 * @param type type of the block
860 * @param data response data, NULL on request expiration
861 * @param data_len number of bytes in data
864 handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
865 struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
866 struct GNUNET_TIME_Absolute expiration,
867 struct GNUNET_TIME_Absolute last_transmission,
868 enum GNUNET_BLOCK_Type type, const void *data,
871 struct PeerRequest *peerreq = cls;
872 struct GSF_ConnectedPeer *cp = peerreq->cp;
873 struct GSF_PendingRequestData *prd;
874 struct PutMessage *pm;
877 GNUNET_assert (data_len + sizeof (struct PutMessage) <
878 GNUNET_SERVER_MAX_MESSAGE_SIZE);
879 GNUNET_assert (peerreq->pr == pr);
880 prd = GSF_pending_request_get_data_ (pr);
883 free_pending_request (peerreq, &prd->query);
886 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
887 if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
889 GNUNET_STATISTICS_update (GSF_stats,
891 ("# replies dropped due to type mismatch"),
895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896 "Transmitting result for query `%s' to peer\n",
897 GNUNET_h2s (&prd->query));
898 GNUNET_STATISTICS_update (GSF_stats,
899 gettext_noop ("# replies received for other peers"),
901 msize = sizeof (struct PutMessage) + data_len;
902 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
907 if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
909 if (reply_anonymity_level - 1 > GSF_cover_content_count)
911 GNUNET_STATISTICS_update (GSF_stats,
913 ("# replies dropped due to insufficient cover traffic"),
917 GSF_cover_content_count -= (reply_anonymity_level - 1);
920 pm = GNUNET_malloc (msize);
921 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
922 pm->header.size = htons (msize);
923 pm->type = htonl (type);
924 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
925 memcpy (&pm[1], data, data_len);
926 if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
927 (GNUNET_YES == GSF_enable_randomized_delays))
929 struct GSF_DelayedHandle *dh;
931 dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle));
935 GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
937 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
938 &transmit_delayed_now, dh);
942 (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
945 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
947 if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
949 GNUNET_STATISTICS_update (GSF_stats,
951 ("# P2P searches destroyed due to ultimate reply"),
954 GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
960 * Increase the peer's respect by a value.
962 * @param cp which peer to change the respect value on
963 * @param value is the int value by which the
964 * peer's credit is to be increased or decreased
965 * @returns the actual change in respect (positive or negative)
968 change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
972 GNUNET_assert (NULL != cp);
975 if (cp->ppd.respect + value < cp->ppd.respect)
977 value = UINT32_MAX - cp->ppd.respect;
978 cp->ppd.respect = UINT32_MAX;
981 cp->ppd.respect += value;
985 if (cp->ppd.respect < -value)
987 value = -cp->ppd.respect;
991 cp->ppd.respect += value;
998 * We've received a request with the specified priority. Bound it
999 * according to how much we respect the given peer.
1001 * @param prio_in requested priority
1002 * @param cp the peer making the request
1003 * @return effective priority
1006 bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
1008 #define N ((double)128.0)
1013 ld = GSF_test_get_load_too_high_ (0);
1014 if (GNUNET_SYSERR == ld)
1016 #if INSANE_STATISTICS
1017 GNUNET_STATISTICS_update (GSF_stats,
1019 ("# requests done for free (low load)"), 1,
1022 return 0; /* excess resources */
1024 if (prio_in > INT32_MAX)
1025 prio_in = INT32_MAX;
1026 ret = -change_peer_respect (cp, -(int) prio_in);
1029 if (ret > GSF_current_priorities + N)
1030 rret = GSF_current_priorities + N;
1033 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1035 if ((GNUNET_YES == ld) && (ret > 0))
1037 /* try with charging */
1038 ld = GSF_test_get_load_too_high_ (ret);
1040 if (GNUNET_YES == ld)
1042 GNUNET_STATISTICS_update (GSF_stats,
1044 ("# request dropped, priority insufficient"), 1,
1047 change_peer_respect (cp, (int) ret);
1048 return -1; /* not enough resources */
1052 GNUNET_STATISTICS_update (GSF_stats,
1054 ("# requests done for a price (normal load)"), 1,
1063 * The priority level imposes a bound on the maximum
1064 * value for the ttl that can be requested.
1066 * @param ttl_in requested ttl
1067 * @param prio given priority
1068 * @return ttl_in if ttl_in is below the limit,
1069 * otherwise the ttl-limit for the given priority
1072 bound_ttl (int32_t ttl_in, uint32_t prio)
1074 unsigned long long allowed;
1078 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1079 if (ttl_in > allowed)
1081 if (allowed >= (1 << 30))
1090 * Handle P2P "QUERY" message. Creates the pending request entry
1091 * and sets up all of the data structures to that we will
1092 * process replies properly. Does not initiate forwarding or
1093 * local database lookups.
1095 * @param other the other peer involved (sender or receiver, NULL
1096 * for loopback messages where we are both sender and receiver)
1097 * @param message the actual message
1098 * @return pending request handle, NULL on error
1100 struct GSF_PendingRequest *
1101 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1102 const struct GNUNET_MessageHeader *message)
1104 struct PeerRequest *peerreq;
1105 struct GSF_PendingRequest *pr;
1106 struct GSF_PendingRequestData *prd;
1107 struct GSF_ConnectedPeer *cp;
1108 struct GSF_ConnectedPeer *cps;
1109 const struct GNUNET_PeerIdentity *target;
1110 enum GSF_PendingRequestOptions options;
1112 const struct GetMessage *gm;
1114 const struct GNUNET_HashCode *opt;
1117 uint32_t ttl_decrement;
1120 enum GNUNET_BLOCK_Type type;
1121 GNUNET_PEER_Id spid;
1123 GNUNET_assert (other != NULL);
1124 msize = ntohs (message->size);
1125 if (msize < sizeof (struct GetMessage))
1127 GNUNET_break_op (0);
1130 GNUNET_STATISTICS_update (GSF_stats,
1132 ("# GET requests received (from other peers)"), 1,
1134 gm = (const struct GetMessage *) message;
1135 type = ntohl (gm->type);
1136 bm = ntohl (gm->hash_bitmap);
1144 if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_HashCode))
1146 GNUNET_break_op (0);
1149 opt = (const struct GNUNET_HashCode *) &gm[1];
1150 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_HashCode);
1151 /* bfsize must be power of 2, check! */
1152 if (0 != ((bfsize - 1) & bfsize))
1154 GNUNET_break_op (0);
1157 GSF_cover_query_count++;
1158 bm = ntohl (gm->hash_bitmap);
1160 cps = GSF_peer_get_ (other);
1163 /* peer must have just disconnected */
1164 GNUNET_STATISTICS_update (GSF_stats,
1166 ("# requests dropped due to initiator not being connected"),
1170 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1171 cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity *) &opt[bits++]);
1176 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
1179 GNUNET_i2s ((const struct GNUNET_PeerIdentity *)
1183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184 "Failed to find peer `%4s' in connection set. Dropping query.\n",
1185 GNUNET_i2s (other));
1186 #if INSANE_STATISTICS
1187 GNUNET_STATISTICS_update (GSF_stats,
1189 ("# requests dropped due to missing reverse route"),
1194 /* note that we can really only check load here since otherwise
1195 * peers could find out that we are overloaded by not being
1196 * disconnected after sending us a malformed query... */
1197 priority = bound_priority (ntohl (gm->priority), cps);
1200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201 "Dropping query from `%s', this peer is too busy.\n",
1202 GNUNET_i2s (other));
1205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
1207 GNUNET_h2s (&gm->query), (unsigned int) type, GNUNET_i2s (other),
1211 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity
1212 *) &opt[bits++]) : NULL;
1213 options = GSF_PRO_DEFAULTS;
1215 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
1216 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1217 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 +
1218 GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1220 /* don't have BW to send to peer, or would likely take longer than we have for it,
1221 * so at best indirect the query */
1223 options |= GSF_PRO_FORWARD_ONLY;
1224 spid = GNUNET_PEER_intern (other);
1225 GNUNET_assert (0 != spid);
1227 ttl = bound_ttl (ntohl (gm->ttl), priority);
1228 /* decrement ttl (always) */
1230 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1232 if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
1234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1236 GNUNET_i2s (other), ttl, ttl_decrement);
1237 GNUNET_STATISTICS_update (GSF_stats,
1239 ("# requests dropped due TTL underflow"), 1,
1241 /* integer underflow => drop (should be very rare)! */
1244 ttl -= ttl_decrement;
1246 /* test if the request already exists */
1247 peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
1248 if (peerreq != NULL)
1251 prd = GSF_pending_request_get_data_ (pr);
1252 if (prd->type == type)
1254 if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get ().abs_value + ttl)
1256 /* existing request has higher TTL, drop new one! */
1257 prd->priority += priority;
1258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259 "Have existing request with higher TTL, dropping new request.\n",
1260 GNUNET_i2s (other));
1261 GNUNET_STATISTICS_update (GSF_stats,
1263 ("# requests dropped due to higher-TTL request"),
1267 /* existing request has lower TTL, drop old one! */
1268 priority += prd->priority;
1269 GSF_pending_request_cancel_ (pr, GNUNET_YES);
1270 free_pending_request (peerreq, &gm->query);
1274 peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1276 pr = GSF_pending_request_create_ (options, type, &gm->query,
1279 0) ? (const char *) &opt[bits] : NULL,
1280 bfsize, ntohl (gm->filter_mutator),
1282 (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0, /* replies_seen */
1283 &handle_p2p_reply, peerreq);
1284 GNUNET_assert (NULL != pr);
1286 GNUNET_break (GNUNET_OK ==
1287 GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
1289 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1290 GNUNET_STATISTICS_update (GSF_stats,
1292 ("# P2P query messages received and processed"), 1,
1294 GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
1301 * Function called if there has been a timeout trying to satisfy
1302 * a transmission request.
1304 * @param cls the 'struct GSF_PeerTransmitHandle' of the request
1305 * @param tc scheduler context
1308 peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1310 struct GSF_PeerTransmitHandle *pth = cls;
1311 struct GSF_ConnectedPeer *cp;
1313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1314 "Timeout trying to transmit to other peer\n");
1315 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1317 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1318 if (GNUNET_YES == pth->is_query)
1319 GNUNET_assert (0 < cp->ppd.pending_queries--);
1320 else if (GNUNET_NO == pth->is_query)
1321 GNUNET_assert (0 < cp->ppd.pending_replies--);
1322 GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
1323 if (NULL != cp->cth)
1325 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1328 pth->gmc (pth->gmc_cls, 0, NULL);
1329 GNUNET_assert (0 == cp->cth_in_progress);
1335 * Transmit a message to the given peer as soon as possible.
1336 * If the peer disconnects before the transmission can happen,
1337 * the callback is invoked with a 'NULL' buffer.
1339 * @param cp target peer
1340 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
1341 * @param priority how important is this request?
1342 * @param timeout when does this request timeout (call gmc with error)
1343 * @param size number of bytes we would like to send to the peer
1344 * @param gmc function to call to get the message
1345 * @param gmc_cls closure for gmc
1346 * @return handle to cancel request
1348 struct GSF_PeerTransmitHandle *
1349 GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
1350 uint32_t priority, struct GNUNET_TIME_Relative timeout,
1351 size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
1353 struct GSF_PeerTransmitHandle *pth;
1354 struct GSF_PeerTransmitHandle *pos;
1355 struct GSF_PeerTransmitHandle *prev;
1357 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1358 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1359 pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1361 pth->gmc_cls = gmc_cls;
1363 pth->is_query = is_query;
1364 pth->priority = priority;
1366 /* insertion sort (by priority, descending) */
1369 while ((NULL != pos) && (pos->priority > priority))
1374 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
1375 if (GNUNET_YES == is_query)
1376 cp->ppd.pending_queries++;
1377 else if (GNUNET_NO == is_query)
1378 cp->ppd.pending_replies++;
1380 GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
1381 schedule_transmission (pth);
1387 * Cancel an earlier request for transmission.
1389 * @param pth request to cancel
1392 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1394 struct GSF_ConnectedPeer *cp;
1396 if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
1398 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1399 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1402 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1403 if (GNUNET_YES == pth->is_query)
1404 GNUNET_assert (0 < cp->ppd.pending_queries--);
1405 else if (GNUNET_NO == pth->is_query)
1406 GNUNET_assert (0 < cp->ppd.pending_replies--);
1412 * Report on receiving a reply; update the performance record of the given peer.
1414 * @param cp responding peer (will be updated)
1415 * @param request_time time at which the original query was transmitted
1416 * @param request_priority priority of the original request
1419 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1420 struct GNUNET_TIME_Absolute request_time,
1421 uint32_t request_priority)
1423 struct GNUNET_TIME_Relative delay;
1425 delay = GNUNET_TIME_absolute_get_duration (request_time);
1426 cp->ppd.avg_reply_delay.rel_value =
1427 (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N - 1) +
1428 delay.rel_value) / RUNAVG_DELAY_N;
1429 cp->ppd.avg_priority =
1430 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1431 request_priority) / RUNAVG_DELAY_N;
1436 * Report on receiving a reply in response to an initiating client.
1437 * Remember that this peer is good for this client.
1439 * @param cp responding peer (will be updated)
1440 * @param initiator_client local client on responsible for query
1443 GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1444 struct GSF_LocalClient *initiator_client)
1446 cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1447 CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1452 * Report on receiving a reply in response to an initiating peer.
1453 * Remember that this peer is good for this initiating peer.
1455 * @param cp responding peer (will be updated)
1456 * @param initiator_peer other peer responsible for query
1459 GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1460 const struct GSF_ConnectedPeer *initiator_peer)
1464 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1465 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1466 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1467 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1468 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1473 * A peer disconnected from us. Tear down the connected peer
1477 * @param peer identity of peer that connected
1480 GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
1482 struct GSF_ConnectedPeer *cp;
1483 struct GSF_PeerTransmitHandle *pth;
1484 struct GSF_DelayedHandle *dh;
1486 cp = GSF_peer_get_ (peer);
1488 return; /* must have been disconnect from core with
1489 * 'peer' == my_id, ignore */
1490 GNUNET_assert (GNUNET_YES ==
1491 GNUNET_CONTAINER_multihashmap_remove (cp_map,
1492 &peer->hashPubKey, cp));
1493 GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
1494 GNUNET_CONTAINER_multihashmap_size (cp_map),
1496 if (NULL != cp->migration_pth)
1498 GSF_peer_transmit_cancel_ (cp->migration_pth);
1499 cp->migration_pth = NULL;
1503 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1506 if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
1508 GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1509 cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
1511 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1512 &cancel_pending_request, cp);
1513 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1514 cp->request_map = NULL;
1515 GSF_plan_notify_peer_disconnect_ (cp);
1516 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1517 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1518 memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
1519 GSF_push_stop_ (cp);
1520 if (NULL != cp->cth)
1522 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1525 GNUNET_assert (0 == cp->cth_in_progress);
1526 while (NULL != (pth = cp->pth_head))
1528 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1530 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1531 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1533 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
1534 pth->gmc (pth->gmc_cls, 0, NULL);
1537 while (NULL != (dh = cp->delayed_head))
1539 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
1540 GNUNET_SCHEDULER_cancel (dh->delay_task);
1541 GNUNET_free (dh->pm);
1544 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1545 if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1547 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1548 cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1555 * Closure for 'call_iterator'.
1557 struct IterationContext
1560 * Function to call on each entry.
1562 GSF_ConnectedPeerIterator it;
1572 * Function that calls the callback for each peer.
1574 * @param cls the 'struct IterationContext*'
1575 * @param key identity of the peer
1576 * @param value the 'struct GSF_ConnectedPeer*'
1577 * @return GNUNET_YES to continue iteration
1580 call_iterator (void *cls, const struct GNUNET_HashCode * key, void *value)
1582 struct IterationContext *ic = cls;
1583 struct GSF_ConnectedPeer *cp = value;
1585 ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
1591 * Iterate over all connected peers.
1593 * @param it function to call for each peer
1594 * @param it_cls closure for it
1597 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
1599 struct IterationContext ic;
1603 GNUNET_CONTAINER_multihashmap_iterate (cp_map, &call_iterator, &ic);
1608 * Obtain the identity of a connected peer.
1610 * @param cp peer to get identity of
1611 * @param id identity to set (written to)
1614 GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1615 struct GNUNET_PeerIdentity *id)
1617 GNUNET_assert (0 != cp->ppd.pid);
1618 GNUNET_PEER_resolve (cp->ppd.pid, id);
1623 * Obtain the identity of a connected peer.
1625 * @param cp peer to get identity of
1626 * @return reference to peer identity, valid until peer disconnects (!)
1628 const struct GNUNET_PeerIdentity *
1629 GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1631 GNUNET_assert (0 != cp->ppd.pid);
1632 return GNUNET_PEER_resolve2 (cp->ppd.pid);
1637 * Assemble a migration stop message for transmission.
1639 * @param cls the 'struct GSF_ConnectedPeer' to use
1640 * @param size number of bytes we're allowed to write to buf
1641 * @param buf where to copy the message
1642 * @return number of bytes copied to buf
1645 create_migration_stop_message (void *cls, size_t size, void *buf)
1647 struct GSF_ConnectedPeer *cp = cls;
1648 struct MigrationStopMessage msm;
1650 cp->migration_pth = NULL;
1653 GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1654 msm.header.size = htons (sizeof (struct MigrationStopMessage));
1655 msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1656 msm.reserved = htonl (0);
1658 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1659 (cp->last_migration_block));
1660 memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1661 GNUNET_STATISTICS_update (GSF_stats,
1662 gettext_noop ("# migration stop messages sent"),
1664 return sizeof (struct MigrationStopMessage);
1669 * Ask a peer to stop migrating data to us until the given point
1672 * @param cp peer to ask
1673 * @param block_time until when to block
1676 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1677 struct GNUNET_TIME_Absolute block_time)
1679 if (cp->last_migration_block.abs_value > block_time.abs_value)
1681 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682 "Migration already blocked for another %s\n",
1683 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
1684 (cp->last_migration_block), GNUNET_YES));
1685 return; /* already blocked */
1687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %llu ms\n",
1688 (unsigned long long) GNUNET_TIME_absolute_get_remaining (block_time).rel_value);
1689 cp->last_migration_block = block_time;
1690 if (NULL != cp->migration_pth)
1691 GSF_peer_transmit_cancel_ (cp->migration_pth);
1693 GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
1694 GNUNET_TIME_UNIT_FOREVER_REL,
1695 sizeof (struct MigrationStopMessage),
1696 &create_migration_stop_message, cp);
1701 * Write peer-respect information to a file - flush the buffer entry!
1704 * @param key peer identity
1705 * @param value the 'struct GSF_ConnectedPeer' to flush
1706 * @return GNUNET_OK to continue iteration
1709 flush_respect (void *cls, const struct GNUNET_HashCode * key, void *value)
1711 struct GSF_ConnectedPeer *cp = value;
1714 struct GNUNET_PeerIdentity pid;
1716 if (cp->ppd.respect == cp->disk_respect)
1717 return GNUNET_OK; /* unchanged */
1718 GNUNET_assert (0 != cp->ppd.pid);
1719 GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1720 fn = get_respect_filename (&pid);
1721 if (cp->ppd.respect == 0)
1723 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1724 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1725 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1729 respect = htonl (cp->ppd.respect);
1730 if (sizeof (uint32_t) ==
1731 GNUNET_DISK_fn_write (fn, &respect, sizeof (uint32_t),
1732 GNUNET_DISK_PERM_USER_READ |
1733 GNUNET_DISK_PERM_USER_WRITE |
1734 GNUNET_DISK_PERM_GROUP_READ |
1735 GNUNET_DISK_PERM_OTHER_READ))
1736 cp->disk_respect = cp->ppd.respect;
1744 * Notify core about a preference we have for the given peer
1745 * (to allocate more resources towards it). The change will
1746 * be communicated the next time we reserve bandwidth with
1747 * core (not instantly).
1749 * @param cp peer to reserve bandwidth from
1750 * @param pref preference change
1753 GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1756 cp->inc_preference += pref;
1761 * Call this method periodically to flush respect information to disk.
1763 * @param cls closure, not used
1764 * @param tc task context, not used
1767 cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1772 GNUNET_CONTAINER_multihashmap_iterate (cp_map, &flush_respect, NULL);
1775 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1777 GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1778 GNUNET_SCHEDULER_PRIORITY_HIGH,
1779 &cron_flush_respect, NULL);
1784 * Initialize peer management subsystem.
1787 GSF_connected_peer_init_ ()
1789 cp_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_YES);
1790 GNUNET_assert (GNUNET_OK ==
1791 GNUNET_CONFIGURATION_get_value_filename (GSF_cfg, "fs",
1793 &respectDirectory));
1794 GNUNET_DISK_directory_create (respectDirectory);
1795 GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1796 &cron_flush_respect, NULL);
1801 * Iterator to free peer entries.
1803 * @param cls closure, unused
1804 * @param key current key code
1805 * @param value value in the hash map (peer entry)
1806 * @return GNUNET_YES (we should continue to iterate)
1809 clean_peer (void *cls, const struct GNUNET_HashCode * key, void *value)
1811 GSF_peer_disconnect_handler_ (NULL, (const struct GNUNET_PeerIdentity *) key);
1817 * Shutdown peer management subsystem.
1820 GSF_connected_peer_done_ ()
1822 cron_flush_respect (NULL, NULL);
1823 GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_peer, NULL);
1824 GNUNET_CONTAINER_multihashmap_destroy (cp_map);
1826 GNUNET_free (respectDirectory);
1827 respectDirectory = NULL;
1832 * Iterator to remove references to LC entry.
1834 * @param cls the 'struct GSF_LocalClient*' to look for
1835 * @param key current key code
1836 * @param value value in the hash map (peer entry)
1837 * @return GNUNET_YES (we should continue to iterate)
1840 clean_local_client (void *cls, const struct GNUNET_HashCode * key, void *value)
1842 const struct GSF_LocalClient *lc = cls;
1843 struct GSF_ConnectedPeer *cp = value;
1846 for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1847 if (cp->ppd.last_client_replies[i] == lc)
1848 cp->ppd.last_client_replies[i] = NULL;
1854 * Notification that a local client disconnected. Clean up all of our
1855 * references to the given handle.
1857 * @param lc handle to the local client (henceforth invalid)
1860 GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1863 return; /* already cleaned up */
1864 GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_local_client,
1869 /* end of gnunet-service-fs_cp.c */