GSF_CadetReplyProcessor proc;
/**
- * Closure for 'proc'
+ * Closure for @e proc
*/
void *proc_cls;
*/
struct GNUNET_CADET_Channel *channel;
- /**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_CADET_TransmitHandle *wh;
-
/**
* Which peer does this cadet go to?
*/
* a few seconds to give the application a chance to give
* us another query).
*/
- struct GNUNET_SCHEDULER_Task * timeout_task;
+ struct GNUNET_SCHEDULER_Task *timeout_task;
/**
* Task to reset cadets that had errors (asynchronously,
* as we may not be able to do it immediately during a
* callback from the cadet API).
*/
- struct GNUNET_SCHEDULER_Task * reset_task;
+ struct GNUNET_SCHEDULER_Task *reset_task;
};
/**
* Cadet channel for creating outbound channels.
*/
-static struct GNUNET_CADET_Handle *cadet_handle;
+struct GNUNET_CADET_Handle *cadet_handle;
/**
* Map from peer identities to 'struct CadetHandles' with cadet
* channels to those peers.
*/
-static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
+struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
/* ********************* client-side code ************************* */
/**
* Transmit pending requests via the cadet.
*
- * @param mh cadet to process
+ * @param cls `struct CadetHandle` to process
*/
static void
-transmit_pending (struct CadetHandle *mh);
+transmit_pending (void *cls);
/**
/**
- * We had a serious error, tear down and re-create cadet from scratch.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet (struct CadetHandle *mh)
-{
- struct GNUNET_CADET_Channel *channel = mh->channel;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->channel = NULL;
-
- if (NULL != channel)
- {
- /* Avoid loop */
- if (NULL != mh->wh)
- {
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
- mh->wh = NULL;
- }
- GNUNET_CADET_channel_destroy (channel);
- }
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &move_to_pending,
- mh);
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_CADET_OPTION_RELIABLE);
- transmit_pending (mh);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive cadet channel.
+ * Functions with this signature are called whenever a complete reply
+ * is received.
*
- * @param cls the `struct CadetHandle` to tear down
- * @param tc scheduler context, unused
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
*/
-static void
-cadet_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+check_reply (void *cls,
+ const struct CadetReplyMessage *srm)
{
- struct CadetHandle *mh = cls;
- struct GNUNET_CADET_Channel *tun;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->timeout_task = NULL;
- tun = mh->channel;
- mh->channel = NULL;
- if(NULL != tun)
- GNUNET_CADET_channel_destroy (tun);
+ /* We check later... */
+ return GNUNET_OK;
}
* Task called when it is time to reset an cadet.
*
* @param cls the `struct CadetHandle` to tear down
- * @param tc scheduler context, unused
*/
static void
-reset_cadet_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct CadetHandle *mh = cls;
-
- mh->reset_task = NULL;
- reset_cadet (mh);
-}
+reset_cadet_task (void *cls);
/**
}
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a cadet.
- *
- * @param cls the struct CadetHandle for which we did the write call
- * @param size the number of bytes that can be written to @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_sqm (void *cls,
- size_t size,
- void *buf)
-{
- struct CadetHandle *mh = cls;
- struct CadetQueryMessage sqm;
- struct GSF_CadetRequest *sr;
-
- mh->wh = NULL;
- if (NULL == buf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cadet channel to %s failed during transmission attempt, rebuilding\n",
- GNUNET_i2s (&mh->target));
- reset_cadet_async (mh);
- return 0;
- }
- sr = mh->pending_head;
- if (NULL == sr)
- return 0;
- GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
- &sr->query,
- sr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- sr->was_transmitted = GNUNET_YES;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&mh->target));
- sqm.header.size = htons (sizeof (sqm));
- sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
- sqm.type = htonl (sr->type);
- sqm.query = sr->query;
- memcpy (buf, &sqm, sizeof (sqm));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully transmitted %u bytes via cadet to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&mh->target));
- transmit_pending (mh);
- return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh)
-{
- if (NULL == mh->channel)
- return;
- if (NULL != mh->wh)
- return;
- mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct CadetQueryMessage),
- &transmit_sqm, mh);
-}
-
-
/**
* Closure for handle_reply().
*/
struct GNUNET_TIME_Absolute expiration;
/**
- * Number of bytes in 'data'.
+ * Number of bytes in @e data.
*/
size_t data_size;
* @return #GNUNET_YES (continue to iterate)
*/
static int
-handle_reply (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+process_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
{
struct HandleReplyClosure *hrc = cls;
struct GSF_CadetRequest *sr = value;
}
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_CadetRequest *sr = value;
+
+ GSF_cadet_query_cancel (sr);
+ return GNUNET_YES;
+}
+
+
/**
* Functions with this signature are called whenever a complete reply
* is received.
*
* @param cls closure with the `struct CadetHandle`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param srm the actual message
*/
-static int
-reply_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
+static void
+handle_reply (void *cls,
+ const struct CadetReplyMessage *srm)
{
- struct CadetHandle *mh = *channel_ctx;
- const struct CadetReplyMessage *srm;
+ struct CadetHandle *mh = cls;
struct HandleReplyClosure hrc;
uint16_t msize;
enum GNUNET_BLOCK_Type type;
struct GNUNET_HashCode query;
- msize = ntohs (message->size);
- if (sizeof (struct CadetReplyMessage) > msize)
- {
- GNUNET_break_op (0);
- reset_cadet_async (mh);
- return GNUNET_SYSERR;
- }
- srm = (const struct CadetReplyMessage *) message;
- msize -= sizeof (struct CadetReplyMessage);
+ msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
if (GNUNET_YES !=
GNUNET_BLOCK_get_key (GSF_block_ctx,
type,
- &srm[1], msize, &query))
+ &srm[1],
+ msize,
+ &query))
{
GNUNET_break_op (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
msize,
GNUNET_i2s (&mh->target));
reset_cadet_async (mh);
- return GNUNET_SYSERR;
+ return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received reply `%s' via cadet from peer %s\n",
GNUNET_h2s (&query),
GNUNET_i2s (&mh->target));
- GNUNET_CADET_receive_done (channel);
+ GNUNET_CADET_receive_done (mh->channel);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via cadet"), 1,
GNUNET_NO);
hrc.found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
&query,
- &handle_reply,
+ &process_reply,
&hrc);
if (GNUNET_NO == hrc.found)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via cadet dropped"), 1,
GNUNET_NO);
- return GNUNET_OK;
}
- return GNUNET_OK;
+}
+
+
+/**
+ * Function called by cadet when a client disconnects.
+ * Cleans up our `struct CadetClient` of that channel.
+ *
+ * @param cls our `struct CadetClient`
+ * @param channel channel of the disconnecting client
+ */
+static void
+disconnect_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel)
+{
+ struct CadetHandle *mh = cls;
+ struct GSF_CadetRequest *sr;
+
+ if (NULL == mh->channel)
+ return; /* being destroyed elsewhere */
+ GNUNET_assert (channel == mh->channel);
+ mh->channel = NULL;
+ while (NULL != (sr = mh->pending_head))
+ GSF_cadet_query_cancel (sr);
+ /* first remove `mh` from the `cadet_map`, so that if the
+ callback from `free_waiting_entry()` happens to re-issue
+ the request, we don't immediately have it back in the
+ `waiting_map`. */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_remove (cadet_map,
+ &mh->target,
+ mh));
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &free_waiting_entry,
+ mh);
+ if (NULL != mh->timeout_task)
+ GNUNET_SCHEDULER_cancel (mh->timeout_task);
+ if (NULL != mh->reset_task)
+ GNUNET_SCHEDULER_cancel (mh->reset_task);
+ GNUNET_assert (0 ==
+ GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
+ GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
+ GNUNET_free (mh);
+}
+
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ * this value will be negative..
+ */
+static void
+window_change_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel,
+ int window_size)
+{
+ /* FIXME: for flow control, implement? */
+#if 0
+ /* Something like this instead of the GNUNET_MQ_notify_sent() in
+ transmit_pending() might be good (once the window change CB works...) */
+ if (0 < window_size) /* test needed? */
+ transmit_pending (mh);
+#endif
+}
+
+
+/**
+ * We had a serious error, tear down and re-create cadet from scratch.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet (struct CadetHandle *mh)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Resetting cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
+ GNUNET_CADET_channel_destroy (mh->channel);
+ mh->channel = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &move_to_pending,
+ mh);
+ {
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_HashCode port;
+
+ GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+ strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+ &port);
+ mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+ mh,
+ &mh->target,
+ &port,
+ GNUNET_CADET_OPTION_RELIABLE,
+ &window_change_cb,
+ &disconnect_cb,
+ handlers);
+ }
+ transmit_pending (mh);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive cadet channel.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+cadet_timeout (void *cls)
+{
+ struct CadetHandle *mh = cls;
+ struct GNUNET_CADET_Channel *tun;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
+ mh->timeout_task = NULL;
+ tun = mh->channel;
+ mh->channel = NULL;
+ if (NULL != tun)
+ GNUNET_CADET_channel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an cadet.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+reset_cadet_task (void *cls)
+{
+ struct CadetHandle *mh = cls;
+
+ mh->reset_task = NULL;
+ reset_cadet (mh);
+}
+
+
+/**
+ * Transmit pending requests via the cadet.
+ *
+ * @param cls `struct CadetHandle` to process
+ */
+static void
+transmit_pending (void *cls)
+{
+ struct CadetHandle *mh = cls;
+ struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
+ struct GSF_CadetRequest *sr;
+ struct GNUNET_MQ_Envelope *env;
+ struct CadetQueryMessage *sqm;
+
+ if ( (0 != GNUNET_MQ_get_length (mq)) ||
+ (NULL == (sr = mh->pending_head)) )
+ return;
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+ &sr->query,
+ sr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ sr->was_transmitted = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending query for %s via cadet to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&mh->target));
+ env = GNUNET_MQ_msg (sqm,
+ GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+ sqm->type = htonl (sr->type);
+ sqm->query = sr->query;
+ GNUNET_MQ_notify_sent (env,
+ &transmit_pending,
+ mh);
+ GNUNET_MQ_send (mq,
+ env);
}
&mh->target,
mh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_CADET_OPTION_RELIABLE);
- GNUNET_assert (mh ==
- GNUNET_CONTAINER_multipeermap_get (cadet_map,
- target));
+ {
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_HashCode port;
+
+ GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+ strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+ &port);
+ mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+ mh,
+ &mh->target,
+ &port,
+ GNUNET_CADET_OPTION_RELIABLE,
+ &window_change_cb,
+ &disconnect_cb,
+ handlers);
+ }
return mh;
}
*/
struct GSF_CadetRequest *
GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_CadetReplyProcessor proc, void *proc_cls)
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_CadetReplyProcessor proc,
+ void *proc_cls)
{
struct CadetHandle *mh;
struct GSF_CadetRequest *sr;
}
-/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GSF_CadetRequest *sr = value;
-
- GSF_cadet_query_cancel (sr);
- return GNUNET_YES;
-}
-
-
-/**
- * Function called by cadet when a client disconnects.
- * Cleans up our `struct CadetClient` of that channel.
- *
- * @param cls NULL
- * @param channel channel of the disconnecting client
- * @param channel_ctx our `struct CadetClient`
- */
-static void
-cleaner_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel,
- void *channel_ctx)
-{
- struct CadetHandle *mh = channel_ctx;
- struct GSF_CadetRequest *sr;
-
- if (NULL == mh->channel)
- return; /* being destroyed elsewhere */
- GNUNET_assert (channel == mh->channel);
- mh->channel = NULL;
- while (NULL != (sr = mh->pending_head))
- GSF_cadet_query_cancel (sr);
- /* first remove `mh` from the `cadet_map`, so that if the
- callback from `free_waiting_entry()` happens to re-issue
- the request, we don't immediately have it back in the
- `waiting_map`. */
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_remove (cadet_map,
- &mh->target,
- mh));
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &free_waiting_entry,
- mh);
- if (NULL != mh->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
- if (NULL != mh->timeout_task)
- GNUNET_SCHEDULER_cancel (mh->timeout_task);
- if (NULL != mh->reset_task)
- GNUNET_SCHEDULER_cancel (mh->reset_task);
- GNUNET_assert (0 ==
- GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
- GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
- GNUNET_free (mh);
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_client ()
-{
- static const struct GNUNET_CADET_MessageHandler handlers[] = {
- { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
- { NULL, 0, 0 }
- };
-
- cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
- cadet_handle = GNUNET_CADET_connect (GSF_cfg,
- NULL,
- NULL,
- &cleaner_cb,
- handlers,
- NULL);
-}
-
-
/**
* Function called on each active cadets to shut them down.
*
* @param value the `struct CadetHandle` to destroy
* @return #GNUNET_YES (continue to iterate)
*/
-static int
-release_cadets (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+int
+GSF_cadet_release_clients (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
{
struct CadetHandle *mh = value;
}
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_client ()
-{
- GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
- &release_cadets,
- NULL);
- GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
- cadet_map = NULL;
- if (NULL != cadet_handle)
- {
- GNUNET_CADET_disconnect (cadet_handle);
- cadet_handle = NULL;
- }
-}
-
/* end of gnunet-service-fs_cadet_client.c */