- moved timeout handling responsibility from for nat tests from caller to the library
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
index 14b1c3e8899c9fcd86d62515bf152c106bf13e60..b53bc2f1a2133742eea9161708679e4fa6a1d85b 100644 (file)
@@ -76,14 +76,9 @@ struct GNUNET_PEERSTORE_Handle
   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
 
   /**
-   * Head of WATCH requests (active and inactive).
+   * Hashmap of watch requests
    */
-  struct GNUNET_PEERSTORE_WatchContext *watch_head;
-
-  /**
-   * Tail of WATCH requests (active and inactive).
-   */
-  struct GNUNET_PEERSTORE_WatchContext *watch_tail;
+  struct GNUNET_CONTAINER_MultiHashMap *watches;
 
 };
 
@@ -122,12 +117,6 @@ struct GNUNET_PEERSTORE_StoreContext
    */
   void *cont_cls;
 
-  /**
-   * #GNUNET_YES / #GNUNET_NO
-   * if sent, cannot be canceled
-   */
-  int request_sent;
-
 };
 
 /**
@@ -214,6 +203,11 @@ struct GNUNET_PEERSTORE_WatchContext
    */
   void *callback_cls;
 
+  /**
+   * Hash of the combined key
+   */
+  struct GNUNET_HashCode keyhash;
+
   /**
    * #GNUNET_YES / #GNUNET_NO
    * if sent, cannot be canceled
@@ -226,14 +220,6 @@ struct GNUNET_PEERSTORE_WatchContext
 /*******************             DECLARATIONS             *********************/
 /******************************************************************************/
 
-/**
- * When a response for store request is received
- *
- * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
- * @param msg message received, NULL on timeout or fatal error
- */
-void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
-
 /**
  * When a response for iterate request is received
  *
@@ -258,12 +244,31 @@ void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
 static void
 reconnect (struct GNUNET_PEERSTORE_Handle *h);
 
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
+ */
+void watch_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
+ */
+void iterate_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls);
+
 /**
  * MQ message handlers
  */
 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
-    {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)},
     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
     {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
@@ -284,24 +289,25 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
 }
 
 /**
- * Should be called only after destroying MQ and connection
+ * Iterator over previous watches to resend them
  */
-static void
-cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
+int rewatch_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
 {
-  struct GNUNET_PEERSTORE_StoreContext *sc;
-  struct GNUNET_PEERSTORE_IterateContext *ic;
-
-  while (NULL != (sc = h->store_head))
-  {
-    GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
-    GNUNET_free(sc);
-  }
-  while (NULL != (ic = h->iterate_head))
-  {
-    GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic);
-    GNUNET_free(ic);
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_WatchContext *wc = value;
+  struct StoreKeyHashMessage *hm;
+
+  if(GNUNET_YES == wc->request_sent)
+  { /* Envelope gone, create new one. */
+    wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+    hm->keyhash = wc->keyhash;
+    wc->request_sent = GNUNET_NO;
   }
+  GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
+  GNUNET_MQ_send(h->mq, wc->ev);
+  return GNUNET_YES;
 }
 
 /**
@@ -312,6 +318,10 @@ cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
 static void
 reconnect (struct GNUNET_PEERSTORE_Handle *h)
 {
+  struct GNUNET_PEERSTORE_IterateContext *ic;
+  GNUNET_PEERSTORE_Processor icb;
+  void *icb_cls;
+  struct GNUNET_PEERSTORE_StoreContext *sc;
 
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
   if (NULL != h->mq)
@@ -324,13 +334,44 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
     GNUNET_CLIENT_disconnect (h->client);
     h->client = NULL;
   }
-  cleanup_handle(h);
   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
+  GNUNET_assert(NULL != h->client);
   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
       mq_handlers,
       &handle_client_error,
       h);
-
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "Resending pending requests after reconnect.\n");
+  if (NULL != h->watches)
+  {
+    GNUNET_CONTAINER_multihashmap_iterate(h->watches,
+        &rewatch_it, h);
+  }
+  ic = h->iterate_head;
+  while (NULL != ic)
+  {
+    if (GNUNET_YES == ic->request_sent)
+    {
+      icb = ic->callback;
+      icb_cls = ic->callback_cls;
+      GNUNET_PEERSTORE_iterate_cancel(ic);
+      if(NULL != icb)
+        icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
+    }
+    else
+    {
+      GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
+      GNUNET_MQ_send(h->mq, ic->ev);
+    }
+    ic = ic->next;
+  }
+  sc = h->store_head;
+  while (NULL != sc)
+  {
+    GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
+    GNUNET_MQ_send(h->mq, sc->ev);
+    sc = sc->next;
+  }
 }
 
 /**
@@ -373,6 +414,12 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 void
 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
 {
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
+  if(NULL != h->watches)
+  {
+    GNUNET_CONTAINER_multihashmap_destroy(h->watches);
+    h->watches = NULL;
+  }
   if(NULL != h->mq)
   {
     GNUNET_MQ_destroy(h->mq);
@@ -383,7 +430,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
     GNUNET_CLIENT_disconnect (h->client);
     h->client = NULL;
   }
-  cleanup_handle(h);
   GNUNET_free(h);
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
 }
@@ -393,49 +439,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
 /*******************            STORE FUNCTIONS           *********************/
 /******************************************************************************/
 
