#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
-/**
+/**
* Handle for a mesh to another peer.
*/
struct MeshHandle;
enum GNUNET_BLOCK_Type type;
/**
- * Did we transmit this request already? YES if we are
- * in the 'waiting_map', NO if we are in the 'pending' DLL.
+ * Did we transmit this request already? #GNUNET_YES if we are
+ * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
*/
int was_transmitted;
};
-/**
+/**
* Handle for a mesh to another peer.
*/
struct MeshHandle
struct GSF_MeshRequest *pending_tail;
/**
- * Map from query to 'struct GSF_MeshRequest's waiting for
+ * Map from query to `struct GSF_MeshRequest`s waiting for
* a reply.
*/
struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
/**
- * Tunnel to the other peer.
+ * Channel to the other peer.
*/
- struct GNUNET_MESH_Tunnel *tunnel;
+ struct GNUNET_MESH_Channel *channel;
/**
* Handle for active write operation, or NULL.
- */
+ */
struct GNUNET_MESH_TransmitHandle *wh;
/**
* Which peer does this mesh go to?
- */
+ */
struct GNUNET_PeerIdentity target;
/**
/**
- * Mesh tunnel for creating outbound tunnels.
+ * Mesh channel for creating outbound channels.
*/
static struct GNUNET_MESH_Handle *mesh_handle;
/**
* Map from peer identities to 'struct MeshHandles' with mesh
- * tunnels to those peers.
+ * channels to those peers.
*/
-static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
+static struct GNUNET_CONTAINER_MultiPeerMap *mesh_map;
/* ********************* client-side code ************************* */
/**
- * Iterator called on each entry in a waiting map to
+ * Iterator called on each entry in a waiting map to
* move it back to the pending list.
*
- * @param cls the 'struct MeshHandle'
+ * @param cls the `struct MeshHandle`
* @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to move to pending
- * @return GNUNET_YES (continue to iterate)
+ * @param value the `struct GSF_MeshRequest` to move to pending
+ * @return #GNUNET_YES (continue to iterate)
*/
static int
move_to_pending (void *cls,
{
struct MeshHandle *mh = cls;
struct GSF_MeshRequest *sr = value;
-
+
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
key,
static void
reset_mesh (struct MeshHandle *mh)
{
+ struct GNUNET_MESH_Channel *channel = mh->channel;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting mesh tunnel to %s\n",
+ "Resetting mesh channel to %s\n",
GNUNET_i2s (&mh->target));
- GNUNET_mesh_handle_destroy (mh->tunnel);
+ mh->channel = NULL;
+ if (NULL != channel)
+ GNUNET_MESH_channel_destroy (channel);
GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
&move_to_pending,
mh);
- mh->tunnel = GNUNET_mesh_handle_create (mesh_handle,
+ mh->channel = GNUNET_MESH_channel_create (mesh_handle,
mh,
&mh->target,
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_YES,
- GNUNET_YES);
+ GNUNET_MESH_OPTION_RELIABLE);
+ transmit_pending (mh);
}
/**
- * Task called when it is time to destroy an inactive mesh tunnel.
+ * Task called when it is time to destroy an inactive mesh channel.
*
- * @param cls the 'struct MeshHandle' to tear down
+ * @param cls the `struct MeshHandle` to tear down
* @param tc scheduler context, unused
*/
static void
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct MeshHandle *mh = cls;
- struct GNUNET_Mesh_Handle *tun;
+ struct GNUNET_MESH_Channel *tun;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on mesh tunnel to %s\n",
+ "Timeout on mesh channel to %s\n",
GNUNET_i2s (&mh->target));
mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- tun = mh->tunnel;
- mh->tunnel = NULL;
- GNUNET_mesh_handle_destroy (tun);
+ tun = mh->channel;
+ mh->channel = NULL;
+ GNUNET_MESH_channel_destroy (tun);
}
/**
* Task called when it is time to reset an mesh.
*
- * @param cls the 'struct MeshHandle' to tear down
+ * @param cls the `struct MeshHandle` to tear down
* @param tc scheduler context, unused
*/
static void
* query via a mesh.
*
* @param cls the struct MeshHandle for which we did the write call
- * @param size the number of bytes that can be written to 'buf'
+ * @param size the number of bytes that can be written to @a buf
* @param buf where to write the message
- * @return number of bytes written to 'buf'
+ * @return number of bytes written to @a buf
*/
static size_t
transmit_sqm (void *cls,
if (NULL == buf)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Mesh tunnel to %s failed during transmission attempt, rebuilding\n",
+ "Mesh channel to %s failed during transmission attempt, rebuilding\n",
GNUNET_i2s (&mh->target));
- reset_mesh (mh);
+ reset_mesh_async (mh);
return 0;
}
sr = mh->pending_head;
transmit_pending (mh);
return sizeof (sqm);
}
-
+
/**
* Transmit pending requests via the mesh.
static void
transmit_pending (struct MeshHandle *mh)
{
+ if (NULL == mh->channel)
+ return;
if (NULL != mh->wh)
return;
- mh->wh = GNUNET_MESH_notify_transmit_ready (mh->tunnel, GNUNET_YES /* allow cork */,
+ mh->wh = GNUNET_MESH_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
GNUNET_TIME_UNIT_FOREVER_REL,
sizeof (struct MeshQueryMessage),
&transmit_sqm, mh);
/**
- * Closure for 'handle_reply'.
+ * Closure for handle_reply().
*/
struct HandleReplyClosure
{
/**
* Reply payload.
- */
+ */
const void *data;
/**
*/
size_t data_size;
- /**
+ /**
* Type of the block.
*/
enum GNUNET_BLOCK_Type type;
-
+
/**
* Did we have a matching query?
*/
/**
- * Iterator called on each entry in a waiting map to
+ * Iterator called on each entry in a waiting map to
* process a result.
*
- * @param cls the 'struct HandleReplyClosure'
+ * @param cls the `struct HandleReplyClosure`
* @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to handle result for
- * @return GNUNET_YES (continue to iterate)
+ * @param value the `struct GSF_MeshRequest` to handle result for
+ * @return #GNUNET_YES (continue to iterate)
*/
static int
handle_reply (void *cls,
{
struct HandleReplyClosure *hrc = cls;
struct GSF_MeshRequest *sr = value;
-
+
sr->proc (sr->proc_cls,
hrc->type,
hrc->expiration,
hrc->data_size,
hrc->data);
+ sr->proc = NULL;
GSF_mesh_query_cancel (sr);
hrc->found = GNUNET_YES;
return GNUNET_YES;
* Functions with this signature are called whenever a complete reply
* is received.
*
- * @param cls closure with the 'struct MeshHandle'
- * @param tunnel tunnel handle
- * @param tunnel_ctx tunnel context
+ * @param cls closure with the `struct MeshHandle`
+ * @param channel channel handle
+ * @param channel_ctx channel context
* @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
*/
static int
reply_cb (void *cls,
- struct GNUNET_Mesh_Handle *tunnel,
- void **tunnel_ctx,
+ struct GNUNET_MESH_Channel *channel,
+ void **channel_ctx,
const struct GNUNET_MessageHeader *message)
{
- struct MeshHandle *mh = *tunnel_ctx;
+ struct MeshHandle *mh = *channel_ctx;
const struct MeshReplyMessage *srm;
struct HandleReplyClosure hrc;
uint16_t msize;
type,
&srm[1], msize, &query))
{
- GNUNET_break_op (0);
+ GNUNET_break_op (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Received bogus reply of type %u with %u bytes via mesh from peer %s\n",
+ type,
+ msize,
+ GNUNET_i2s (&mh->target));
reset_mesh_async (mh);
return GNUNET_SYSERR;
}
"Received reply `%s' via mesh from peer %s\n",
GNUNET_h2s (&query),
GNUNET_i2s (&mh->target));
- GNUNET_MESH_receive_done (tunnel);
+ GNUNET_MESH_receive_done (channel);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via mesh"), 1,
GNUNET_NO);
{
struct MeshHandle *mh;
- mh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
- &target->hashPubKey);
+ mh = GNUNET_CONTAINER_multipeermap_get (mesh_map,
+ target);
if (NULL != mh)
{
if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
return mh;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating mesh tunnel to %s\n",
+ "Creating mesh channel to %s\n",
GNUNET_i2s (target));
mh = GNUNET_new (struct MeshHandle);
mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
mh);
mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
mh->target = *target;
- mh->tunnel = GNUNET_mesh_handle_create (mesh_handle,
- mh,
- &mh->target,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_NO,
- GNUNET_YES);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mesh_map,
- &mh->target.hashPubKey,
+ GNUNET_CONTAINER_multipeermap_put (mesh_map,
+ &mh->target,
mh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ mh->channel = GNUNET_MESH_channel_create (mesh_handle,
+ mh,
+ &mh->target,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ GNUNET_MESH_OPTION_RELIABLE);
+ GNUNET_assert (mh ==
+ GNUNET_CONTAINER_multipeermap_get (mesh_map,
+ target));
return mh;
}
* @param query hash to query for the block
* @param type desired type for the block
* @param proc function to call with result
- * @param proc_cls closure for 'proc'
+ * @param proc_cls closure for @a proc
* @return handle to cancel the operation
*/
struct GSF_MeshRequest *
GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
{
struct MeshHandle *mh = sr->mh;
+ GSF_MeshReplyProcessor p;
+ p = sr->proc;
+ sr->proc = NULL;
+ if (NULL != p)
+ {
+ /* signal failure / cancellation to callback */
+ p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_ZERO_ABS,
+ 0, NULL);
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Cancelled query for %s via mesh to %s\n",
GNUNET_h2s (&sr->query),
/**
- * Iterator called on each entry in a waiting map to
+ * Iterator called on each entry in a waiting map to
* call the 'proc' continuation and release associated
* resources.
*
- * @param cls the 'struct MeshHandle'
+ * @param cls the `struct MeshHandle`
* @param key the key of the entry in the map (the query)
- * @param value the 'struct GSF_MeshRequest' to clean up
- * @return GNUNET_YES (continue to iterate)
+ * @param value the `struct GSF_MeshRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
*/
static int
free_waiting_entry (void *cls,
{
struct GSF_MeshRequest *sr = value;
- sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0, NULL);
GSF_mesh_query_cancel (sr);
return GNUNET_YES;
}
/**
* Function called by mesh when a client disconnects.
- * Cleans up our 'struct MeshClient' of that tunnel.
+ * Cleans up our `struct MeshClient` of that channel.
*
* @param cls NULL
- * @param tunnel tunnel of the disconnecting client
- * @param tunnel_ctx our 'struct MeshClient'
+ * @param channel channel of the disconnecting client
+ * @param channel_ctx our `struct MeshClient`
*/
static void
cleaner_cb (void *cls,
- const struct GNUNET_Mesh_Handle *tunnel,
- void *tunnel_ctx)
+ const struct GNUNET_MESH_Channel *channel,
+ void *channel_ctx)
{
- struct MeshHandle *mh = tunnel_ctx;
+ struct MeshHandle *mh = channel_ctx;
struct GSF_MeshRequest *sr;
- mh->tunnel = NULL;
+ if (NULL == mh->channel)
+ return; /* being destroyed elsewhere */
+ GNUNET_assert (channel == mh->channel);
+ mh->channel = NULL;
while (NULL != (sr = mh->pending_head))
- {
- sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0, NULL);
GSF_mesh_query_cancel (sr);
- }
+ /* first remove `mh` from the `mesh_map`, so that if the
+ callback from `free_waiting_entry()` happens to re-issue
+ the request, we don't immediately have it back in the
+ `waiting_map`. */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_remove (mesh_map,
+ &mh->target,
+ mh));
GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
&free_waiting_entry,
mh);
GNUNET_SCHEDULER_cancel (mh->timeout_task);
if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
GNUNET_SCHEDULER_cancel (mh->reset_task);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (mesh_map,
- &mh->target.hashPubKey,
- mh));
+ GNUNET_assert (0 ==
+ GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
GNUNET_free (mh);
}
{ NULL, 0, 0 }
};
- mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+ mesh_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
mesh_handle = GNUNET_MESH_connect (GSF_cfg,
NULL,
NULL,
*
* @param cls NULL
* @param key target peer, unused
- * @param value the 'struct MeshHandle' to destroy
- * @return GNUNET_YES (continue to iterate)
+ * @param value the `struct MeshHandle` to destroy
+ * @return #GNUNET_YES (continue to iterate)
*/
static int
release_meshs (void *cls,
- const struct GNUNET_HashCode *key,
+ const struct GNUNET_PeerIdentity *key,
void *value)
{
struct MeshHandle *mh = value;
- struct GNUNET_Mesh_Handle *tun;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on mesh tunnel to %s\n",
+ "Timeout on mesh channel to %s\n",
GNUNET_i2s (&mh->target));
- tun = mh->tunnel;
- mh->tunnel = NULL;
- if (NULL != tun)
- GNUNET_mesh_handle_destroy (tun);
+ if (NULL != mh->channel)
+ GNUNET_MESH_channel_destroy (mh->channel);
return GNUNET_YES;
}
void
GSF_mesh_stop_client ()
{
- GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
+ GNUNET_CONTAINER_multipeermap_iterate (mesh_map,
&release_meshs,
NULL);
- GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
+ GNUNET_CONTAINER_multipeermap_destroy (mesh_map);
mesh_map = NULL;
if (NULL != mesh_handle)
{