2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_cp.c
23 * @brief API to handle 'connected peers'
24 * @author Christian Grothoff
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
31 * How often do we flush trust values to disk?
33 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
36 struct GSF_PeerTransmitHandle
40 * Handle for an active request for transmission to this
41 * peer, or NULL (if core queue was full).
43 struct GNUNET_CORE_TransmitHandle *cth;
46 * Time when this transmission request was issued.
48 struct GNUNET_TIME_Absolute transmission_request_start_time;
51 * Timeout for this request.
53 struct GNUNET_TIME_Absolute timeout;
56 * Task called on timeout, or 0 for none.
58 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
61 * Function to call to get the actual message.
63 GSF_GetMessageCallback gmc;
66 * Peer this request targets.
68 struct GSF_ConnectedPeer *cp;
76 * Size of the message to be transmitted.
81 * GNUNET_YES if this is a query, GNUNET_NO for content.
86 * Priority of this request.
96 struct GSF_ConnectedPeer
100 * Performance data for this peer.
102 struct GSF_PeerPerformanceData ppd;
105 * Time until when we blocked this peer from migrating
108 struct GNUNET_TIME_Absolute last_migration_block;
111 * Messages (replies, queries, content migration) we would like to
112 * send to this peer in the near future. Sorted by priority, head.
114 struct GSF_PeerTransmitHandle *pth_head;
117 * Messages (replies, queries, content migration) we would like to
118 * send to this peer in the near future. Sorted by priority, tail.
120 struct GSF_PeerTransmitHandle *pth_tail;
123 * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
125 struct GNUNET_CORE_InformationRequestContext *irc;
128 * ID of delay task for scheduling transmission.
130 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
133 * Increase in traffic preference still to be submitted
134 * to the core service for this peer.
136 uint64_t inc_preference;
139 * Trust rating for this peer
144 * Trust rating for this peer on disk.
149 * The peer's identity.
154 * Which offset in "last_p2p_replies" will be updated next?
155 * (we go round-robin).
157 unsigned int last_p2p_replies_woff;
160 * Which offset in "last_client_replies" will be updated next?
161 * (we go round-robin).
163 unsigned int last_client_replies_woff;
166 * Current offset into 'last_request_times' ring buffer.
168 unsigned int last_request_times_off;
174 * Map from peer identities to 'struct GSF_ConnectPeer' entries.
176 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
180 * Where do we store trust information?
182 static char *trustDirectory;
186 * Get the filename under which we would store the GNUNET_HELLO_Message
187 * for the given host and protocol.
188 * @return filename of the form DIRECTORY/HOSTID
191 get_trust_filename (const struct GNUNET_PeerIdentity *id)
193 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
196 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
197 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
203 * Find latency information in 'atsi'.
205 * @param atsi performance data
206 * @return connection latency
208 static struct GNUNET_TIME_Relative
209 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
212 return GNUNET_TIME_UNIT_SECONDS;
213 while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
214 (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
216 if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
219 /* how can we not have latency data? */
220 return GNUNET_TIME_UNIT_SECONDS;
222 return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
223 ntohl (atsi->value));
228 * Update the performance information kept for the given peer.
230 * @param cp peer record to update
231 * @param atsi transport performance data
234 update_atsi (struct GSF_ConnectedPeer *cp,
235 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
237 // FIXME: merge atsi into cp's performance data!
242 * A peer connected to us. Setup the connected peer
245 * @param peer identity of peer that connected
246 * @param atsi performance data for the connection
247 * @return handle to connected peer entry
249 struct GSF_ConnectedPeer *
250 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
251 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
253 struct GSF_ConnectedPeer *cp;
256 struct GNUNET_TIME_Relative latency;
258 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
259 cp->transmission_delay = GNUNET_LOAD_value_init (latency);
260 cp->pid = GNUNET_PEER_intern (peer);
261 fn = get_trust_filename (peer);
262 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
263 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
264 cp->disk_trust = cp->trust = ntohl (trust);
266 GNUNET_break (GNUNET_OK ==
267 GNUNET_CONTAINER_multihashmap_put (cp_map,
270 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
271 update_atsi (cp, atsi);
274 // FIXME: notify plan & migration about new peer!
281 * Core is ready to transmit to a peer, get the message.
283 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
284 * @param size number of bytes core is willing to take
285 * @param buf where to copy the message
286 * @return number of bytes copied to buf
289 peer_transmit_ready_cb (void *cls,
293 struct GSF_PeerTransmitHandle *pth = cls;
294 struct GSF_ConnectedPeer *cp;
298 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
301 // FIXME: update 'cp' counters!
302 ret = pth->gmc (pth->gmc_cls,
310 * Function called if there has been a timeout trying to satisfy
311 * a transmission request.
313 * @param cls the 'struct GSF_PeerTransmitHandle' of the request
314 * @param tc scheduler context
317 peer_transmit_timeout (void *cls,
318 const struct GNUNET_SCHEDULER_TaskContext *tc)
320 struct GSF_PeerTransmitHandle *pth = cls;
321 struct GSF_ConnectedPeer *cp;
323 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
325 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
328 // FIXME: update 'cp' counters!
329 pth->gmc (pth->gmc_cls,
336 * Transmit a message to the given peer as soon as possible.
337 * If the peer disconnects before the transmission can happen,
338 * the callback is invoked with a 'NULL' buffer.
340 * @param peer target peer
341 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
342 * @param priority how important is this request?
343 * @param timeout when does this request timeout (call gmc with error)
344 * @param size number of bytes we would like to send to the peer
345 * @param gmc function to call to get the message
346 * @param gmc_cls closure for gmc
347 * @return handle to cancel request
349 struct GSF_PeerTransmitHandle *
350 GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
353 struct GNUNET_TIME_Relative timeout,
355 GSF_GetMessageCallback gmc,
358 struct GSF_ConnectedPeer *cp;
359 struct GSF_PeerTransmitHandle *pth;
360 struct GSF_PeerTransmitHandle *pos;
361 struct GSF_PeerTransmitHandle *prev;
362 struct GNUNET_PeerIdentity target;
364 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
366 GNUNET_assert (NULL != cp);
367 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
368 pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
369 pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
371 pth->gmc_cls = gmc_cls;
373 pth->is_query = is_query;
374 pth->priority = priority;
376 /* insertion sort (by priority, descending) */
379 while ( (pos != NULL) &&
380 (pos->priority > priority) )
386 GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
390 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
394 GNUNET_PEER_resolve (cp->pid,
396 pth->cth = GNUNET_CORE_notify_transmit_ready (core,
401 &peer_transmit_ready_cb,
403 /* pth->cth could be NULL here, that's OK, we'll try again
405 if (pth->cth == NULL)
406 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
407 &peer_transmit_timeout,
414 * Cancel an earlier request for transmission.
416 * @param pth request to cancel
419 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
421 struct GSF_PeerTransmitHandle *pth = cls;
422 struct GSF_ConnectedPeer *cp;
424 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
426 GNUNET_SCHEDULER_cancel (pth->timeout_task);
427 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
430 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
433 // FIXME: update 'cp' counters!
439 * Report on receiving a reply; update the performance record of the given peer.
441 * @param peer responding peer (will be updated)
442 * @param request_time time at which the original query was transmitted
443 * @param request_priority priority of the original request
444 * @param initiator_client local client on responsible for query (or NULL)
445 * @param initiator_peer other peer responsible for query (or NULL)
448 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer,
449 GNUNET_TIME_Absolute request_time,
450 uint32_t request_priority,
451 const struct GSF_LocalClient *initiator_client,
452 const struct GSF_ConnectedPeer *initiator_peer)
459 * Method called whenever a given peer has a status change.
462 * @param peer peer identity this notification is about
463 * @param bandwidth_in available amount of inbound bandwidth
464 * @param bandwidth_out available amount of outbound bandwidth
465 * @param timeout absolute time when this peer will time out
466 * unless we see some further activity from it
467 * @param atsi status information
470 GSF_peer_status_handler_ (void *cls,
471 const struct GNUNET_PeerIdentity *peer,
472 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
473 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
474 struct GNUNET_TIME_Absolute timeout,
475 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
477 struct GSF_ConnectedPeer *cp;
479 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
481 GNUNET_assert (NULL != cp);
482 update_atsi (cp, atsi);
487 * A peer disconnected from us. Tear down the connected peer
491 * @param peer identity of peer that connected
494 GSF_peer_disconnect_handler_ (void *cls,
495 const struct GNUNET_PeerIdentity *peer)
497 struct GSF_ConnectedPeer *cp;
499 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
501 GNUNET_assert (NULL != cp);
502 GNUNET_CONTAINER_multihashmap_remove (cp_map,
505 // FIXME: more cleanup
511 * Closure for 'call_iterator'.
513 struct IterationContext
516 * Function to call on each entry.
518 GSF_ConnectedPeerIterator it;
528 * Function that calls the callback for each peer.
530 * @param cls the 'struct IterationContext*'
531 * @param key identity of the peer
532 * @param value the 'struct GSF_ConnectedPeer*'
533 * @return GNUNET_YES to continue iteration
536 call_iterator (void *cls,
537 const GNUNET_HashCode *key,
540 struct IterationContext *ic = cls;
541 struct GSF_ConnectedPeer *cp = value;
544 (const struct GNUNET_PeerIdentity*) key,
552 * Iterate over all connected peers.
554 * @param it function to call for each peer
555 * @param it_cls closure for it
558 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
561 struct IterationContext ic;
565 GNUNET_CONTAINER_multihashmap_iterate (cp_map,
572 * Try to reserve bandwidth (to receive data FROM the given peer).
573 * This function must only be called ONCE per connected peer at a
574 * time; it can be called again after the 'rc' callback was invoked.
575 * If the peer disconnects, the request is (silently!) ignored (and
576 * the requester is responsible to register for notification about the
577 * peer disconnect if any special action needs to be taken in this
580 * @param cp peer to reserve bandwidth from
581 * @param size number of bytes to reserve
582 * @param rc function to call upon reservation success or failure
583 * @param rc_cls closure for rc
586 GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp,
588 GSF_PeerReserveCallback rc,
591 // FIXME: should we allow queueing multiple reservation requests?
592 // FIXME: what about cancellation?
593 // FIXME: change docu on peer disconnect handling?
596 rc (rc_cls, cp, GNUNET_NO);
604 * Write host-trust information to a file - flush the buffer entry!
606 * @param cls closure, not used
607 * @param key host identity
608 * @param value the 'struct GSF_ConnectedPeer' to flush
609 * @return GNUNET_OK to continue iteration
612 flush_trust (void *cls,
613 const GNUNET_HashCode *key,
616 struct GSF_ConnectedPeer *cp = value;
619 struct GNUNET_PeerIdentity pid;
621 if (cp->trust == cp->disk_trust)
622 return GNUNET_OK; /* unchanged */
623 GNUNET_PEER_resolve (cp->pid,
625 fn = get_trust_filename (&pid);
628 if ((0 != UNLINK (fn)) && (errno != ENOENT))
629 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
630 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
634 trust = htonl (cp->trust);
635 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
637 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
638 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
639 cp->disk_trust = cp->trust;
647 * Call this method periodically to flush trust information to disk.
649 * @param cls closure, not used
650 * @param tc task context, not used
653 cron_flush_trust (void *cls,
654 const struct GNUNET_SCHEDULER_TaskContext *tc)
659 GNUNET_CONTAINER_multihashmap_iterate (cp_map,
664 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
666 GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ,
673 * Initialize peer management subsystem.
675 * @param cfg configuration to use
678 GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
680 cp_map = GNUNET_CONTAINER_multihashmap_create (128);
681 GNUNET_assert (GNUNET_OK ==
682 GNUNET_CONFIGURATION_get_value_filename (cfg,
686 GNUNET_DISK_directory_create (trustDirectory);
687 GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
688 &cron_flush_trust, NULL);
693 * Iterator to free peer entries.
695 * @param cls closure, unused
696 * @param key current key code
697 * @param value value in the hash map (peer entry)
698 * @return GNUNET_YES (we should continue to iterate)
701 clean_peer (void *cls,
702 const GNUNET_HashCode * key,
705 GSF_peer_disconnect_handler_ (NULL,
706 (const struct GNUNET_PeerIdentity*) key);
712 * Shutdown peer management subsystem.
715 GSF_connected_peer_done_ ()
717 cron_flush_trust (NULL, NULL);
718 GNUNET_CONTAINER_multihashmap_iterate (cp_peers,
721 GNUNET_CONTAINER_multihashmap_destroy (cp_map);
723 GNUNET_free (trustDirectory);
724 trustDirectory = NULL;
730 /* end of gnunet-service-fs_cp.h */