-/**
- * When a response for store request is received
- *
- * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
- * @param msg message received, NULL on timeout or fatal error
- */
-void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_PEERSTORE_Handle *h = cls;
-  struct GNUNET_PEERSTORE_StoreContext *sc;
-  uint16_t msg_type;
-  GNUNET_PEERSTORE_Continuation cont;
-  void *cont_cls;
-
-  sc = h->store_head;
-  if(NULL == sc)
-  {
-    LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n");
-    reconnect(h);
-    return;
-  }
-  cont = sc->cont;
-  cont_cls = sc->cont_cls;
-  GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
-  GNUNET_free(sc);
-  if(NULL == msg) /* Connection error */
-  {
-    if(NULL != cont)
-      cont(cont_cls, GNUNET_SYSERR);
-    reconnect(h);
-    return;
-  }
-  if(NULL != cont) /* Run continuation */
-  {
-    msg_type = ntohs(msg->type);
-    if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
-      cont(cont_cls, GNUNET_OK);
-    else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
-      cont(cont_cls, GNUNET_SYSERR);
-  }
-
-}
-
 /**
  * Callback after MQ envelope is sent
  *
@@ -444,9 +447,15 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
 void store_request_sent (void *cls)
 {
   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
+  GNUNET_PEERSTORE_Continuation cont;
+  void *cont_cls;
 
-  sc->request_sent = GNUNET_YES;
   sc->ev = NULL;
+  cont = sc->cont;
+  cont_cls = sc->cont_cls;
+  GNUNET_PEERSTORE_store_cancel(sc);
+  if(NULL != cont)
+    cont(cont_cls, GNUNET_OK);
 }
 
 /**
@@ -457,27 +466,19 @@ void store_request_sent (void *cls)
 void
 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
 {
-  LOG(GNUNET_ERROR_TYPE_DEBUG,
-          "Canceling store request.\n");
-  if(GNUNET_NO == sc->request_sent)
+  if(NULL != sc->ev)
   {
-    if(NULL != sc->ev)
-    {
-      GNUNET_MQ_send_cancel(sc->ev);
-      sc->ev = NULL;
-    }
-    GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
-    GNUNET_free(sc);
-  }
-  else
-  { /* request already sent, will have to wait for response */
-    sc->cont = NULL;
+    GNUNET_MQ_send_cancel(sc->ev);
+    sc->ev = NULL;
   }
-
+  GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
+  GNUNET_free(sc);
 }
 
 /**
- * Store a new entry in the PEERSTORE
+ * Store a new entry in the PEERSTORE.
+ * Note that stored entries can be lost in some cases
+ * such as power failure.
  *
  * @param h Handle to the PEERSTORE service
  * @param sub_system name of the sub system
@@ -497,10 +498,11 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
     const void *value,
     size_t size,
     struct GNUNET_TIME_Absolute expiry,
+    enum GNUNET_PEERSTORE_StoreOption options,
     GNUNET_PEERSTORE_Continuation cont,
     void *cont_cls)
 {
-  struct GNUNET_MQ_Envelope *ev; //FIXME: add 'replace' flag in store function (similar to multihashmap)
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_PEERSTORE_StoreContext *sc;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -512,14 +514,14 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
       value,
       size,
       &expiry,
+      options,
       GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
   sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
   sc->ev = ev;
   sc->cont = cont;
   sc->cont_cls = cont_cls;
   sc->h = h;
-  sc->request_sent = GNUNET_NO;
-  GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
+  GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
   GNUNET_MQ_send(h->mq, ev);
   return sc;
@@ -567,6 +569,7 @@ void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
   msg_type = ntohs(msg->type);
   if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
   {
+    ic->request_sent = GNUNET_NO;
     GNUNET_PEERSTORE_iterate_cancel(ic);
     if(NULL != callback)
       callback(callback_cls, NULL, NULL);
@@ -576,9 +579,12 @@ void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
   {
     record = PEERSTORE_parse_record_message(msg);
     if(NULL == record)
-      continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
+      continue_iter = callback(callback_cls, NULL, _("Received a malformed response from service."));
     else
+    {
       continue_iter = callback(callback_cls, record, NULL);
+      PEERSTORE_destroy_record(record);
+    }
     if(GNUNET_NO == continue_iter)
       ic->callback = NULL;
   }
@@ -621,7 +627,6 @@ void iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 void
 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
 {
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling iterate request.\n");
   if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
   {
     GNUNET_SCHEDULER_cancel(ic->timeout_task);
@@ -655,7 +660,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
  */
 struct GNUNET_PEERSTORE_IterateContext *
 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
