convert fs publish to MQ
[oweals/gnunet.git] / src / peerstore / gnunet-service-peerstore.c
index ed5b14eb9b70e4998e303f9ac13067a27c7ce635..af6438bb253c1496f58a14ea0884ad4f7ad734f0 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C)
+     Copyright (C) 2014, 2015 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
@@ -90,6 +90,11 @@ static struct ClientEntry *client_head;
  */
 static struct ClientEntry *client_tail;
 
+/**
+ * Task run to clean up expired records.
+ */
+static struct GNUNET_SCHEDULER_Task *expire_task;
+
 /**
  * Are we in the process of shutting down the service? #GNUNET_YES / #GNUNET_NO
  */
@@ -117,6 +122,11 @@ do_shutdown ()
     GNUNET_CONTAINER_multihashmap_destroy (watchers);
     watchers = NULL;
   }
+  if (NULL != expire_task)
+  {
+    GNUNET_SCHEDULER_cancel (expire_task);
+    expire_task = NULL;
+  }
   GNUNET_SCHEDULER_shutdown ();
 }
 
@@ -125,10 +135,9 @@ do_shutdown ()
  * Task run during shutdown.
  *
  * @param cls unused
- * @param tc unused
  */
 static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls)
 {
   in_shutdown = GNUNET_YES;
   if (NULL == client_head)      /* Only when no connected clients. */
@@ -136,25 +145,53 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 }
 
 
+/* Forward declaration */
+static void
+expire_records_continuation (void *cls, int success);
+
+
 /**
  * Deletes any expired records from storage
  */
 static void
-cleanup_expired_records (void *cls,
-                         const struct GNUNET_SCHEDULER_TaskContext *tc)
+cleanup_expired_records (void *cls)
 {
-  int deleted;
+  int ret;
 
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
+  expire_task = NULL;
   GNUNET_assert (NULL != db);
-  deleted = db->expire_records (db->cls, GNUNET_TIME_absolute_get ());
-  if (deleted > 0)
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                (GNUNET_TIME_UNIT_SECONDS,
-                                 EXPIRED_RECORDS_CLEANUP_INTERVAL),
-                                &cleanup_expired_records, NULL);
+  ret = db->expire_records (db->cls, GNUNET_TIME_absolute_get (),
+                           &expire_records_continuation, NULL);
+  if (GNUNET_OK != ret)
+  {
+    GNUNET_assert (NULL == expire_task);
+    expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+                                               (GNUNET_TIME_UNIT_SECONDS,
+                                                EXPIRED_RECORDS_CLEANUP_INTERVAL),
+                                               &cleanup_expired_records, NULL);
+  }
+}
+
+
+/**
+ * Continuation to expire_records called by the peerstore plugin
+ *
+ * @param cls unused
+ * @param success count of records deleted or #GNUNET_SYSERR
+ */
+static void
+expire_records_continuation (void *cls,
+                            int success)
+{
+  if (success > 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "%d records expired.\n",
+               success);
+  GNUNET_assert (NULL == expire_task);
+  expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+                                             (GNUNET_TIME_UNIT_SECONDS,
+                                              EXPIRED_RECORDS_CLEANUP_INTERVAL),
+                                             &cleanup_expired_records, NULL);
 }
 
 
@@ -214,22 +251,37 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
  * @param emsg error message or NULL if no errors
  * @return #GNUNET_YES to continue iteration
  */
