WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
struct GNUNET_CONTAINER_MultiHashMap *watches;
+ /**
+ * ID of the task trying to reconnect to the service.
+ */
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+ /**
+ * Delay until we try to reconnect.
+ */
+ struct GNUNET_TIME_Relative reconnect_delay;
+
/**
* Are we in the process of disconnecting but need to sync first?
*/
/**
* Close the existing connection to PEERSTORE and reconnect.
*
- * @param h handle to the service
+ * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
+ */
+static void
+reconnect (void *cls);
+
+
+/**
+ * Disconnect from the peerstore service.
+ *
+ * @param h peerstore handle to disconnect
+ */
+static void
+disconnect (struct GNUNET_PEERSTORE_Handle *h)
+{
+ struct GNUNET_PEERSTORE_IterateContext *next;
+
+ for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
+ NULL != ic;
+ ic = next)
+ {
+ next = ic->next;
+ if (GNUNET_YES == ic->iterating)
+ {
+ GNUNET_PEERSTORE_Processor icb;
+ void *icb_cls;
+
+ icb = ic->callback;
+ icb_cls = ic->callback_cls;
+ GNUNET_PEERSTORE_iterate_cancel (ic);
+ if (NULL != icb)
+ icb (icb_cls,
+ NULL,
+ "Iteration canceled due to reconnection");
+ }
+ }
+
+ if (NULL != h->mq)
+ {
+ GNUNET_MQ_destroy (h->mq);
+ h->mq = NULL;
+ }
+}
+
+
+/**
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
+ *
+ * @param h peerstore to reconnect
*/
static void
-reconnect (struct GNUNET_PEERSTORE_Handle *h);
+disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
+{
+ GNUNET_assert (NULL == h->reconnect_task);
+ disconnect (h);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Scheduling task to reconnect to PEERSTORE service in %s.\n",
+ GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
+ GNUNET_YES));
+ h->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
+ &reconnect,
+ h);
+ h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
+}
+
/**
LOG (GNUNET_ERROR_TYPE_ERROR,
"Received an error notification from MQ of type: %d\n",
error);
- reconnect (h);
+ disconnect_and_schedule_reconnect (h);
}
* @param h Handle to the service.
*/
static void
-do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
+final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
{
if (NULL != h->mq)
{
while (NULL != (sc = h->store_head))
GNUNET_PEERSTORE_store_cancel (sc);
}
- do_disconnect (h);
+ final_disconnect (h);
}
GNUNET_free (sc->value);
GNUNET_free (sc->key);
GNUNET_free (sc);
- if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
- do_disconnect (h);
+ if ( (GNUNET_YES == h->disconnecting) &&
+ (NULL == h->store_head) )
+ final_disconnect (h);
}
{
LOG (GNUNET_ERROR_TYPE_ERROR,
_("Unexpected iteration response, this should not happen.\n"));
- reconnect (h);
+ disconnect_and_schedule_reconnect (h);
return;
}
callback = ic->callback;
ic->iterating = GNUNET_NO;
GNUNET_PEERSTORE_iterate_cancel (ic);
if (NULL != callback)
- callback (callback_cls, NULL, NULL);
+ callback (callback_cls,
+ NULL,
+ NULL);
+ h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
}
{
LOG (GNUNET_ERROR_TYPE_ERROR,
_("Unexpected iteration response, this should not happen.\n"));
- reconnect (h);
+ disconnect_and_schedule_reconnect (h);
return;
}
ic->iterating = GNUNET_YES;
record = PEERSTORE_parse_record_message (msg);
if (NULL == record)
{
- reconnect (h);
+ disconnect_and_schedule_reconnect (h);
return;
}
PEERSTORE_hash_key (record->sub_system,
LOG (GNUNET_ERROR_TYPE_ERROR,
_("Received a watch result for a non existing watch.\n"));
PEERSTORE_destroy_record (record);
- reconnect (h);
+ disconnect_and_schedule_reconnect (h);
return;
}
if (NULL != wc->callback)
wc->callback (wc->callback_cls,
record,
NULL);
+ h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
PEERSTORE_destroy_record (record);
}
/**
* Close the existing connection to PEERSTORE and reconnect.
*
- * @param h handle to the service
+ * @param cls a `struct GNUNET_PEERSTORE_Handle *`
*/
static void
-reconnect (struct GNUNET_PEERSTORE_Handle *h)
+reconnect (void *cls)
{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
struct GNUNET_MQ_MessageHandler mq_handlers[] = {
GNUNET_MQ_hd_fixed_size (iterate_end,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
h),
GNUNET_MQ_handler_end ()
};
- struct GNUNET_PEERSTORE_IterateContext *ic;
- struct GNUNET_PEERSTORE_IterateContext *next;
- GNUNET_PEERSTORE_Processor icb;
- void *icb_cls;
- struct GNUNET_PEERSTORE_StoreContext *sc;
struct GNUNET_MQ_Envelope *ev;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Reconnecting...\n");
- for (ic = h->iterate_head; NULL != ic; ic = next)
- {
- next = ic->next;
- if (GNUNET_YES == ic->iterating)
- {
- icb = ic->callback;
- icb_cls = ic->callback_cls;
- GNUNET_PEERSTORE_iterate_cancel (ic);
- if (NULL != icb)
- icb (icb_cls,
- NULL,
- "Iteration canceled due to reconnection");
- }
- }
- if (NULL != h->mq)
- {
- GNUNET_MQ_destroy (h->mq);
- h->mq = NULL;
- }
h->mq = GNUNET_CLIENT_connect (h->cfg,
"peerstore",
mq_handlers,
GNUNET_CONTAINER_multihashmap_iterate (h->watches,
&rewatch_it,
h);
- for (ic = h->iterate_head; NULL != ic; ic = ic->next)
+ for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
+ NULL != ic;
+ ic = ic->next)
{
ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
&ic->peer,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
GNUNET_MQ_send (h->mq, ev);
}
- for (sc = h->store_head; NULL != sc; sc = sc->next)
+ for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head;
+ NULL != sc;
+ sc = sc->next)
{
ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
&sc->peer,