GSF_CadetReplyProcessor proc;
/**
- * Closure for 'proc'
+ * Closure for @e proc
*/
void *proc_cls;
*/
struct GNUNET_CADET_Channel *channel;
- /**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_CADET_TransmitHandle *wh;
-
/**
* Which peer does this cadet go to?
*/
* a few seconds to give the application a chance to give
* us another query).
*/
- struct GNUNET_SCHEDULER_Task * timeout_task;
+ struct GNUNET_SCHEDULER_Task *timeout_task;
/**
* Task to reset cadets that had errors (asynchronously,
* as we may not be able to do it immediately during a
* callback from the cadet API).
*/
- struct GNUNET_SCHEDULER_Task * reset_task;
+ struct GNUNET_SCHEDULER_Task *reset_task;
};
/**
* Transmit pending requests via the cadet.
*
- * @param mh cadet to process
+ * @param cls `struct CadetHandle` to process
*/
static void
-transmit_pending (struct CadetHandle *mh);
+transmit_pending (void *cls);
/**
/**
- * We had a serious error, tear down and re-create cadet from scratch.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet (struct CadetHandle *mh)
-{
- struct GNUNET_CADET_Channel *channel = mh->channel;
- struct GNUNET_HashCode port;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->channel = NULL;
-
- if (NULL != channel)
- {
- /* Avoid loop */
- if (NULL != mh->wh)
- {
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
- mh->wh = NULL;
- }
- GNUNET_CADET_channel_destroy (channel);
- }
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &move_to_pending,
- mh);
- GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
- strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
- &port);
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
- &port,
- GNUNET_CADET_OPTION_RELIABLE);
- transmit_pending (mh);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive cadet channel.
+ * Functions with this signature are called whenever a complete reply
+ * is received.
*
- * @param cls the `struct CadetHandle` to tear down
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
*/
-static void
-cadet_timeout (void *cls)
+static int
+check_reply (void *cls,
+ const struct CadetReplyMessage *srm)
{
- struct CadetHandle *mh = cls;
- struct GNUNET_CADET_Channel *tun;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->timeout_task = NULL;
- tun = mh->channel;
- mh->channel = NULL;
- if(NULL != tun)
- GNUNET_CADET_channel_destroy (tun);
+ /* We check later... */
+ return GNUNET_OK;
}
* @param cls the `struct CadetHandle` to tear down
*/
static void
-reset_cadet_task (void *cls)
-{
- struct CadetHandle *mh = cls;
-
- mh->reset_task = NULL;
- reset_cadet (mh);
-}
+reset_cadet_task (void *cls);
/**
}
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a cadet.
- *
- * @param cls the struct CadetHandle for which we did the write call
- * @param size the number of bytes that can be written to @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_sqm (void *cls,
- size_t size,
- void *buf)
-{
- struct CadetHandle *mh = cls;
- struct CadetQueryMessage sqm;
- struct GSF_CadetRequest *sr;
-
- mh->wh = NULL;
- if (NULL == buf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cadet channel to %s failed during transmission attempt, rebuilding\n",
- GNUNET_i2s (&mh->target));
- reset_cadet_async (mh);
- return 0;
- }
- sr = mh->pending_head;
- if (NULL == sr)
- return 0;
- GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
- &sr->query,
- sr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- sr->was_transmitted = GNUNET_YES;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&mh->target));
- sqm.header.size = htons (sizeof (sqm));
- sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
- sqm.type = htonl (sr->type);
- sqm.query = sr->query;
- GNUNET_memcpy (buf, &sqm, sizeof (sqm));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully transmitted %u bytes via cadet to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&mh->target));
- transmit_pending (mh);
- return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh)
-{
- if (NULL == mh->channel)
- return;
- if (NULL != mh->wh)
- return;
- mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct CadetQueryMessage),
- &transmit_sqm, mh);
-}
-
-
/**
* Closure for handle_reply().
*/
struct GNUNET_TIME_Absolute expiration;
/**
- * Number of bytes in 'data'.
+ * Number of bytes in @e data.
*/
size_t data_size;
/**
- * Functions with this signature are called whenever a complete reply
- * is received.
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
*
- * @param cls closure with the `struct CadetHandle`
- * @param srm the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
*/
static int
-check_reply (void *cls,
- const struct CadetReplyMessage *srm)
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
{
- /* We check later... */
- return GNUNET_OK;
+ struct GSF_CadetRequest *sr = value;
+
+ GSF_cadet_query_cancel (sr);
+ return GNUNET_YES;
}
}
-/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GSF_CadetRequest *sr = value;
-
- GSF_cadet_query_cancel (sr);
- return GNUNET_YES;
-}
-
-
/**
* Function called by cadet when a client disconnects.
* Cleans up our `struct CadetClient` of that channel.
GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
&free_waiting_entry,
mh);
- if (NULL != mh->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
if (NULL != mh->timeout_task)
GNUNET_SCHEDULER_cancel (mh->timeout_task);
if (NULL != mh->reset_task)
int window_size)
{
/* FIXME: for flow control, implement? */
+#if 0
+ /* Something like this instead of the GNUNET_MQ_notify_sent() in
+ transmit_pending() might be good (once the window change CB works...) */
+ if (0 < window_size) /* test needed? */
+ transmit_pending (mh);
+#endif
+}
+
+
+/**
+ * We had a serious error, tear down and re-create cadet from scratch.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet (struct CadetHandle *mh)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Resetting cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
+ GNUNET_CADET_channel_destroy (mh->channel);
+ mh->channel = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &move_to_pending,
+ mh);
+ {
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_HashCode port;
+
+ GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+ strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+ &port);
+ mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+ mh,
+ &mh->target,
+ &port,
+ GNUNET_CADET_OPTION_RELIABLE,
+ &window_change_cb,
+ &disconnect_cb,
+ handlers);
+ }
+ transmit_pending (mh);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive cadet channel.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+cadet_timeout (void *cls)
+{
+ struct CadetHandle *mh = cls;
+ struct GNUNET_CADET_Channel *tun;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
+ mh->timeout_task = NULL;
+ tun = mh->channel;
+ mh->channel = NULL;
+ if (NULL != tun)
+ GNUNET_CADET_channel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an cadet.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+reset_cadet_task (void *cls)
+{
+ struct CadetHandle *mh = cls;
+
+ mh->reset_task = NULL;
+ reset_cadet (mh);
+}
+
+
+/**
+ * Transmit pending requests via the cadet.
+ *
+ * @param cls `struct CadetHandle` to process
+ */
+static void
+transmit_pending (void *cls)
+{
+ struct CadetHandle *mh = cls;
+ struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
+ struct GSF_CadetRequest *sr;
+ struct GNUNET_MQ_Envelope *env;
+ struct CadetQueryMessage *sqm;
+
+ if ( (0 != GNUNET_MQ_get_length (mq)) ||
+ (NULL == (sr = mh->pending_head)) )
+ return;
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+ &sr->query,
+ sr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ sr->was_transmitted = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending query for %s via cadet to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&mh->target));
+ env = GNUNET_MQ_msg (sqm,
+ GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+ sqm->type = htonl (sr->type);
+ sqm->query = sr->query;
+ GNUNET_MQ_notify_sent (env,
+ &transmit_pending,
+ mh);
+ GNUNET_MQ_send (mq,
+ env);
}
get_cadet (const struct GNUNET_PeerIdentity *target)
{
struct CadetHandle *mh;
- struct GNUNET_HashCode port;
mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
target);
&mh->target,
mh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
- strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
- &port);
-
{
struct GNUNET_MQ_MessageHandler handlers[] = {
GNUNET_MQ_hd_var_size (reply,
mh),
GNUNET_MQ_handler_end ()
};
+ struct GNUNET_HashCode port;
+ GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+ strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+ &port);
mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
mh,
&mh->target,
*/
struct GSF_CadetRequest *
GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_CadetReplyProcessor proc, void *proc_cls)
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_CadetReplyProcessor proc,
+ void *proc_cls)
{
struct CadetHandle *mh;
struct GSF_CadetRequest *sr;