-static int
-record_iterator (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
+static void
+record_iterator (void *cls, const struct GNUNET_PEERSTORE_Record *record,
+                 const char *emsg)
 {
-  struct GNUNET_SERVER_Client *client = cls;
+  struct GNUNET_PEERSTORE_Record *cls_record = cls;
   struct StoreRecordMessage *srm;
 
+  if (NULL == record)
+  {
+    /* No more records */
+    struct GNUNET_MessageHeader endmsg;
+
+    endmsg.size = htons (sizeof (struct GNUNET_MessageHeader));
+    endmsg.type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+    GNUNET_SERVER_notification_context_unicast (nc, cls_record->client, &endmsg,
+                                                GNUNET_NO);
+    GNUNET_SERVER_receive_done (cls_record->client,
+                                NULL == emsg ? GNUNET_OK : GNUNET_SYSERR);
+    PEERSTORE_destroy_record (cls_record);
+    return;
+  }
+
   srm =
       PEERSTORE_create_record_message (record->sub_system, record->peer,
                                        record->key, record->value,
                                        record->value_size, record->expiry,
                                        GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
-  GNUNET_SERVER_notification_context_unicast (nc, client,
+  GNUNET_SERVER_notification_context_unicast (nc, cls_record->client,
                                               (struct GNUNET_MessageHeader *)
                                               srm, GNUNET_NO);
   GNUNET_free (srm);
-  return GNUNET_YES;
 }
 
 
@@ -334,7 +386,6 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client,
                 const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_PEERSTORE_Record *record;
-  struct GNUNET_MessageHeader *endmsg;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request.\n");
   record = PEERSTORE_parse_record_message (message);
@@ -358,21 +409,32 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client,
               (NULL == record->peer) ? "NULL" : GNUNET_i2s (record->peer),
               (NULL == record->key) ? "NULL" : record->key);
   GNUNET_SERVER_notification_context_add (nc, client);
-  if (GNUNET_OK ==
+  record->client = client;
+  if (GNUNET_OK !=
       db->iterate_records (db->cls, record->sub_system, record->peer,
-                           record->key, &record_iterator, client))
+                           record->key, &record_iterator, record))
   {
-    endmsg = GNUNET_new (struct GNUNET_MessageHeader);
-
-    endmsg->size = htons (sizeof (struct GNUNET_MessageHeader));
-    endmsg->type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
-    GNUNET_SERVER_notification_context_unicast (nc, client, endmsg, GNUNET_NO);
-    GNUNET_free (endmsg);
-    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    PEERSTORE_destroy_record (record);
   }
-  else
+}
+
+
+/**
+ * Continuation of store_record called by the peerstore plugin
+ *
+ * @param cls closure
+ * @param success result
+ */
+static void
+store_record_continuation (void *cls, int success)
+{
+  struct GNUNET_PEERSTORE_Record *record = cls;
+
+  GNUNET_SERVER_receive_done (record->client, success);
+  if (GNUNET_OK == success)
   {
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    watch_notifier (record);
   }
   PEERSTORE_destroy_record (record);
 }
@@ -410,28 +472,24 @@ handle_store (void *cls, struct GNUNET_SERVER_Client *client,
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Received a store request (size: %lu).\n"
-              " Sub system `%s'\n"
-              " Peer `%s'\n"
-              " Key `%s'\n"
-              " Value size %lu\n"
-              " Options: %d.\n",
-              record->value_size, record->sub_system, GNUNET_i2s (record->peer),
-              record->key, record->value_size, ntohl (srm->options));
+              "Received a store request (size: %lu).\n" " Sub system `%s'\n"
+              " Peer `%s'\n" " Key `%s'\n" " Value size %lu\n"
+              " Options: %d.\n", record->value_size, record->sub_system,
+              GNUNET_i2s (record->peer), record->key, record->value_size,
+              ntohl (srm->options));
+  record->client = client;
   if (GNUNET_OK !=
       db->store_record (db->cls, record->sub_system, record->peer, record->key,
                         record->value, record->value_size, *record->expiry,
-                        ntohl (srm->options)))
+                        ntohl (srm->options), store_record_continuation,
+                        record))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Failed to store requested value, sqlite database error."));
+                _("Failed to store requested value, database error."));
     PEERSTORE_destroy_record (record);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-  watch_notifier (record);
-  PEERSTORE_destroy_record (record);
 }
 
 
@@ -517,18 +575,20 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
   if (NULL == db)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Could not load database backend `%s'\n"), db_lib_name);
+                _("Could not load database backend `%s'\n"),
+               db_lib_name);
     GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
     return;
   }
   nc = GNUNET_SERVER_notification_context_create (server, 16);
   watchers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
-  GNUNET_SCHEDULER_add_now (&cleanup_expired_records, NULL);
+  expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records,
+                                         NULL);
   GNUNET_SERVER_add_handlers (server, handlers);
   GNUNET_SERVER_connect_notify (server, &handle_client_connect, NULL);
   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
-                                NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
 }