/*
This file is part of GNUnet.
- Copyright (C) 2009-2014 GNUnet e.V.
+ Copyright (C) 2009-2014, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
/**
* Identity of this peer.
*/
-static struct GNUNET_PeerIdentity my_id;
+struct GNUNET_PeerIdentity GSF_my_id;
/**
/**
- * Handle P2P "PUT" message.
+ * Check P2P "PUT" message.
*
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
+ * @param cls closure with the `struct GSF_ConnectedPeer`
* @param message the actual message
* @return #GNUNET_OK to keep the connection open,
* #GNUNET_SYSERR to close it (signal serious error)
*/
static int
-handle_p2p_put (void *cls,
- const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message)
+check_p2p_put (void *cls,
+ const struct PutMessage *put)
{
- struct GSF_ConnectedPeer *cp;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received P2P PUT from %s\n",
- GNUNET_i2s (other));
- cp = GSF_peer_get_ (other);
- if (NULL == cp)
+ enum GNUNET_BLOCK_Type type;
+
+ type = ntohl (put->type);
+ if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
{
- GNUNET_break (0);
- return GNUNET_OK;
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- GSF_cover_content_count++;
- return GSF_handle_p2p_content_ (cp, message);
+ return GNUNET_OK;
}
{
struct GSF_PendingRequest *pr = cls;
- if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer))
+ if (GNUNET_YES !=
+ GSF_pending_request_test_target_ (pr, peer))
{
#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
#endif
return;
}
- GSF_plan_add_ (cp, pr);
+ GSF_plan_add_ (cp,
+ pr);
}
* @param pr the pending request we were processing
* @param result final datastore lookup result
*/
-static void
-consider_forwarding (void *cls,
- struct GSF_PendingRequest *pr,
- enum GNUNET_BLOCK_EvaluationResult result)
+void
+GSF_consider_forwarding (void *cls,
+ struct GSF_PendingRequest *pr,
+ enum GNUNET_BLOCK_EvaluationResult result)
{
if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
return; /* we're done... */
/**
- * Handle P2P "GET" request.
+ * Check P2P "GET" request.
*
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
+ * @param cls closure
+ * @param gm the actual message
* @return #GNUNET_OK to keep the connection open,
* #GNUNET_SYSERR to close it (signal serious error)
*/
static int
-handle_p2p_get (void *cls,
- const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message)
+check_p2p_get (void *cls,
+ const struct GetMessage *gm)
{
- struct GSF_PendingRequest *pr;
-
- pr = GSF_handle_p2p_query_ (other,
- message);
- if (NULL == pr)
- return GNUNET_OK; /* exists, identical to existing request, or malformed */
- GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
- GSF_local_lookup_ (pr,
- &consider_forwarding,
- NULL);
- return GNUNET_OK;
+ size_t msize;
+ unsigned int bm;
+ unsigned int bits;
+ size_t bfsize;
+
+ msize = ntohs (gm->header.size);
+ bm = ntohl (gm->hash_bitmap);
+ bits = 0;
+ while (bm > 0)
+ {
+ if (1 == (bm & 1))
+ bits++;
+ bm >>= 1;
+ }
+ if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
+ /* bfsize must be power of 2, check! */
+ if (0 != ((bfsize - 1) & bfsize))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
}
prd = GSF_pending_request_get_data_ (pr);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Finished database lookup for local request `%s' with result %d\n",
- GNUNET_h2s (&prd->query), result);
+ GNUNET_h2s (&prd->query),
+ result);
if (0 == prd->anonymity_level)
{
switch (prd->type)
break;
}
}
- consider_forwarding (NULL, pr, result);
+ GSF_consider_forwarding (NULL, pr, result);
}
GSF_cadet_stop_server ();
if (NULL != GSF_core)
{
- GNUNET_CORE_disconnect (GSF_core);
+ GNUNET_CORE_disconnecT (GSF_core);
GSF_core = NULL;
}
if (NULL != GSF_ats)
/**
- * Function called for each pending request whenever a new
- * peer connects, giving us a chance to decide about submitting
- * the existing request to the new peer.
- *
- * @param cls the `struct GSF_ConnectedPeer` of the new peer
- * @param key query for the request
- * @param pr handle to the pending request
- * @return #GNUNET_YES to continue to iterate
- */
-static int
-consider_peer_for_forwarding (void *cls,
- const struct GNUNET_HashCode *key,
- struct GSF_PendingRequest *pr)
-{
- struct GSF_ConnectedPeer *cp = cls;
- struct GNUNET_PeerIdentity pid;
-
- if (GNUNET_YES !=
- GSF_pending_request_test_active_ (pr))
- return GNUNET_YES; /* request is not actually active, skip! */
- GSF_connected_peer_get_identity_ (cp, &pid);
- if (GNUNET_YES !=
- GSF_pending_request_test_target_ (pr, &pid))
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# Loopback routes suppressed"),
- 1,
- GNUNET_NO);
- return GNUNET_YES;
- }
- GSF_plan_add_ (cp, pr);
- return GNUNET_YES;
-}
-
-
-/**
- * Function called after the creation of a connected peer record is complete.
- *
- * @param cls closure (unused)
- * @param cp handle to the newly created connected peer record
- */
-static void
-connected_peer_cb (void *cls,
- struct GSF_ConnectedPeer *cp)
-{
- if (NULL == cp)
- return;
- GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
- cp);
-}
-
-
-/**
- * Method called whenever a given peer connects.
- *
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- */
-static void
-peer_connect_handler (void *cls,
- const struct GNUNET_PeerIdentity *peer)
-{
- if (0 ==
- GNUNET_CRYPTO_cmp_peer_identity (&my_id,
- peer))
- return;
- GSF_peer_connect_handler_ (peer,
- &connected_peer_cb,
- NULL);
-}
-
-
-/**
- * Function called after GNUNET_CORE_connect has succeeded
+ * Function called after GNUNET_CORE_connecT has succeeded
* (or failed for good). Note that the private key of the
* peer is intentionally not exposed here; if you need it,
* your process should try to read the private key file
peer_init_handler (void *cls,
const struct GNUNET_PeerIdentity *my_identity)
{
- if (0 != GNUNET_CRYPTO_cmp_peer_identity (&my_id,
+ if (0 != GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
my_identity))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
main_init (struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- static const struct GNUNET_CORE_MessageHandler no_p2p_handlers[] = {
- { NULL, 0, 0 }
+ GNUNET_MQ_hd_var_size (p2p_get,
+ GNUNET_MESSAGE_TYPE_FS_GET,
+ struct GetMessage);
+ GNUNET_MQ_hd_var_size (p2p_put,
+ GNUNET_MESSAGE_TYPE_FS_PUT,
+ struct PutMessage);
+ GNUNET_MQ_hd_fixed_size (p2p_migration_stop,
+ GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+ struct MigrationStopMessage);
+ struct GNUNET_MQ_MessageHandler no_p2p_handlers[] = {
+ GNUNET_MQ_handler_end ()
};
- static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = {
- { &handle_p2p_get,
- GNUNET_MESSAGE_TYPE_FS_GET, 0 },
- { &handle_p2p_put,
- GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
- { &GSF_handle_p2p_migration_stop_,
- GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
- sizeof (struct MigrationStopMessage) },
- { NULL, 0, 0 }
+ struct GNUNET_MQ_MessageHandler p2p_handlers[] = {
+ make_p2p_get_handler (NULL),
+ make_p2p_put_handler (NULL),
+ make_p2p_migration_stop_handler (NULL),
+ GNUNET_MQ_handler_end ()
};
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{ &GNUNET_FS_handle_index_start, NULL,
GNUNET_free (keyfile);
GNUNET_assert (NULL != pk);
GNUNET_CRYPTO_eddsa_key_get_public (pk,
- &my_id.public_key);
+ &GSF_my_id.public_key);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"I am peer %s\n",
- GNUNET_i2s (&my_id));
+ GNUNET_i2s (&GSF_my_id));
GSF_core
- = GNUNET_CORE_connect (GSF_cfg, NULL,
+ = GNUNET_CORE_connecT (GSF_cfg,
+ NULL,
&peer_init_handler,
- &peer_connect_handler,
- &GSF_peer_disconnect_handler_,
- NULL, GNUNET_NO,
- NULL, GNUNET_NO,
+ &GSF_peer_connect_handler,
+ &GSF_peer_disconnect_handler,
(GNUNET_YES == anon_p2p_off)
? no_p2p_handlers
: p2p_handlers);
if (NULL == GSF_core)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Failed to connect to `%s' service.\n"), "core");
+ _("Failed to connect to `%s' service.\n"),
+ "core");
return GNUNET_SYSERR;
}
- GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_,
+ GNUNET_SERVER_disconnect_notify (server,
+ &GSF_client_disconnect_handler_,
NULL);
GNUNET_SERVER_add_handlers (server, handlers);
cover_age_task =
*/
extern struct GNUNET_ATS_PerformanceHandle *GSF_ats;
+/**
+ * Identity of this peer.
+ */
+extern struct GNUNET_PeerIdentity GSF_my_id;
/**
* Typical priorities we're seeing from other peers right now. Since
extern unsigned int GSF_datastore_queue_size;
+/**
+ * Function to be called after we're done processing
+ * replies from the local lookup. If the result status
+ * code indicates that there may be more replies, plan
+ * forwarding the request.
+ *
+ * @param cls closure (NULL)
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
+ */
+void
+GSF_consider_forwarding (void *cls,
+ struct GSF_PendingRequest *pr,
+ enum GNUNET_BLOCK_EvaluationResult result);
+
+
/**
* Test if the DATABASE (GET) load on this peer is too high
* to even consider processing the query at
* all.
*
- * @return GNUNET_YES if the load is too high to do anything (load high)
- * GNUNET_NO to process normally (load normal)
- * GNUNET_SYSERR to process for free (load low)
+ * @return #GNUNET_YES if the load is too high to do anything (load high)
+ * #GNUNET_NO to process normally (load normal)
+ * #GNUNET_SYSERR to process for free (load low)
*/
int
GSF_test_get_load_too_high_ (uint32_t priority);
/*
This file is part of GNUnet.
- Copyright (C) 2011 GNUnet e.V.
+ Copyright (C) 2011, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
struct GNUNET_TIME_Absolute transmission_request_start_time;
/**
- * Timeout for this request.
+ * Envelope with the actual message.
*/
- struct GNUNET_TIME_Absolute timeout;
-
- /**
- * Task called on timeout, or 0 for none.
- */
- struct GNUNET_SCHEDULER_Task *timeout_task;
-
- /**
- * Function to call to get the actual message.
- */
- GSF_GetMessageCallback gmc;
+ struct GNUNET_MQ_Envelope *env;
/**
* Peer this request targets.
*/
struct GSF_ConnectedPeer *cp;
- /**
- * Closure for @e gmc.
- */
- void *gmc_cls;
-
- /**
- * Size of the message to be transmitted.
- */
- size_t size;
-
/**
* #GNUNET_YES if this is a query, #GNUNET_NO for content.
*/
struct GSF_ConnectedPeer *cp;
/**
- * The PUT that was delayed.
+ * Envelope of the message that was delayed.
*/
- struct PutMessage *pm;
+ struct GNUNET_MQ_Envelope *env;
/**
* Task for the delay.
*/
struct GSF_DelayedHandle *delayed_tail;
- /**
- * Migration stop message in our queue, or NULL if we have none pending.
- */
- struct GSF_PeerTransmitHandle *migration_pth;
-
/**
* Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
*/
/**
* Handle for an active request for transmission to this
- * peer, or NULL (if core queue was full).
+ * peer.
*/
- struct GNUNET_CORE_TransmitHandle *cth;
+ struct GNUNET_MQ_Handle *mq;
/**
* Increase in traffic preference still to be submitted
*/
uint64_t inc_preference;
- /**
- * Set to 1 if we're currently in the process of calling
- * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
- * NULL, we should not call notify_transmit_ready for this
- * handle right now).
- */
- unsigned int cth_in_progress;
-
/**
* Number of entries in @e delayed_head DLL.
*/
*/
int did_reserve;
- /**
- * Function called when the creation of this record is complete.
- */
- GSF_ConnectedPeerCreationCallback creation_cb;
-
- /**
- * Closure for @e creation_cb
- */
- void *creation_cb_cls;
-
/**
* Handle to the PEERSTORE iterate request for peer respect value
*/
/**
* Core is ready to transmit to a peer, get the message.
*
- * @param cls the `struct GSF_PeerTransmitHandle` of the message
- * @param size number of bytes core is willing to take
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param cp which peer to send a message to
*/
-static size_t
-peer_transmit_ready_cb (void *cls,
- size_t size,
- void *buf);
+static void
+peer_transmit (struct GSF_ConnectedPeer *cp);
/**
struct GNUNET_PeerIdentity target;
cp = pth->cp;
- if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
- return; /* already done */
GNUNET_assert (0 != cp->ppd.pid);
GNUNET_PEER_resolve (cp->ppd.pid, &target);
cp);
return;
}
- GNUNET_assert (NULL == cp->cth);
- cp->cth_in_progress++;
- cp->cth =
- GNUNET_CORE_notify_transmit_ready (GSF_core,
- GNUNET_YES,
- GNUNET_CORE_PRIO_BACKGROUND,
- GNUNET_TIME_absolute_get_remaining (pth->timeout),
- &target,
- pth->size,
- &peer_transmit_ready_cb, cp);
- GNUNET_assert (NULL != cp->cth);
- GNUNET_assert (0 < cp->cth_in_progress--);
+ peer_transmit (cp);
}
/**
* Core is ready to transmit to a peer, get the message.
*
- * @param cls the `struct GSF_PeerTransmitHandle` of the message
- * @param size number of bytes core is willing to take
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param cp which peer to send a message to
*/
-static size_t
-peer_transmit_ready_cb (void *cls,
- size_t size,
- void *buf)
+static void
+peer_transmit (struct GSF_ConnectedPeer *cp)
{
- struct GSF_ConnectedPeer *cp = cls;
struct GSF_PeerTransmitHandle *pth = cp->pth_head;
struct GSF_PeerTransmitHandle *pos;
- size_t ret;
- cp->cth = NULL;
if (NULL == pth)
- return 0;
- if (pth->size > size)
- {
- schedule_transmission (pth);
- return 0;
- }
- if (NULL != pth->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (pth->timeout_task);
- pth->timeout_task = NULL;
- }
+ return;
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
GNUNET_LOAD_update (cp->ppd.transmission_delay,
GNUNET_TIME_absolute_get_duration
(pth->transmission_request_start_time).rel_value_us);
- ret = pth->gmc (pth->gmc_cls, size, buf);
+ GNUNET_MQ_send (cp->mq,
+ pth->env);
+ GNUNET_free (pth);
if (NULL != (pos = cp->pth_head))
{
GNUNET_assert (pos != pth);
schedule_transmission (pos);
}
- GNUNET_free (pth);
- return ret;
}
}
cp->did_reserve = GNUNET_YES;
pth = cp->pth_head;
- if ( (NULL != pth) &&
- (NULL == cp->cth) &&
- (0 == cp->cth_in_progress) )
+ if (NULL != pth)
{
/* reservation success, try transmission now! */
- cp->cth_in_progress++;
- cp->cth =
- GNUNET_CORE_notify_transmit_ready (GSF_core,
- GNUNET_YES,
- GNUNET_CORE_PRIO_BACKGROUND,
- GNUNET_TIME_absolute_get_remaining (pth->timeout),
- peer,
- pth->size,
- &peer_transmit_ready_cb,
- cp);
- GNUNET_assert (NULL != cp->cth);
- GNUNET_assert (0 < cp->cth_in_progress--);
+ peer_transmit (cp);
}
}
struct GSF_ConnectedPeer *cp = cls;
GNUNET_assert (NULL != cp->respect_iterate_req);
- if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
- cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
+ if ( (NULL != record) &&
+ (sizeof (cp->disk_respect) == record->value_size))
+ {
+ cp->disk_respect = *((uint32_t *)record->value);
+ cp->ppd.respect += *((uint32_t *)record->value);
+ }
GSF_push_start_ (cp);
- if (NULL != cp->creation_cb)
- cp->creation_cb (cp->creation_cb_cls, cp);
if (NULL != record)
GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
cp->respect_iterate_req = NULL;
}
+/**
+ * Function called for each pending request whenever a new
+ * peer connects, giving us a chance to decide about submitting
+ * the existing request to the new peer.
+ *
+ * @param cls the `struct GSF_ConnectedPeer` of the new peer
+ * @param key query for the request
+ * @param pr handle to the pending request
+ * @return #GNUNET_YES to continue to iterate
+ */
+static int
+consider_peer_for_forwarding (void *cls,
+ const struct GNUNET_HashCode *key,
+ struct GSF_PendingRequest *pr)
+{
+ struct GSF_ConnectedPeer *cp = cls;
+ struct GNUNET_PeerIdentity pid;
+
+ if (GNUNET_YES !=
+ GSF_pending_request_test_active_ (pr))
+ return GNUNET_YES; /* request is not actually active, skip! */
+ GSF_connected_peer_get_identity_ (cp, &pid);
+ if (GNUNET_YES !=
+ GSF_pending_request_test_target_ (pr, &pid))
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Loopback routes suppressed"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_YES;
+ }
+ GSF_plan_add_ (cp, pr);
+ return GNUNET_YES;
+}
+
+
/**
* A peer connected to us. Setup the connected peer
* records.
*
+ * @param cls NULL
* @param peer identity of peer that connected
- * @param creation_cb callback function when the record is created.
- * @param creation_cb_cls closure for @creation_cb
+ * @param mq message queue for talking to @a peer
+ * @return our internal handle for the peer
*/
-void
-GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
- GSF_ConnectedPeerCreationCallback creation_cb,
- void *creation_cb_cls)
+void *
+GSF_peer_connect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ struct GNUNET_MQ_Handle *mq)
{
struct GSF_ConnectedPeer *cp;
+ if (0 ==
+ GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
+ peer))
+ return NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Connected to peer %s\n",
GNUNET_i2s (peer));
cp = GNUNET_new (struct GSF_ConnectedPeer);
cp->ppd.pid = GNUNET_PEER_intern (peer);
+ cp->ppd.peer = peer;
+ cp->mq = mq;
cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
cp->rc =
GNUNET_ATS_reserve_bandwidth (GSF_ats,
gettext_noop ("# peers connected"),
GNUNET_CONTAINER_multipeermap_size (cp_map),
GNUNET_NO);
- cp->creation_cb = creation_cb;
- cp->creation_cb_cls = creation_cb_cls;
- cp->respect_iterate_req =
- GNUNET_PEERSTORE_iterate (peerstore, "fs",
- peer, "respect",
+ cp->respect_iterate_req
+ = GNUNET_PEERSTORE_iterate (peerstore,
+ "fs",
+ peer,
+ "respect",
GNUNET_TIME_UNIT_FOREVER_REL,
&peer_respect_cb,
cp);
+ GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
+ cp);
+ return cp;
}
/**
- * Handle P2P "MIGRATION_STOP" message.
+ * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
*
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param cls closure, the `struct GSF_ConnectedPeer`
+ * @param msm the actual message
*/
-int
-GSF_handle_p2p_migration_stop_ (void *cls,
- const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message)
+void
+handle_p2p_migration_stop (void *cls,
+ const struct MigrationStopMessage *msm)
{
- struct GSF_ConnectedPeer *cp;
- const struct MigrationStopMessage *msm;
+ struct GSF_ConnectedPeer *cp = cls;
struct GNUNET_TIME_Relative bt;
- msm = (const struct MigrationStopMessage *) message;
- cp = GSF_peer_get_ (other);
- if (NULL == cp)
- {
- GNUNET_break (0);
- return GNUNET_OK;
- }
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# migration stop messages received"),
1, GNUNET_NO);
bt = GNUNET_TIME_relative_ntoh (msm->duration);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Migration of content to peer `%s' blocked for %s\n"),
- GNUNET_i2s (other),
+ GNUNET_i2s (cp->ppd.peer),
GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
if ( (NULL == cp->mig_revive_task) &&
GNUNET_SCHEDULER_add_delayed (bt,
&revive_migration, cp);
}
- return GNUNET_OK;
-}
-
-
-/**
- * Copy reply and free put message.
- *
- * @param cls the `struct PutMessage`
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
- */
-static size_t
-copy_reply (void *cls,
- size_t buf_size,
- void *buf)
-{
- struct PutMessage *pm = cls;
- size_t size;
-
- if (NULL != buf)
- {
- GNUNET_assert (buf_size >= ntohs (pm->header.size));
- size = ntohs (pm->header.size);
- GNUNET_memcpy (buf, pm, size);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies transmitted to other peers"),
- 1,
- GNUNET_NO);
- }
- else
- {
- size = 0;
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies dropped"),
- 1,
- GNUNET_NO);
- }
- GNUNET_free (pm);
- return size;
}
cp->delayed_tail,
dh);
cp->delay_queue_size--;
- (void) GSF_peer_transmit_ (cp,
- GNUNET_NO,
- UINT32_MAX,
- REPLY_TIMEOUT,
- dh->msize,
- ©_reply,
- dh->pm);
+ GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ dh->env);
GNUNET_free (dh);
}
struct PeerRequest *peerreq = cls;
struct GSF_ConnectedPeer *cp = peerreq->cp;
struct GSF_PendingRequestData *prd;
+ struct GNUNET_MQ_Envelope *env;
struct PutMessage *pm;
size_t msize;
GSF_cover_content_count -= (reply_anonymity_level - 1);
}
- pm = GNUNET_malloc (msize);
- pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
- pm->header.size = htons (msize);
+ env = GNUNET_MQ_msg_extra (pm,
+ data_len,
+ GNUNET_MESSAGE_TYPE_FS_PUT);
pm->type = htonl (type);
pm->expiration = GNUNET_TIME_absolute_hton (expiration);
- GNUNET_memcpy (&pm[1], data, data_len);
+ GNUNET_memcpy (&pm[1],
+ data,
+ data_len);
if ( (UINT32_MAX != reply_anonymity_level) &&
(0 != reply_anonymity_level) &&
(GNUNET_YES == GSF_enable_randomized_delays) )
dh = GNUNET_new (struct GSF_DelayedHandle);
dh->cp = cp;
- dh->pm = pm;
+ dh->env = env;
dh->msize = msize;
GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
cp->delayed_tail,
}
else
{
- (void) GSF_peer_transmit_ (cp,
- GNUNET_NO,
- UINT32_MAX,
- REPLY_TIMEOUT,
- msize,
- ©_reply,
- pm);
+ GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ env);
}
if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
return;
* process replies properly. Does not initiate forwarding or
* local database lookups.
*
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return pending request handle, NULL on error
+ * @param cls the other peer involved (sender of the message)
+ * @param gm the GET message
*/
-struct GSF_PendingRequest *
-GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message)
+void
+handle_p2p_get (void *cls,
+ const struct GetMessage *gm)
{
+ struct GSF_ConnectedPeer *cps = cls;
struct PeerRequest *peerreq;
struct GSF_PendingRequest *pr;
struct GSF_ConnectedPeer *cp;
- struct GSF_ConnectedPeer *cps;
const struct GNUNET_PeerIdentity *target;
enum GSF_PendingRequestOptions options;
uint16_t msize;
- const struct GetMessage *gm;
unsigned int bits;
const struct GNUNET_PeerIdentity *opt;
uint32_t bm;
GNUNET_PEER_Id spid;
const struct GSF_PendingRequestData *prd;
- msize = ntohs (message->size);
- if (msize < sizeof (struct GetMessage))
- {
- GNUNET_break_op (0);
- return NULL;
- }
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# GET requests received (from other peers)"),
- 1,
- GNUNET_NO);
- gm = (const struct GetMessage *) message;
+ msize = ntohs (gm->header.size);
tec.type = ntohl (gm->type);
bm = ntohl (gm->hash_bitmap);
bits = 0;
bits++;
bm >>= 1;
}
- if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
- {
- GNUNET_break_op (0);
- return NULL;
- }
opt = (const struct GNUNET_PeerIdentity *) &gm[1];
bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
- /* bfsize must be power of 2, check! */
- if (0 != ((bfsize - 1) & bfsize))
- {
- GNUNET_break_op (0);
- return NULL;
- }
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# GET requests received (from other peers)"),
+ 1,
+ GNUNET_NO);
GSF_cover_query_count++;
bm = ntohl (gm->hash_bitmap);
bits = 0;
- cps = GSF_peer_get_ (other);
- if (NULL == cps)
- {
- /* peer must have just disconnected */
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# requests dropped due to initiator not being connected"),
- 1, GNUNET_NO);
- return NULL;
- }
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
cp = GSF_peer_get_ (&opt[bits++]);
else
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to find peer `%s' in connection set. Dropping query.\n",
- GNUNET_i2s (other));
+ GNUNET_i2s (cps->ppd.peer));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to missing reverse route"),
1,
GNUNET_NO);
- return NULL;
+ return;
}
if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer `%s' has too many replies queued already. Dropping query.\n",
- GNUNET_i2s (other));
+ GNUNET_i2s (cps->ppd.peer));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# requests dropped due to full reply queue"),
1,
GNUNET_NO);
- return NULL;
+ return;
}
/* note that we can really only check load here since otherwise
* peers could find out that we are overloaded by not being
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s', this peer is too busy.\n",
- GNUNET_i2s (other));
- return NULL;
+ GNUNET_i2s (cps->ppd.peer));
+ return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for `%s' of type %u from peer `%s' with flags %u\n",
GNUNET_h2s (&gm->query),
(unsigned int) tec.type,
- GNUNET_i2s (other),
+ GNUNET_i2s (cps->ppd.peer),
(unsigned int) bm);
target =
(0 !=
* so at best indirect the query */
tec.priority = 0;
options |= GSF_PRO_FORWARD_ONLY;
- spid = GNUNET_PEER_intern (other);
+ spid = GNUNET_PEER_intern (cps->ppd.peer);
GNUNET_assert (0 != spid);
}
tec.ttl = bound_ttl (ntohl (gm->ttl),
ttl_decrement =
2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
TTL_DECREMENT);
- if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0))
+ if ( (tec.ttl < 0) &&
+ (((int32_t) (tec.ttl - ttl_decrement)) > 0) )
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s' due to TTL underflow (%d - %u).\n",
- GNUNET_i2s (other),
+ GNUNET_i2s (cps->ppd.peer),
tec.ttl,
ttl_decrement);
GNUNET_STATISTICS_update (GSF_stats,
("# requests dropped due TTL underflow"), 1,
GNUNET_NO);
/* integer underflow => drop (should be very rare)! */
- return NULL;
+ return;
}
tec.ttl -= ttl_decrement;
&test_exist_cb,
&tec);
if (GNUNET_YES == tec.finished)
- return NULL; /* merged into existing request, we're done */
+ return; /* merged into existing request, we're done */
peerreq = GNUNET_new (struct PeerRequest);
peerreq->cp = cp;
(uint32_t) tec.priority,
tec.ttl,
spid,
- GNUNET_PEER_intern (other),
+ GNUNET_PEER_intern (cps->ppd.peer),
NULL, 0, /* replies_seen */
&handle_p2p_reply,
peerreq);
gettext_noop ("# P2P searches active"),
1,
GNUNET_NO);
- return pr;
-}
-
-
-/**
- * Function called if there has been a timeout trying to satisfy
- * a transmission request.
- *
- * @param cls the `struct GSF_PeerTransmitHandle` of the request
- */
-static void
-peer_transmit_timeout (void *cls)
-{
- struct GSF_PeerTransmitHandle *pth = cls;
- struct GSF_ConnectedPeer *cp;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout trying to transmit to other peer\n");
- pth->timeout_task = NULL;
- cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head,
- cp->pth_tail,
- pth);
- if (GNUNET_YES == pth->is_query)
- GNUNET_assert (0 < cp->ppd.pending_queries--);
- else if (GNUNET_NO == pth->is_query)
- GNUNET_assert (0 < cp->ppd.pending_replies--);
- GNUNET_LOAD_update (cp->ppd.transmission_delay,
- UINT64_MAX);
- if (NULL != cp->cth)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
- cp->cth = NULL;
- }
- pth->gmc (pth->gmc_cls, 0, NULL);
- GNUNET_assert (0 == cp->cth_in_progress);
- GNUNET_free (pth);
+ GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
+ GSF_local_lookup_ (pr,
+ &GSF_consider_forwarding,
+ NULL);
}
* @param cp target peer
* @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
* @param priority how important is this request?
- * @param timeout when does this request timeout (call gmc with error)
+ * @param timeout when does this request timeout
* @param size number of bytes we would like to send to the peer
- * @param gmc function to call to get the message
- * @param gmc_cls closure for @a gmc
- * @return handle to cancel request
+ * @param env message to send
*/
-struct GSF_PeerTransmitHandle *
+void
GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
int is_query,
uint32_t priority,
- struct GNUNET_TIME_Relative timeout,
- size_t size,
- GSF_GetMessageCallback gmc, void *gmc_cls)
+ struct GNUNET_MQ_Envelope *env)
{
struct GSF_PeerTransmitHandle *pth;
struct GSF_PeerTransmitHandle *pos;
pth = GNUNET_new (struct GSF_PeerTransmitHandle);
pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
- pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
- pth->gmc = gmc;
- pth->gmc_cls = gmc_cls;
- pth->size = size;
+ pth->env = env;
pth->is_query = is_query;
pth->priority = priority;
pth->cp = cp;
cp->ppd.pending_queries++;
else if (GNUNET_NO == is_query)
cp->ppd.pending_replies++;
- pth->timeout_task
- = GNUNET_SCHEDULER_add_delayed (timeout,
- &peer_transmit_timeout,
- pth);
schedule_transmission (pth);
- return pth;
-}
-
-
-/**
- * Cancel an earlier request for transmission.
- *
- * @param pth request to cancel
- */
-void
-GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
-{
- struct GSF_ConnectedPeer *cp;
-
- if (NULL != pth->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (pth->timeout_task);
- pth->timeout_task = NULL;
- }
- cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head,
- cp->pth_tail,
- pth);
- if (GNUNET_YES == pth->is_query)
- GNUNET_assert (0 < cp->ppd.pending_queries--);
- else if (GNUNET_NO == pth->is_query)
- GNUNET_assert (0 < cp->ppd.pending_replies--);
- GNUNET_free (pth);
}
GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
sizeof (cp->ppd.respect),
GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ NULL,
+ NULL);
return GNUNET_OK;
}
* record.
*
* @param cls unused
- * @param peer identity of peer that connected
+ * @param peer identity of peer that disconnected
+ * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
*/
void
-GSF_peer_disconnect_handler_ (void *cls,
- const struct GNUNET_PeerIdentity *peer)
+GSF_peer_disconnect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *internal_cls)
{
- struct GSF_ConnectedPeer *cp;
+ struct GSF_ConnectedPeer *cp = internal_cls;
struct GSF_PeerTransmitHandle *pth;
struct GSF_DelayedHandle *dh;
- cp = GSF_peer_get_ (peer);
if (NULL == cp)
- return; /* must have been disconnect from core with
- * 'peer' == my_id, ignore */
- flush_respect (NULL, peer, cp);
+ return; /* must have been disconnect from core with
+ * 'peer' == my_id, ignore */
+ flush_respect (NULL,
+ peer,
+ cp);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (cp_map,
peer,
cp));
- GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# peers connected"),
GNUNET_CONTAINER_multipeermap_size (cp_map),
GNUNET_NO);
if (NULL != cp->respect_iterate_req)
GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
cp->respect_iterate_req = NULL;
}
- if (NULL != cp->migration_pth)
- {
- GSF_peer_transmit_cancel_ (cp->migration_pth);
- cp->migration_pth = NULL;
- }
if (NULL != cp->rc)
{
GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
0,
sizeof (cp->ppd.last_p2p_replies));
GSF_push_stop_ (cp);
- if (NULL != cp->cth)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
- cp->cth = NULL;
- }
- GNUNET_assert (0 == cp->cth_in_progress);
while (NULL != (pth = cp->pth_head))
{
- if (pth->timeout_task != NULL)
- {
- GNUNET_SCHEDULER_cancel (pth->timeout_task);
- pth->timeout_task = NULL;
- }
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_replies--);
- pth->gmc (pth->gmc_cls, 0, NULL);
GNUNET_free (pth);
}
while (NULL != (dh = cp->delayed_head))
GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
cp->delayed_tail,
dh);
+ GNUNET_MQ_discard (dh->env);
cp->delay_queue_size--;
GNUNET_SCHEDULER_cancel (dh->delay_task);
- GNUNET_free (dh->pm);
GNUNET_free (dh);
}
GNUNET_PEER_change_rc (cp->ppd.pid, -1);
}
-/**
- * Assemble a migration stop message for transmission.
- *
- * @param cls the `struct GSF_ConnectedPeer` to use
- * @param size number of bytes we're allowed to write to @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-create_migration_stop_message (void *cls,
- size_t size,
- void *buf)
-{
- struct GSF_ConnectedPeer *cp = cls;
- struct MigrationStopMessage msm;
-
- cp->migration_pth = NULL;
- if (NULL == buf)
- return 0;
- GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
- msm.header.size = htons (sizeof (struct MigrationStopMessage));
- msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
- msm.reserved = htonl (0);
- msm.duration =
- GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
- (cp->last_migration_block));
- GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# migration stop messages sent"),
- 1, GNUNET_NO);
- return sizeof (struct MigrationStopMessage);
-}
-
-
/**
* Ask a peer to stop migrating data to us until the given point
* in time.
GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
struct GNUNET_TIME_Absolute block_time)
{
+ struct GNUNET_MQ_Envelope *env;
+ struct MigrationStopMessage *msm;
+
if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
GNUNET_YES));
cp->last_migration_block = block_time;
- if (NULL != cp->migration_pth)
- GSF_peer_transmit_cancel_ (cp->migration_pth);
- cp->migration_pth =
- GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct MigrationStopMessage),
- &create_migration_stop_message, cp);
+ env = GNUNET_MQ_msg (msm,
+ GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm->reserved = htonl (0);
+ msm->duration
+ = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
+ (cp->last_migration_block));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# migration stop messages sent"),
+ 1,
+ GNUNET_NO);
+ GSF_peer_transmit_ (cp,
+ GNUNET_SYSERR,
+ UINT32_MAX,
+ env);
}
}
-/**
- * Iterator to free peer entries.
- *
- * @param cls closure, unused
- * @param key current key code
- * @param value value in the hash map (peer entry)
- * @return #GNUNET_YES (we should continue to iterate)
- */
-static int
-clean_peer (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- GSF_peer_disconnect_handler_ (NULL, key);
- return GNUNET_YES;
-}
-
-
/**
* Shutdown peer management subsystem.
*/
GNUNET_CONTAINER_multipeermap_iterate (cp_map,
&flush_respect,
NULL);
- GNUNET_CONTAINER_multipeermap_iterate (cp_map,
- &clean_peer,
- NULL);
GNUNET_SCHEDULER_cancel (fr_task);
fr_task = NULL;
GNUNET_CONTAINER_multipeermap_destroy (cp_map);
{
if (NULL == cp_map)
return; /* already cleaned up */
- GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
+ GNUNET_CONTAINER_multipeermap_iterate (cp_map,
+ &clean_local_client,
(void *) lc);
}
double avg_priority;
/**
- * The peer's identity.
+ * The peer's identity (interned version).
*/
GNUNET_PEER_Id pid;
+ /**
+ * The peer's identity (pointer).
+ */
+ const struct GNUNET_PeerIdentity *peer;
+
/**
* Respect rating for this peer
*/
int success);
-/**
- * Function called after the creation of a connected peer record is complete.
- *
- * @param cls closure
- * @param cp handle to the newly created connected peer record
- */
-typedef void
-(*GSF_ConnectedPeerCreationCallback) (void *cls,
- struct GSF_ConnectedPeer *cp);
-
-
/**
* Handle to cancel a transmission request.
*/
* A peer connected to us. Setup the connected peer
* records.
*
+ * @param cls NULL
* @param peer identity of peer that connected
- * @param creation_cb callback function when the record is created.
- * @param creation_cb_cls closure for @creation_cb
+ * @param mq queue for sending messages to @a peer
+ * @return internal handle for the peer
*/
-void
-GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
- GSF_ConnectedPeerCreationCallback creation_cb,
- void *creation_cb_cls);
+void *
+GSF_peer_connect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ struct GNUNET_MQ_Handle *mq);
/**
* the callback is invoked with a 'NULL' buffer.
*
* @param cp target peer
- * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
+ * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO)
* @param priority how important is this request?
- * @param timeout when does this request timeout (call gmc with error)
- * @param size number of bytes we would like to send to the peer
- * @param gmc function to call to get the message
- * @param gmc_cls closure for gmc
- * @return handle to cancel request
+ * @param env envelope of message to send
*/
-struct GSF_PeerTransmitHandle *
+void
GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
int is_query,
uint32_t priority,
- struct GNUNET_TIME_Relative timeout,
- size_t size, GSF_GetMessageCallback gmc,
- void *gmc_cls);
-
-
-/**
- * Cancel an earlier request for transmission.
- *
- * @param pth request to cancel
- */
-void
-GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth);
+ struct GNUNET_MQ_Envelope *env);
/**
/**
- * Handle P2P "MIGRATION_STOP" message.
+ * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
*
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK to keep the connection open,
- * #GNUNET_SYSERR to close it (signal serious error)
+ * @param cls closure, the `struct GSF_ConnectedPeer`
+ * @param msm the actual message
*/
-int
-GSF_handle_p2p_migration_stop_ (void *cls,
- const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message);
+void
+handle_p2p_migration_stop (void *cls,
+ const struct MigrationStopMessage *message);
/**
- * Handle P2P "QUERY" message. Only responsible for creating the
- * request entry itself and setting up reply callback and cancellation
- * on peer disconnect. Does NOT execute the actual request strategy
- * (planning) or local database operations.
+ * Handle P2P "QUERY" message.
*
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return pending request handle, NULL on error
+ * @param cls the `struct GSF_ConnectedPeer` of the other sender
+ * @param gm the actual message
*/
-struct GSF_PendingRequest *
-GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message);
+void
+handle_p2p_get (void *cls,
+ const struct GetMessage *gm);
/**
*
* @param cls unused
* @param peer identity of peer that connected
+ * @param internal_cls our `struct GSF_ConnectedPeer` for @a peer
*/
void
-GSF_peer_disconnect_handler_ (void *cls,
- const struct GNUNET_PeerIdentity *peer);
+GSF_peer_disconnect_handler (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *internal_cls);
/**
*/
struct GNUNET_CONTAINER_MultiHashMap *plan_map;
- /**
- * Current transmission request handle.
- */
- struct GSF_PeerTransmitHandle *pth;
-
/**
* Peer for which this is the plan.
*/
* Current task for executing the plan.
*/
struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * Current message under transmission for the plan.
+ */
+ struct GNUNET_MQ_Envelope *env;
+
};
}
-/**
- * Figure out when and how to transmit to the given peer.
- *
- * @param cls the `struct GSF_ConnectedPeer` for transmission
- */
-static void
-schedule_peer_transmission (void *cls);
-
-
/**
* Insert the given request plan into the heap with the appropriate weight.
*
rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Earliest (re)transmission for `%s' in %us\n",
- GNUNET_h2s (&prd->query), rp->transmission_counter);
+ GNUNET_h2s (&prd->query),
+ rp->transmission_counter);
GNUNET_assert (rp->hn == NULL);
if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
- rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
else
rp->hn =
- GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
+ GNUNET_CONTAINER_heap_insert (pp->delay_heap,
+ rp,
rp->earliest_transmission.abs_value_us);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
get_rp_key (rp),
rp));
- if (NULL != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
#undef N
}
}
-/**
- * Function called to get a message for transmission.
- *
- * @param cls closure
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
- */
-static size_t
-transmit_message_callback (void *cls,
- size_t buf_size,
- void *buf)
-{
- struct PeerPlan *pp = cls;
- struct GSF_RequestPlan *rp;
- size_t msize;
-
- pp->pth = NULL;
- if (NULL == buf)
- {
- /* failed, try again... */
- if (NULL != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
-
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# transmission failed (core has no bandwidth)"),
- 1, GNUNET_NO);
- return 0;
- }
- rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
- if (NULL == rp)
- {
- if (NULL != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
- return 0;
- }
- msize = GSF_pending_request_get_message_ (get_latest (rp),
- buf_size,
- buf);
- if (msize > buf_size)
- {
- if (NULL != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- /* buffer to small (message changed), try again */
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
- return 0;
- }
- /* remove from root, add again elsewhere... */
- GNUNET_assert (rp ==
- GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
- rp->hn = NULL;
- rp->last_transmission = GNUNET_TIME_absolute_get ();
- rp->transmission_counter++;
- total_delay++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Executing plan %p executed %u times, planning retransmission\n",
- rp, rp->transmission_counter);
- plan (pp, rp);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# query messages sent to other peers"),
- 1,
- GNUNET_NO);
- return msize;
-}
-
-
/**
* Figure out when and how to transmit to the given peer.
*
{
struct PeerPlan *pp = cls;
struct GSF_RequestPlan *rp;
- size_t msize;
struct GNUNET_TIME_Relative delay;
- pp->task = NULL;
- if (NULL != pp->pth)
+ if (NULL != pp->task)
+ {
+ pp->task = NULL;
+ }
+ else
{
- GSF_peer_transmit_cancel_ (pp->pth);
- pp->pth = NULL;
+ GNUNET_assert (NULL != pp->env);
+ pp->env = NULL;
}
/* move ready requests to priority queue */
while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
return;
}
#if INSANE_STATISTICS
- GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
- 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# query plans executed"),
+ 1,
+ GNUNET_NO);
#endif
/* process from priority heap */
- rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+ rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Executing query plan %p\n",
rp);
GNUNET_assert (NULL != rp);
- msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
- pp->pth =
- GSF_peer_transmit_ (pp->cp, GNUNET_YES,
- rp->priority,
- GNUNET_TIME_UNIT_FOREVER_REL,
- msize,
- &transmit_message_callback, pp);
- GNUNET_assert (NULL != pp->pth);
+ rp->hn = NULL;
+ rp->last_transmission = GNUNET_TIME_absolute_get ();
+ rp->transmission_counter++;
+ total_delay++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing plan %p executed %u times, planning retransmission\n",
+ rp,
+ rp->transmission_counter);
+ GNUNET_assert (NULL == pp->env);
+ pp->env = GSF_pending_request_get_message_ (get_latest (rp));
+ GNUNET_MQ_notify_sent (pp->env,
+ &schedule_peer_transmission,
+ pp);
+ GSF_peer_transmit_ (pp->cp,
+ GNUNET_YES,
+ rp->priority,
+ pp->env);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# query messages sent to other peers"),
+ 1,
+ GNUNET_NO);
+ plan (pp,
+ rp);
}
id,
pp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
+ pp);
}
mpc.merged = GNUNET_NO;
mpc.pr = pr;
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (plans, id,
pp));
- if (NULL != pp->pth)
- {
- GSF_peer_transmit_cancel_ (pp->pth);
- pp->pth = NULL;
- }
if (NULL != pp->task)
{
GNUNET_SCHEDULER_cancel (pp->task);
pp->task = NULL;
}
+ if (NULL != pp->env)
+ {
+ GNUNET_MQ_send_cancel (pp->env);
+ pp->env = NULL;
+ }
while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
{
GNUNET_break (GNUNET_YES ==
/**
* Generate the message corresponding to the given pending request for
- * transmission to other peers (or at least determine its size).
+ * transmission to other peers.
*
* @param pr request to generate the message for
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if `>` @a buf_size) or used
+ * @return envelope with the request message
*/
-size_t
-GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
- size_t buf_size, void *buf)
+struct GNUNET_MQ_Envelope *
+GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
{
- char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+ struct GNUNET_MQ_Envelope *env;
struct GetMessage *gm;
struct GNUNET_PeerIdentity *ext;
- size_t msize;
unsigned int k;
uint32_t bm;
uint32_t prio;
int64_t ttl;
int do_route;
- if (buf_size > 0)
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Building request message for `%s' of type %d\n",
- GNUNET_h2s (&pr->public_data.query),
- pr->public_data.type);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Building request message for `%s' of type %d\n",
+ GNUNET_h2s (&pr->public_data.query),
+ pr->public_data.type);
k = 0;
bm = 0;
do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
k++;
}
bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
- msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity);
- GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
- if (buf_size < msize)
- return msize;
- gm = (struct GetMessage *) lbuf;
- gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
- gm->header.size = htons (msize);
+ env = GNUNET_MQ_msg_extra (gm,
+ bf_size + k * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_FS_GET);
gm->type = htonl (pr->public_data.type);
if (do_route)
prio =
gm->query = pr->public_data.query;
ext = (struct GNUNET_PeerIdentity *) &gm[1];
k = 0;
- if (!do_route)
+ if (! do_route)
GNUNET_PEER_resolve (pr->sender_pid,
&ext[k++]);
if (NULL != pr->public_data.target)
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
(char *) &ext[k],
bf_size));
- GNUNET_memcpy (buf, gm, msize);
- return msize;
+ return env;
}
* this content and possibly passes it on (to local clients or other
* peers). Does NOT perform migration (content caching at this peer).
*
- * @param cp the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK if the message was well-formed,
- * #GNUNET_SYSERR if the message was malformed (close connection,
- * do not cache under any circumstances)
+ * @param cls the other peer involved
+ * @param put the actual message
*/
-int
-GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
- const struct GNUNET_MessageHeader *message)
+void
+handle_p2p_put (void *cls,
+ const struct PutMessage *put)
{
- const struct PutMessage *put;
+ struct GSF_ConnectedPeer *cp = cls;
uint16_t msize;
size_t dsize;
enum GNUNET_BLOCK_Type type;
double putl;
struct PutMigrationContext *pmc;
- msize = ntohs (message->size);
- if (msize < sizeof (struct PutMessage))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- put = (const struct PutMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received P2P PUT from %s\n",
+ GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
+ GSF_cover_content_count++;
+ msize = ntohs (put->header.size);
dsize = msize - sizeof (struct PutMessage);
type = ntohl (put->type);
expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
/* do not allow migrated content to live longer than 1 year */
expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
expiration);
- if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
- return GNUNET_SYSERR;
if (GNUNET_OK !=
GNUNET_BLOCK_get_key (GSF_block_ctx,
type,
&query))
{
GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ return;
}
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# GAP PUT messages received"),
GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
&pmc->origin);
if (NULL ==
- GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type,
- prq.priority, 1 /* anonymity */ ,
+ GNUNET_DATASTORE_put (GSF_dsh,
+ 0,
+ &query,
+ dsize,
+ &put[1],
+ type,
+ prq.priority,
+ 1 /* anonymity */ ,
0 /* replication */ ,
- expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE,
- &put_migration_continuation, pmc))
+ expiration, 1 + prq.priority,
+ MAX_DATASTORE_QUEUE,
+ &put_migration_continuation,
+ pmc))
{
put_migration_continuation (pmc,
GNUNET_SYSERR,
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Choosing not to keep content `%s' (%d/%d)\n",
- GNUNET_h2s (&query), active_to_migration,
+ GNUNET_h2s (&query),
+ active_to_migration,
test_put_load_too_high (prq.priority));
}
putl = GNUNET_LOAD_get_load (datastore_put_load);
putl,
active_to_migration,
(GNUNET_NO == prq.request_found));
- GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time));
+ GSF_block_peer_migration_ (cp,
+ GNUNET_TIME_relative_to_absolute (block_time));
}
- return GNUNET_OK;
}
*/
GSF_PRO_DEFAULTS = 0,
- /**
- * Request must only be processed locally.
- */
+ /**
+ * Request must only be processed locally.
+ */
GSF_PRO_LOCAL_ONLY = 1,
- /**
- * Request must only be forwarded (no routing)
- */
+ /**
+ * Request must only be forwarded (no routing)
+ */
GSF_PRO_FORWARD_ONLY = 2,
- /**
- * Request persists indefinitely (no expiration).
- */
+ /**
+ * Request persists indefinitely (no expiration).
+ */
GSF_PRO_REQUEST_NEVER_EXPIRES = 4,
- /**
- * Request is allowed to refresh bloomfilter and change mingle value.
- */
+ /**
+ * Request is allowed to refresh bloomfilter and change mingle value.
+ */
GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8,
- /**
- * Request priority is allowed to be exceeded.
- */
+ /**
+ * Request priority is allowed to be exceeded.
+ */
GSF_PRO_PRIORITY_UNLIMITED = 16,
- /**
- * Option mask for typical local requests.
- */
+ /**
+ * Option mask for typical local requests.
+ */
GSF_PRO_LOCAL_REQUEST =
(GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES)
};
/**
* Generate the message corresponding to the given pending request for
- * transmission to other peers (or at least determine its size).
+ * transmission to other peers.
*
* @param pr request to generate the message for
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if @a buf_size too small) or used
+ * @return envelope with the request message
*/
-size_t
-GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
- size_t buf_size,
- void *buf);
+struct GNUNET_MQ_Envelope *
+GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr);
/**
* this content and possibly passes it on (to local clients or other
* peers). Does NOT perform migration (content caching at this peer).
*
- * @param cp the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
- * @param message the actual message
- * @return #GNUNET_OK if the message was well-formed,
- * #GNUNET_SYSERR if the message was malformed (close connection,
- * do not cache under any circumstances)
+ * @param cls the other peer involved (sender)
+ * @param put the actual message
*/
-int
-GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
- const struct GNUNET_MessageHeader *message);
+void
+handle_p2p_put (void *cls,
+ const struct PutMessage *put);
/**
/*
This file is part of GNUnet.
- Copyright (C) 2011 GNUnet e.V.
+ Copyright (C) 2011, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
/**
- * Information about a peer waiting for
- * migratable data.
+ * Information about a peer waiting for migratable data.
*/
struct MigrationReadyPeer
{
struct GSF_ConnectedPeer *peer;
/**
- * Handle for current transmission request,
- * or NULL for none.
+ * Envelope of the currently pushed message.
*/
- struct GSF_PeerTransmitHandle *th;
-
- /**
- * Message we are trying to push right now (or NULL)
- */
- struct PutMessage *msg;
+ struct GNUNET_MQ_Envelope *env;
};
/**
* ID of task that collects blocks for migration.
*/
-static struct GNUNET_SCHEDULER_Task * mig_task;
+static struct GNUNET_SCHEDULER_Task *mig_task;
/**
* What is the maximum frequency at which we are allowed to
static void
delete_migration_block (struct MigrationReadyBlock *mb)
{
- GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb);
- GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE);
+ GNUNET_CONTAINER_DLL_remove (mig_head,
+ mig_tail,
+ mb);
+ GNUNET_PEER_decrement_rcs (mb->target_list,
+ MIGRATION_LIST_SIZE);
mig_size--;
GNUNET_free (mb);
}
/**
* Find content for migration to this peer.
- */
-static void
-find_content (struct MigrationReadyPeer *mrp);
-
-
-/**
- * Transmit the message currently scheduled for transmission.
*
- * @param cls the `struct MigrationReadyPeer`
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
+ * @param cls a `struct MigrationReadyPeer *`
*/
-static size_t
-transmit_message (void *cls,
- size_t buf_size,
- void *buf)
-{
- struct MigrationReadyPeer *peer = cls;
- struct PutMessage *msg;
- uint16_t msize;
-
- peer->th = NULL;
- msg = peer->msg;
- peer->msg = NULL;
- if (NULL == buf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to migrate content to another peer (disconnect)\n");
- GNUNET_free (msg);
- return 0;
- }
- msize = ntohs (msg->header.size);
- GNUNET_assert (msize <= buf_size);
- GNUNET_memcpy (buf, msg, msize);
- GNUNET_free (msg);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Pushing %u bytes to %s\n",
- msize,
- GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
- find_content (peer);
- return msize;
-}
+static void
+find_content (void *cls);
/**
* @return #GNUNET_YES if the block was deleted (!)
*/
static int
-transmit_content (struct MigrationReadyPeer *peer,
+transmit_content (struct MigrationReadyPeer *mrp,
struct MigrationReadyBlock *block)
{
- size_t msize;
struct PutMessage *msg;
unsigned int i;
struct GSF_PeerPerformanceData *ppd;
int ret;
- ppd = GSF_get_peer_performance_data_ (peer->peer);
- GNUNET_assert (NULL == peer->th);
- msize = sizeof (struct PutMessage) + block->size;
- msg = GNUNET_malloc (msize);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
- msg->header.size = htons (msize);
+ ppd = GSF_get_peer_performance_data_ (mrp->peer);
+ mrp->env = GNUNET_MQ_msg_extra (msg,
+ block->size,
+ GNUNET_MESSAGE_TYPE_FS_PUT);
msg->type = htonl (block->type);
msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
- GNUNET_memcpy (&msg[1], &block[1], block->size);
- peer->msg = msg;
+ GNUNET_memcpy (&msg[1],
+ &block[1],
+ block->size);
for (i = 0; i < MIGRATION_LIST_SIZE; i++)
{
if (block->target_list[i] == 0)
{
block->target_list[i] = ppd->pid;
- GNUNET_PEER_change_rc (block->target_list[i], 1);
+ GNUNET_PEER_change_rc (block->target_list[i],
+ 1);
break;
}
}
{
ret = GNUNET_NO;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking for transmission of %u bytes to %s for migration\n",
- (unsigned int) msize,
- GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
- peer->th = GSF_peer_transmit_ (peer->peer,
- GNUNET_NO, 0 /* priority */ ,
- GNUNET_TIME_UNIT_FOREVER_REL,
- msize,
- &transmit_message, peer);
+ GNUNET_MQ_notify_sent (mrp->env,
+ &find_content,
+ mrp);
+ GSF_peer_transmit_ (mrp->peer,
+ GNUNET_NO,
+ 0 /* priority */ ,
+ mrp->env);
return ret;
}
* Check if sending this block to this peer would
* be a good idea.
*
- * @param peer target peer
+ * @param mrp target peer
* @param block the block
* @return score (>= 0: feasible, negative: infeasible)
*/
static long
-score_content (struct MigrationReadyPeer *peer,
+score_content (struct MigrationReadyPeer *mrp,
struct MigrationReadyBlock *block)
{
unsigned int i;
struct GNUNET_HashCode hc;
uint32_t dist;
- ppd = GSF_get_peer_performance_data_ (peer->peer);
+ ppd = GSF_get_peer_performance_data_ (mrp->peer);
for (i = 0; i < MIGRATION_LIST_SIZE; i++)
if (block->target_list[i] == ppd->pid)
return -1;
GNUNET_assert (0 != ppd->pid);
- GNUNET_PEER_resolve (ppd->pid, &id);
- GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc);
- dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc);
+ GNUNET_PEER_resolve (ppd->pid,
+ &id);
+ GNUNET_CRYPTO_hash (&id,
+ sizeof (struct GNUNET_PeerIdentity),
+ &hc);
+ dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
+ &hc);
/* closer distance, higher score: */
return UINT32_MAX - dist;
}
/**
* Find content for migration to this peer.
*
- * @param mrp peer to find content for
+ * @param cls peer to find content for
*/
static void
-find_content (struct MigrationReadyPeer *mrp)
+find_content (void *cls)
{
+ struct MigrationReadyPeer *mrp = cls;
struct MigrationReadyBlock *pos;
long score;
long best_score;
struct MigrationReadyBlock *best;
- GNUNET_assert (NULL == mrp->th);
+ mrp->env = NULL;
best = NULL;
best_score = -1;
pos = mig_head;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Preparing to push best content to peer\n");
- transmit_content (mrp, best);
+ transmit_content (mrp,
+ best);
}
return;
if (mig_size >= MAX_MIGRATION_QUEUE)
return;
- delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size);
- delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE);
- delay = GNUNET_TIME_relative_max (delay, min_migration_delay);
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ mig_size);
+ delay = GNUNET_TIME_relative_divide (delay,
+ MAX_MIGRATION_QUEUE);
+ delay = GNUNET_TIME_relative_max (delay,
+ min_migration_delay);
if (GNUNET_NO == value_found)
{
/* wait at least 5s if the datastore is empty */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling gathering task (queue size: %u)\n",
mig_size);
- mig_task =
- GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL);
+ mig_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &gather_migration_blocks,
+ NULL);
}
mig_size++;
for (pos = peer_head; NULL != pos; pos = pos->next)
{
- if (NULL == pos->th)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing to push best content to peer %s\n",
- GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
- if (GNUNET_YES == transmit_content (pos, mb))
- break; /* 'mb' was freed! */
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing to push best content to peer %s\n",
+ GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
+ if (GNUNET_YES == transmit_content (pos,
+ mb))
+ break; /* 'mb' was freed! */
}
consider_gathering ();
}
"Asking datastore for content for replication (queue size: %u)\n",
mig_size);
value_found = GNUNET_NO;
- mig_qe =
- GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX,
- &process_migration_content, NULL);
+ mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
+ 0,
+ UINT_MAX,
+ &process_migration_content,
+ NULL);
if (NULL == mig_qe)
consider_gathering ();
}
break;
if (NULL == pos)
return;
+ if (NULL != pos->env)
+ GNUNET_MQ_send_cancel (pos->env);
GNUNET_CONTAINER_DLL_remove (peer_head,
peer_tail,
pos);
- if (NULL != pos->th)
- {
- GSF_peer_transmit_cancel_ (pos->th);
- pos->th = NULL;
- }
- if (NULL != pos->msg)
- {
- GNUNET_free (pos->msg);
- pos->msg = NULL;
- }
GNUNET_free (pos);
}
GSF_push_init_ ()
{
enabled =
- GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING");
+ GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
+ "FS",
+ "CONTENT_PUSHING");
if (GNUNET_YES != enabled)
return;
if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY",
+ GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
+ "fs",
+ "MIN_MIGRATION_DELAY",
&min_migration_delay))
{
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
- "fs", "MIN_MIGRATION_DELAY",
+ "fs",
+ "MIN_MIGRATION_DELAY",
_("time required, content pushing disabled"));
return;
}