- moved timeout handling responsibility from for nat tests from caller to the library
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
index 0143c58cc83821ec5a214443ae7157bc54b0f20d..b53bc2f1a2133742eea9161708679e4fa6a1d85b 100644 (file)
@@ -244,6 +244,27 @@ void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
 static void
 reconnect (struct GNUNET_PEERSTORE_Handle *h);
 
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
+ */
+void watch_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
+ */
+void iterate_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls);
+
 /**
  * MQ message handlers
  */
@@ -267,6 +288,28 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
   reconnect(h);
 }
 
+/**
+ * Iterator over previous watches to resend them
+ */
+int rewatch_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_WatchContext *wc = value;
+  struct StoreKeyHashMessage *hm;
+
+  if(GNUNET_YES == wc->request_sent)
+  { /* Envelope gone, create new one. */
+    wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+    hm->keyhash = wc->keyhash;
+    wc->request_sent = GNUNET_NO;
+  }
+  GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
+  GNUNET_MQ_send(h->mq, wc->ev);
+  return GNUNET_YES;
+}
+
 /**
  * Close the existing connection to PEERSTORE and reconnect.
  *
@@ -275,6 +318,11 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
 static void
 reconnect (struct GNUNET_PEERSTORE_Handle *h)
 {
+  struct GNUNET_PEERSTORE_IterateContext *ic;
+  GNUNET_PEERSTORE_Processor icb;
+  void *icb_cls;
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
   if (NULL != h->mq)
   {
@@ -287,13 +335,43 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
     h->client = NULL;
   }
   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
-  //FIXME: retry connecting if fails again (client == NULL)
+  GNUNET_assert(NULL != h->client);
   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
       mq_handlers,
       &handle_client_error,
       h);
-  //FIXME: resend pending requests after reconnecting
-
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "Resending pending requests after reconnect.\n");
+  if (NULL != h->watches)
+  {
+    GNUNET_CONTAINER_multihashmap_iterate(h->watches,
+        &rewatch_it, h);
+  }
+  ic = h->iterate_head;
+  while (NULL != ic)
+  {
+    if (GNUNET_YES == ic->request_sent)
+    {
+      icb = ic->callback;
+      icb_cls = ic->callback_cls;
+      GNUNET_PEERSTORE_iterate_cancel(ic);
+      if(NULL != icb)
+        icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
+    }
+    else
+    {
+      GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
+      GNUNET_MQ_send(h->mq, ic->ev);
+    }
+    ic = ic->next;
+  }
+  sc = h->store_head;
+  while (NULL != sc)
+  {
+    GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
+    GNUNET_MQ_send(h->mq, sc->ev);
+    sc = sc->next;
+  }
 }
 
 /**
@@ -336,6 +414,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 void
 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
 {
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
   if(NULL != h->watches)
   {
     GNUNET_CONTAINER_multihashmap_destroy(h->watches);
@@ -387,8 +466,6 @@ void store_request_sent (void *cls)
 void
 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
 {
-  LOG(GNUNET_ERROR_TYPE_DEBUG,
-          "Canceling store request.\n");
   if(NULL != sc->ev)
   {
     GNUNET_MQ_send_cancel(sc->ev);
@@ -399,7 +476,9 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
 }
 
 /**
- * Store a new entry in the PEERSTORE
+ * Store a new entry in the PEERSTORE.
+ * Note that stored entries can be lost in some cases
+ * such as power failure.
  *
  * @param h Handle to the PEERSTORE service
  * @param sub_system name of the sub system
@@ -442,7 +521,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
   sc->cont = cont;
   sc->cont_cls = cont_cls;
   sc->h = h;
-  GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
+  GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
   GNUNET_MQ_send(h->mq, ev);
   return sc;
@@ -548,7 +627,6 @@ void iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 void
 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
 {
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling iterate request.\n");
   if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
   {
     GNUNET_SCHEDULER_cancel(ic->timeout_task);
@@ -605,7 +683,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
   ic->ev = ev;
   ic->h = h;
   ic->request_sent = GNUNET_NO;
-  GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
+  GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
   LOG(GNUNET_ERROR_TYPE_DEBUG,
         "Sending an iterate request for sub system `%s'\n", sub_system);
   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
@@ -728,7 +806,8 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
   GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
       wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   LOG(GNUNET_ERROR_TYPE_DEBUG,
-      "Sending a watch request for sub system `%s'.\n", sub_system);
+      "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
+      sub_system, GNUNET_i2s(peer), key);
   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
   GNUNET_MQ_send(h->mq, ev);
   return wc;