-    char *sub_system,
+    const char *sub_system,
     const struct GNUNET_PeerIdentity *peer,
     const char *key,
     struct GNUNET_TIME_Relative timeout,
@@ -670,6 +675,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
       NULL,
       0,
       NULL,
+      0,
       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
   ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
   ic->callback = callback;
@@ -677,7 +683,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
   ic->ev = ev;
   ic->h = h;
   ic->request_sent = GNUNET_NO;
-  GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
+  GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
   LOG(GNUNET_ERROR_TYPE_DEBUG,
         "Sending an iterate request for sub system `%s'\n", sub_system);
   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
@@ -698,7 +704,26 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
  */
 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
 {
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_Record *record;
+  struct GNUNET_HashCode keyhash;
+  struct GNUNET_PEERSTORE_WatchContext *wc;
 
+  if(NULL == msg)
+  {
+    LOG(GNUNET_ERROR_TYPE_ERROR,
+        "Problem receiving a watch response, no way to determine which request.\n");
+    reconnect(h);
+    return;
+  }
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
+  record = PEERSTORE_parse_record_message(msg);
+  PEERSTORE_hash_key(record->sub_system,
+      record->peer, record->key, &keyhash);
+  wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash);
+  if(NULL != wc->callback)
+    wc->callback(wc->callback_cls, record, NULL);
+  PEERSTORE_destroy_record(record);
 }
 
 /**
@@ -714,6 +739,36 @@ void watch_request_sent (void *cls)
   wc->ev = NULL;
 }
 
+/**
+ * Cancel a watch request
+ *
+ * @wc handle to the watch request
+ */
+void
+GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
+{
+  struct GNUNET_PEERSTORE_Handle *h = wc->h;
+  struct GNUNET_MQ_Envelope *ev;
+  struct StoreKeyHashMessage *hm;
+
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
+  if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
+  {
+    ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
+    GNUNET_MQ_send(h->mq, ev);
+    wc->callback = NULL;
+    wc->callback_cls = NULL;
+  }
+  if(NULL != wc->ev)
+  {
+    GNUNET_MQ_send_cancel(wc->ev);
+    wc->ev = NULL;
+  }
+  GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
+  GNUNET_free(wc);
+
+}
+
 /**
  * Request watching a given key
  * User will be notified with any new values added to key
@@ -728,30 +783,31 @@ void watch_request_sent (void *cls)
  */
 struct GNUNET_PEERSTORE_WatchContext *
 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
-    char *sub_system,
+    const char *sub_system,
     const struct GNUNET_PeerIdentity *peer,
     const char *key,
     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
 {
   struct GNUNET_MQ_Envelope *ev;
+  struct StoreKeyHashMessage *hm;
   struct GNUNET_PEERSTORE_WatchContext *wc;
 
-  ev = PEERSTORE_create_record_mq_envelope(sub_system,
-      peer,
-      key,
-      NULL,
-      0,
-      NULL,
-      GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+  ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+  PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
   wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
   wc->callback = callback;
   wc->callback_cls = callback_cls;
   wc->ev = ev;
   wc->h = h;
   wc->request_sent = GNUNET_NO;
-  GNUNET_CONTAINER_DLL_insert(h->watch_head, h->watch_tail, wc);
+  wc->keyhash = hm->keyhash;
+  if(NULL == h->watches)
+    h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
+  GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
+      wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   LOG(GNUNET_ERROR_TYPE_DEBUG,
-      "Sending a watch request for sub system `%s'.\n", sub_system);
+      "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
+      sub_system, GNUNET_i2s(peer), key);
   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
   GNUNET_MQ_send(h->mq, ev);
   return wc;