/**
* Context of our GNUNET_CORE_peer_change_preference call (or NULL).
+ * NULL if we have successfully reserved 32k, otherwise non-NULL.
*/
struct GNUNET_CORE_InformationRequestContext *irc;
/**
* ID of delay task for scheduling transmission.
*/
- GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
+ GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
/**
* Increase in traffic preference still to be submitted
update_atsi (struct GSF_ConnectedPeer *cp,
const struct GNUNET_TRANSPORT_ATS_Information *atsi)
{
- // FIXME: merge atsi into cp's performance data!
+ struct GNUNET_TIME_Relative latency;
+
+ latency = get_latency (atsi);
+ GNUNET_LOAD_value_set_decline (cp->transmission_delay,
+ latency);
+ /* LATER: merge atsi into cp's performance data (if we ever care...) */
+}
+
+
+/**
+ * Core is ready to transmit to a peer, get the message.
+ *
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the message
+ * @param size number of bytes core is willing to take
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+peer_transmit_ready_cb (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct GSF_PeerTransmitHandle *pth = cls;
+ struct GSF_ConnectedPeer *cp;
+ size_t ret;
+
+ if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (pth->timeout_task);
+ pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ cp = pth->cp;
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ if (pth->is_query)
+ {
+ cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
+ GNUNET_assert (0 < cp->ppd.pending_queries--);
+ }
+ else
+ {
+ GNUNET_assert (0 < cp->ppd.pending_replies--);
+ }
+ GNUNET_LOAD_update (cp->ppd.transmission_delay,
+ GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);
+ ret = pth->gmc (pth->gmc_cls,
+ 0, NULL);
+ GNUNET_free (pth);
+ return ret;
+}
+
+
+/**
+ * Function called by core upon success or failure of our bandwidth reservation request.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
+ * @param peer identifies the peer
+ * @param bandwidth_out available amount of outbound bandwidth
+ * @param amount set to the amount that was actually reserved or unreserved;
+ * either the full requested amount or zero (no partial reservations)
+ * @param preference current traffic preference for the given peer
+ */
+static void
+core_reserve_callback (void *cls,
+ const struct GNUNET_PeerIdentity * peer,
+ struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+ int amount,
+ uint64_t preference)
+{
+ struct GSF_ConnectedPeer *cp = cls;
+ uint64_t ip;
+
+ cp->irc = NULL;
+ if (0 == amount)
+ {
+ /* failed; retry! (how did we get here!?) */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to reserve bandwidth to peer `%s'\n"),
+ GNUNET_i2s (peer));
+ ip = cp->inc_preference;
+ cp->inc_preference = 0;
+ cp->irc = GNUNET_CORE_peer_change_preference (core,
+ peer,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_BANDWIDTH_VALUE_MAX,
+ GNUNET_FS_DBLOCK_SIZE,
+ ip,
+ &core_reserve_callback,
+ cp);
+ return;
+ }
+ pth = cp->pth_head;
+ if ( (NULL != pth) &&
+ (NULL == pth->cth) )
+ {
+ /* reservation success, try transmission now! */
+ pth->cth = GNUNET_CORE_notify_transmit_ready (core,
+ priority,
+ GNUNET_TIME_absolute_get_remaining (pth->timeout),
+ &target,
+ size,
+ &peer_transmit_ready_cb,
+ pth);
+ }
}
cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
cp->transmission_delay = GNUNET_LOAD_value_init (latency);
cp->pid = GNUNET_PEER_intern (peer);
+ cp->transmission_delay = GNUNET_LOAD_value_init (0);
+ cp->irc = GNUNET_CORE_peer_change_preference (core,
+ peer,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_BANDWIDTH_VALUE_MAX,
+ GNUNET_FS_DBLOCK_SIZE,
+ 0,
+ &core_reserve_callback,
+ cp);
fn = get_trust_filename (peer);
if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
(sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
update_atsi (cp, atsi);
-
-
- // FIXME: notify plan & migration about new peer!
-
+ GSF_plan_notify_new_peer_ (cp);
return cp;
}
/**
- * Core is ready to transmit to a peer, get the message.
+ * Handle P2P "MIGRATION_STOP" message.
*
- * @param cls the 'struct GSF_PeerTransmitHandle' of the message
- * @param size number of bytes core is willing to take
- * @param buf where to copy the message
- * @return number of bytes copied to buf
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ * for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param atsi performance information
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
*/
-static size_t
-peer_transmit_ready_cb (void *cls,
- size_t size,
- void *buf)
+int
+GSF_handle_p2p_migration_stop_ (void *cls,
+ const struct GNUNET_PeerIdentity *other,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_TRANSPORT_ATS_Information *atsi)
{
- struct GSF_PeerTransmitHandle *pth = cls;
- struct GSF_ConnectedPeer *cp;
- size_t ret;
+ struct GSF_ConnectedPeer *cp;
+ const struct MigrationStopMessage *msm;
- cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head,
- cp->pth_tail,
- pth);
- // FIXME: update 'cp' counters!
- ret = pth->gmc (pth->gmc_cls,
- 0, NULL);
- GNUNET_free (pth);
- return ret;
+ msm = (const struct MigrationStopMessage*) message;
+ cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
+ &other->hashPubKey);
+ if (cp == NULL)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
+ update_atsi (cp, atsi);
+ return GNUNET_OK;
}
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- // FIXME: update 'cp' counters!
+ if (pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_queries--);
+ else
+ GNUNET_assert (0 < cp->ppd.pending_replies--);
+ GNUNET_LOAD_update (cp->ppd.transmission_delay,
+ UINT64_MAX);
pth->gmc (pth->gmc_cls,
0, NULL);
GNUNET_free (pth);
struct GSF_PeerTransmitHandle *pos;
struct GSF_PeerTransmitHandle *prev;
struct GNUNET_PeerIdentity target;
+ uint64_t ip;
+ int is_ready;
cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
&peer->hashPubKey);
pth);
GNUNET_PEER_resolve (cp->pid,
&target);
- pth->cth = GNUNET_CORE_notify_transmit_ready (core,
- priority,
- timeout,
- &target,
- size,
- &peer_transmit_ready_cb,
- pth);
- /* pth->cth could be NULL here, that's OK, we'll try again
- later... */
+ if (is_query)
+ {
+ /* query, need reservation */
+ if (NULL == cp->irc)
+ {
+ /* reservation already done! */
+ is_ready = GNUNET_YES;
+ ip = cp->inc_preference;
+ cp->inc_preference = 0;
+ cp->irc = GNUNET_CORE_peer_change_preference (core,
+ peer,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_BANDWIDTH_VALUE_MAX,
+ GNUNET_FS_DBLOCK_SIZE,
+ ip,
+ &core_reserve_callback,
+ cp);
+ }
+ else
+ {
+ /* still waiting for reservation */
+ is_ready = GNUNET_NO;
+ }
+ }
+ else
+ {
+ /* no reservation needed for content */
+ is_ready = GNUNET_YES;
+ }
+ if (is_ready)
+ {
+ pth->cth = GNUNET_CORE_notify_transmit_ready (core,
+ priority,
+ timeout,
+ &target,
+ size,
+ &peer_transmit_ready_cb,
+ pth);
+ /* pth->cth could be NULL here, that's OK, we'll try again
+ later... */
+ }
if (pth->cth == NULL)
- pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
- &peer_transmit_timeout,
- pth);
+ {
+ /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
+ install a timeout task to be on the safe side */
+ pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &peer_transmit_timeout,
+ pth);
+ }
return pth;
}
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
}
+ if (NULL != pth->cth)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
+ pth->cth = NULL;
+ }
cp = pth->cp;
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- // FIXME: update 'cp' counters!
+ if (pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_queries--);
+ else
+ GNUNET_assert (0 < cp->ppd.pending_replies--);
GNUNET_free (pth);
}
/**
* Report on receiving a reply; update the performance record of the given peer.
*
- * @param peer responding peer (will be updated)
+ * @param cp responding peer (will be updated)
* @param request_time time at which the original query was transmitted
* @param request_priority priority of the original request
* @param initiator_client local client on responsible for query (or NULL)
* @param initiator_peer other peer responsible for query (or NULL)
*/
void
-GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer,
- GNUNET_TIME_Absolute request_time,
+GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
+ struct GNUNET_TIME_Absolute request_time,
uint32_t request_priority,
const struct GSF_LocalClient *initiator_client,
const struct GSF_ConnectedPeer *initiator_peer)
{
- // FIXME...
+ struct GNUNET_TIME_Relative delay;
+ unsigned int i;
+
+ delay = GNUNET_TIME_absolute_get_duration (request_time);
+ cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
+ cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
+ if (NULL != initiator_client)
+ {
+ cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
+ }
+ else if (NULL != initiator_peer)
+ {
+ GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+ cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
+ GNUNET_PEER_change_rc (initiator_peer->pid, 1);
+ }
+ else
+ GNUNET_break (0);
}
const struct GNUNET_PeerIdentity *peer)
{
struct GSF_ConnectedPeer *cp;
+ struct GSF_PeerTransmitHandle *pth;
cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
&peer->hashPubKey);
GNUNET_CONTAINER_multihashmap_remove (cp_map,
&peer->hashPubKey,
cp);
- // FIXME: more cleanup
+ if (NULL != cp->irc)
+ {
+ GNUNET_CORE_peer_change_preference_cancel (cp->irc);
+ cp->irc = NULL;
+ }
+ GSF_plan_notify_peer_disconnect_ (cp);
+ GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
+ GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+ while (NULL != (pth = cp->pth_head))
+ {
+ if (NULL != pth->th)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
+ pth->th = NULL;
+ }
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ GNUNET_free (pth);
+ }
+ GNUNET_PEER_change_rc (cp->pid, -1);
GNUNET_free (cp);
}
}
-/**
- * Try to reserve bandwidth (to receive data FROM the given peer).
- * This function must only be called ONCE per connected peer at a
- * time; it can be called again after the 'rc' callback was invoked.
- * If the peer disconnects, the request is (silently!) ignored (and
- * the requester is responsible to register for notification about the
- * peer disconnect if any special action needs to be taken in this
- * case).
- *
- * @param cp peer to reserve bandwidth from
- * @param size number of bytes to reserve
- * @param rc function to call upon reservation success or failure
- * @param rc_cls closure for rc
- */
-void
-GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp,
- size_t size,
- GSF_PeerReserveCallback rc,
- void *rc_cls)
-{
- // FIXME: should we allow queueing multiple reservation requests?
- // FIXME: what about cancellation?
- // FIXME: change docu on peer disconnect handling?
- if (NULL != cp->irc)
- {
- rc (rc_cls, cp, GNUNET_NO);
- return;
- }
- // FIXME...
-}
-
-
/**
* Write host-trust information to a file - flush the buffer entry!
*
}
+/**
+ * Notify core about a preference we have for the given peer
+ * (to allocate more resources towards it). The change will
+ * be communicated the next time we reserve bandwidth with
+ * core (not instantly).
+ *
+ * @param cp peer to reserve bandwidth from
+ * @param pref preference change
+ */
+void
+GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
+ uint64_t pref)
+{
+ cp->inc_preference += pref;
+}
+
+
/**
* Call this method periodically to flush trust information to disk.
*
GSF_connected_peer_done_ ()
{
cron_flush_trust (NULL, NULL);
- GNUNET_CONTAINER_multihashmap_iterate (cp_peers,
+ GNUNET_CONTAINER_multihashmap_iterate (cp_map,
&clean_peer,
NULL);
GNUNET_CONTAINER_multihashmap_destroy (cp_map);
}
+/**
+ * Iterator to remove references to LC entry.
+ *
+ * @param the 'struct GSF_LocalClient*' to look for
+ * @param key current key code
+ * @param value value in the hash map (peer entry)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+clean_peer (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ const struct GSF_LocalClient *lc = cls;
+ struct GSF_ConnectedPeer *cp = value;
+ unsigned int i;
+
+ for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+ if (cp->ppd.last_client_replies[i] == lc)
+ cp->ppd.last_client_replies[i] = NULL;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Notification that a local client disconnected. Clean up all of our
+ * references to the given handle.
+ *
+ * @param lc handle to the local client (henceforth invalid)
+ */
+void
+GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
+{
+ GNUNET_CONTAINER_multihashmap_iterate (cp_map,
+ &clean_local_client,
+ (void*) lc);
+}
+
#endif
/* end of gnunet-service-fs_cp.h */
/**
* Report on receiving a reply; update the performance record of the given peer.
*
- * @param peer responding peer (will be updated)
+ * @param cp responding peer (will be updated)
* @param request_time time at which the original query was transmitted
* @param request_priority priority of the original request
* @param initiator_client local client on responsible for query (or NULL)
* @param initiator_peer other peer responsible for query (or NULL)
*/
void
-GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer,
- GNUNET_TIME_Absolute request_time,
+GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
+ struct GNUNET_TIME_Absolute request_time,
uint32_t request_priority,
const struct GSF_LocalClient *initiator_client,
const struct GSF_ConnectedPeer *initiator_peer);
const struct GNUNET_TRANSPORT_ATS_Information *atsi);
+/**
+ * Handle P2P "MIGRATION_STOP" message.
+ *
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ * for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param atsi performance information
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+int
+GSF_handle_p2p_migration_stop_ (void *cls,
+ const struct GNUNET_PeerIdentity *other,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_TRANSPORT_ATS_Information *atsi);
+
+
/**
* A peer disconnected from us. Tear down the connected peer
* record.
/**
- * Iterate over all connected peers.
+ * Notify core about a preference we have for the given peer
+ * (to allocate more resources towards it). The change will
+ * be communicated the next time we reserve bandwidth with
+ * core (not instantly).
*
- * @param it function to call for each peer
- * @param it_cls closure for it
+ * @param cp peer to reserve bandwidth from
+ * @param pref preference change
*/
void
-GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
- void *it_cls);
+GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
+ uint64_t pref);
-// FIXME: should we allow queueing multiple reservation requests?
-// FIXME: what about cancellation?
-// FIXME: change docu on peer disconnect handling?
/**
- * Try to reserve bandwidth (to receive data FROM the given peer).
- * This function must only be called ONCE per connected peer at a
- * time; it can be called again after the 'rc' callback was invoked.
- * If the peer disconnects, the request is (silently!) ignored (and
- * the requester is responsible to register for notification about the
- * peer disconnect if any special action needs to be taken in this
- * case).
+ * Iterate over all connected peers.
*
- * @param cp peer to reserve bandwidth from
- * @param size number of bytes to reserve
- * @param rc function to call upon reservation success
- * @param rc_cls closure for rc
+ * @param it function to call for each peer
+ * @param it_cls closure for it
*/
void
-GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp,
- size_t size,
- GSF_PeerReserveCallback rc,
- void *rc_cls);
+GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
+ void *it_cls);
/**