From 181c039d12aa2aa99920d14070e7b64c018e8be7 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 17 Feb 2017 14:31:38 +0100 Subject: [PATCH] get FS test with CADET to finally pass again --- src/cadet/cadet_api_new.c | 5 +- src/fs/Makefile.am | 2 +- src/fs/gnunet-service-fs_cadet_client.c | 358 +++++++++++------------- src/fs/gnunet-service-fs_cadet_server.c | 10 +- 4 files changed, 170 insertions(+), 205 deletions(-) diff --git a/src/cadet/cadet_api_new.c b/src/cadet/cadet_api_new.c index 8f482aa28..eb8bc2549 100644 --- a/src/cadet/cadet_api_new.c +++ b/src/cadet/cadet_api_new.c @@ -711,11 +711,10 @@ handle_local_data (void *cls, type = ntohs (payload->type); fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got a %s data on channel %s [%X] of type %s (%u)\n", - GC_f2s (fwd), + "Got a %s data on channel %s [%X] of type %u\n", + fwd ? "FWD" : "BWD", GNUNET_i2s (&ch->peer), ntohl (message->ccn.channel_of_client), - GC_m2s (type), type); GNUNET_MQ_inject_message (ch->mq, payload); diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 344eb5a74..75451c7f6 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -202,7 +202,7 @@ gnunet_service_fs_LDADD = \ $(top_builddir)/src/block/libgnunetblock.la \ $(top_builddir)/src/datastore/libgnunetdatastore.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ - $(top_builddir)/src/cadet/libgnunetcadet.la \ + $(top_builddir)/src/cadet/libgnunetcadetnew.la \ $(top_builddir)/src/ats/libgnunetats.la \ $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/util/libgnunetutil.la \ diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c index 193fe2263..55e0cbc24 100644 --- a/src/fs/gnunet-service-fs_cadet_client.c +++ b/src/fs/gnunet-service-fs_cadet_client.c @@ -77,7 +77,7 @@ struct GSF_CadetRequest GSF_CadetReplyProcessor proc; /** - * Closure for 'proc' + * Closure for @e proc */ void *proc_cls; @@ -125,11 +125,6 @@ struct CadetHandle */ struct GNUNET_CADET_Channel *channel; - /** - * Handle for active write operation, or NULL. - */ - struct GNUNET_CADET_TransmitHandle *wh; - /** * Which peer does this cadet go to? */ @@ -140,14 +135,14 @@ struct CadetHandle * a few seconds to give the application a chance to give * us another query). */ - struct GNUNET_SCHEDULER_Task * timeout_task; + struct GNUNET_SCHEDULER_Task *timeout_task; /** * Task to reset cadets that had errors (asynchronously, * as we may not be able to do it immediately during a * callback from the cadet API). */ - struct GNUNET_SCHEDULER_Task * reset_task; + struct GNUNET_SCHEDULER_Task *reset_task; }; @@ -170,10 +165,10 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; /** * Transmit pending requests via the cadet. * - * @param mh cadet to process + * @param cls `struct CadetHandle` to process */ static void -transmit_pending (struct CadetHandle *mh); +transmit_pending (void *cls); /** @@ -206,65 +201,19 @@ move_to_pending (void *cls, /** - * We had a serious error, tear down and re-create cadet from scratch. - * - * @param mh cadet to reset - */ -static void -reset_cadet (struct CadetHandle *mh) -{ - struct GNUNET_CADET_Channel *channel = mh->channel; - struct GNUNET_HashCode port; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resetting cadet channel to %s\n", - GNUNET_i2s (&mh->target)); - mh->channel = NULL; - - if (NULL != channel) - { - /* Avoid loop */ - if (NULL != mh->wh) - { - GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); - mh->wh = NULL; - } - GNUNET_CADET_channel_destroy (channel); - } - GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, - &move_to_pending, - mh); - GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, - strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), - &port); - mh->channel = GNUNET_CADET_channel_create (cadet_handle, - mh, - &mh->target, - &port, - GNUNET_CADET_OPTION_RELIABLE); - transmit_pending (mh); -} - - -/** - * Task called when it is time to destroy an inactive cadet channel. + * Functions with this signature are called whenever a complete reply + * is received. * - * @param cls the `struct CadetHandle` to tear down + * @param cls closure with the `struct CadetHandle` + * @param srm the actual message + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing */ -static void -cadet_timeout (void *cls) +static int +check_reply (void *cls, + const struct CadetReplyMessage *srm) { - struct CadetHandle *mh = cls; - struct GNUNET_CADET_Channel *tun; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout on cadet channel to %s\n", - GNUNET_i2s (&mh->target)); - mh->timeout_task = NULL; - tun = mh->channel; - mh->channel = NULL; - if(NULL != tun) - GNUNET_CADET_channel_destroy (tun); + /* We check later... */ + return GNUNET_OK; } @@ -274,13 +223,7 @@ cadet_timeout (void *cls) * @param cls the `struct CadetHandle` to tear down */ static void -reset_cadet_task (void *cls) -{ - struct CadetHandle *mh = cls; - - mh->reset_task = NULL; - reset_cadet (mh); -} +reset_cadet_task (void *cls); /** @@ -299,83 +242,6 @@ reset_cadet_async (struct CadetHandle *mh) } -/** - * Functions of this signature are called whenever we are ready to transmit - * query via a cadet. - * - * @param cls the struct CadetHandle for which we did the write call - * @param size the number of bytes that can be written to @a buf - * @param buf where to write the message - * @return number of bytes written to @a buf - */ -static size_t -transmit_sqm (void *cls, - size_t size, - void *buf) -{ - struct CadetHandle *mh = cls; - struct CadetQueryMessage sqm; - struct GSF_CadetRequest *sr; - - mh->wh = NULL; - if (NULL == buf) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cadet channel to %s failed during transmission attempt, rebuilding\n", - GNUNET_i2s (&mh->target)); - reset_cadet_async (mh); - return 0; - } - sr = mh->pending_head; - if (NULL == sr) - return 0; - GNUNET_assert (size >= sizeof (struct CadetQueryMessage)); - GNUNET_CONTAINER_DLL_remove (mh->pending_head, - mh->pending_tail, - sr); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (mh->waiting_map, - &sr->query, - sr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - sr->was_transmitted = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending query for %s via cadet to %s\n", - GNUNET_h2s (&sr->query), - GNUNET_i2s (&mh->target)); - sqm.header.size = htons (sizeof (sqm)); - sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); - sqm.type = htonl (sr->type); - sqm.query = sr->query; - GNUNET_memcpy (buf, &sqm, sizeof (sqm)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Successfully transmitted %u bytes via cadet to %s\n", - (unsigned int) size, - GNUNET_i2s (&mh->target)); - transmit_pending (mh); - return sizeof (sqm); -} - - -/** - * Transmit pending requests via the cadet. - * - * @param mh cadet to process - */ -static void -transmit_pending (struct CadetHandle *mh) -{ - if (NULL == mh->channel) - return; - if (NULL != mh->wh) - return; - mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */, - GNUNET_TIME_UNIT_FOREVER_REL, - sizeof (struct CadetQueryMessage), - &transmit_sqm, mh); -} - - /** * Closure for handle_reply(). */ @@ -393,7 +259,7 @@ struct HandleReplyClosure struct GNUNET_TIME_Absolute expiration; /** - * Number of bytes in 'data'. + * Number of bytes in @e data. */ size_t data_size; @@ -439,19 +305,24 @@ process_reply (void *cls, /** - * Functions with this signature are called whenever a complete reply - * is received. + * Iterator called on each entry in a waiting map to + * call the 'proc' continuation and release associated + * resources. * - * @param cls closure with the `struct CadetHandle` - * @param srm the actual message - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + * @param cls the `struct CadetHandle` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to clean up + * @return #GNUNET_YES (continue to iterate) */ static int -check_reply (void *cls, - const struct CadetReplyMessage *srm) +free_waiting_entry (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - /* We check later... */ - return GNUNET_OK; + struct GSF_CadetRequest *sr = value; + + GSF_cadet_query_cancel (sr); + return GNUNET_YES; } @@ -516,28 +387,6 @@ handle_reply (void *cls, } -/** - * Iterator called on each entry in a waiting map to - * call the 'proc' continuation and release associated - * resources. - * - * @param cls the `struct CadetHandle` - * @param key the key of the entry in the map (the query) - * @param value the `struct GSF_CadetRequest` to clean up - * @return #GNUNET_YES (continue to iterate) - */ -static int -free_waiting_entry (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct GSF_CadetRequest *sr = value; - - GSF_cadet_query_cancel (sr); - return GNUNET_YES; -} - - /** * Function called by cadet when a client disconnects. * Cleans up our `struct CadetClient` of that channel. @@ -569,8 +418,6 @@ disconnect_cb (void *cls, GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &free_waiting_entry, mh); - if (NULL != mh->wh) - GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); if (NULL != mh->timeout_task) GNUNET_SCHEDULER_cancel (mh->timeout_task); if (NULL != mh->reset_task) @@ -602,6 +449,133 @@ window_change_cb (void *cls, int window_size) { /* FIXME: for flow control, implement? */ +#if 0 + /* Something like this instead of the GNUNET_MQ_notify_sent() in + transmit_pending() might be good (once the window change CB works...) */ + if (0 < window_size) /* test needed? */ + transmit_pending (mh); +#endif +} + + +/** + * We had a serious error, tear down and re-create cadet from scratch. + * + * @param mh cadet to reset + */ +static void +reset_cadet (struct CadetHandle *mh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Resetting cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + GNUNET_CADET_channel_destroy (mh->channel); + mh->channel = NULL; + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, + &move_to_pending, + mh); + { + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (reply, + GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, + struct CadetReplyMessage, + mh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_HashCode port; + + GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, + strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), + &port); + mh->channel = GNUNET_CADET_channel_creatE (cadet_handle, + mh, + &mh->target, + &port, + GNUNET_CADET_OPTION_RELIABLE, + &window_change_cb, + &disconnect_cb, + handlers); + } + transmit_pending (mh); +} + + +/** + * Task called when it is time to destroy an inactive cadet channel. + * + * @param cls the `struct CadetHandle` to tear down + */ +static void +cadet_timeout (void *cls) +{ + struct CadetHandle *mh = cls; + struct GNUNET_CADET_Channel *tun; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + mh->timeout_task = NULL; + tun = mh->channel; + mh->channel = NULL; + if (NULL != tun) + GNUNET_CADET_channel_destroy (tun); +} + + +/** + * Task called when it is time to reset an cadet. + * + * @param cls the `struct CadetHandle` to tear down + */ +static void +reset_cadet_task (void *cls) +{ + struct CadetHandle *mh = cls; + + mh->reset_task = NULL; + reset_cadet (mh); +} + + +/** + * Transmit pending requests via the cadet. + * + * @param cls `struct CadetHandle` to process + */ +static void +transmit_pending (void *cls) +{ + struct CadetHandle *mh = cls; + struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel); + struct GSF_CadetRequest *sr; + struct GNUNET_MQ_Envelope *env; + struct CadetQueryMessage *sqm; + + if ( (0 != GNUNET_MQ_get_length (mq)) || + (NULL == (sr = mh->pending_head)) ) + return; + GNUNET_CONTAINER_DLL_remove (mh->pending_head, + mh->pending_tail, + sr); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (mh->waiting_map, + &sr->query, + sr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + sr->was_transmitted = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&mh->target)); + env = GNUNET_MQ_msg (sqm, + GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); + sqm->type = htonl (sr->type); + sqm->query = sr->query; + GNUNET_MQ_notify_sent (env, + &transmit_pending, + mh); + GNUNET_MQ_send (mq, + env); } @@ -614,7 +588,6 @@ static struct CadetHandle * get_cadet (const struct GNUNET_PeerIdentity *target) { struct CadetHandle *mh; - struct GNUNET_HashCode port; mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target); @@ -641,10 +614,6 @@ get_cadet (const struct GNUNET_PeerIdentity *target) &mh->target, mh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, - strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), - &port); - { struct GNUNET_MQ_MessageHandler handlers[] = { GNUNET_MQ_hd_var_size (reply, @@ -653,7 +622,11 @@ get_cadet (const struct GNUNET_PeerIdentity *target) mh), GNUNET_MQ_handler_end () }; + struct GNUNET_HashCode port; + GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, + strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), + &port); mh->channel = GNUNET_CADET_channel_creatE (cadet_handle, mh, &mh->target, @@ -679,9 +652,10 @@ get_cadet (const struct GNUNET_PeerIdentity *target) */ struct GSF_CadetRequest * GSF_cadet_query (const struct GNUNET_PeerIdentity *target, - const struct GNUNET_HashCode *query, - enum GNUNET_BLOCK_Type type, - GSF_CadetReplyProcessor proc, void *proc_cls) + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_CadetReplyProcessor proc, + void *proc_cls) { struct CadetHandle *mh; struct GSF_CadetRequest *sr; diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c index 0a72a8279..adbce1154 100644 --- a/src/fs/gnunet-service-fs_cadet_server.c +++ b/src/fs/gnunet-service-fs_cadet_server.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2012, 2013 GNUnet e.V. + Copyright (C) 2012, 2013, 2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -85,11 +85,6 @@ struct CadetClient */ struct GNUNET_CADET_Channel *channel; - /** - * Handle for active write operation, or NULL. - */ - struct GNUNET_CADET_TransmitHandle *wh; - /** * Head of write queue. */ @@ -439,8 +434,6 @@ disconnect_cb (void *cls, GNUNET_SCHEDULER_cancel (sc->terminate_task); if (NULL != sc->timeout_task) GNUNET_SCHEDULER_cancel (sc->timeout_task); - if (NULL != sc->wh) - GNUNET_CADET_notify_transmit_ready_cancel (sc->wh); if (NULL != sc->qe) GNUNET_DATASTORE_cancel (sc->qe); while (NULL != (wqi = sc->wqi_head)) @@ -458,7 +451,6 @@ disconnect_cb (void *cls, } - /** * Function called whenever an MQ-channel's transmission window size changes. * -- 2.25.1