static void
reconnect (struct GNUNET_PEERSTORE_Handle *h);
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
+ */
+void watch_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
+ */
+void iterate_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls);
+
/**
* MQ message handlers
*/
reconnect(h);
}
+/**
+ * Iterator over previous watches to resend them
+ */
+int rewatch_it(void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_WatchContext *wc = value;
+ struct StoreKeyHashMessage *hm;
+
+ if(GNUNET_YES == wc->request_sent)
+ { /* Envelope gone, create new one. */
+ wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+ hm->keyhash = wc->keyhash;
+ wc->request_sent = GNUNET_NO;
+ }
+ GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
+ GNUNET_MQ_send(h->mq, wc->ev);
+ return GNUNET_YES;
+}
+
/**
* Close the existing connection to PEERSTORE and reconnect.
*
static void
reconnect (struct GNUNET_PEERSTORE_Handle *h)
{
+ struct GNUNET_PEERSTORE_IterateContext *ic;
+ GNUNET_PEERSTORE_Processor icb;
+ void *icb_cls;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
+
LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
if (NULL != h->mq)
{
h->client = NULL;
}
h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
- //FIXME: retry connecting if fails again (client == NULL)
+ GNUNET_assert(NULL != h->client);
h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
mq_handlers,
&handle_client_error,
h);
- //FIXME: resend pending requests after reconnecting
-
+ LOG(GNUNET_ERROR_TYPE_DEBUG,
+ "Resending pending requests after reconnect.\n");
+ if (NULL != h->watches)
+ {
+ GNUNET_CONTAINER_multihashmap_iterate(h->watches,
+ &rewatch_it, h);
+ }
+ ic = h->iterate_head;
+ while (NULL != ic)
+ {
+ if (GNUNET_YES == ic->request_sent)
+ {
+ icb = ic->callback;
+ icb_cls = ic->callback_cls;
+ GNUNET_PEERSTORE_iterate_cancel(ic);
+ if(NULL != icb)
+ icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
+ }
+ else
+ {
+ GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
+ GNUNET_MQ_send(h->mq, ic->ev);
+ }
+ ic = ic->next;
+ }
+ sc = h->store_head;
+ while (NULL != sc)
+ {
+ GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
+ GNUNET_MQ_send(h->mq, sc->ev);
+ sc = sc->next;
+ }
}
/**
void
GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
{
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
if(NULL != h->watches)
{
GNUNET_CONTAINER_multihashmap_destroy(h->watches);
void
GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
{
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Canceling store request.\n");
if(NULL != sc->ev)
{
GNUNET_MQ_send_cancel(sc->ev);
}
/**
- * Store a new entry in the PEERSTORE
+ * Store a new entry in the PEERSTORE.
+ * Note that stored entries can be lost in some cases
+ * such as power failure.
*
* @param h Handle to the PEERSTORE service
* @param sub_system name of the sub system
sc->cont = cont;
sc->cont_cls = cont_cls;
sc->h = h;
- GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
+ GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
GNUNET_MQ_send(h->mq, ev);
return sc;
void
GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling iterate request.\n");
if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
{
GNUNET_SCHEDULER_cancel(ic->timeout_task);
ic->ev = ev;
ic->h = h;
ic->request_sent = GNUNET_NO;
- GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
+ GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
LOG(GNUNET_ERROR_TYPE_DEBUG,
"Sending an iterate request for sub system `%s'\n", sub_system);
GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Sending a watch request for sub system `%s'.\n", sub_system);
+ "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
+ sub_system, GNUNET_i2s(peer), key);
GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
GNUNET_MQ_send(h->mq, ev);
return wc;