migrate peerstore to new service MQ API
authorChristian Grothoff <christian@grothoff.org>
Sat, 24 Sep 2016 11:58:31 +0000 (11:58 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sat, 24 Sep 2016 11:58:31 +0000 (11:58 +0000)
src/include/gnunet_peerstore_plugin.h
src/include/gnunet_peerstore_service.h
src/peerstore/gnunet-service-peerstore.c
src/peerstore/peerstore.h
src/peerstore/peerstore_api.c
src/peerstore/peerstore_common.c
src/peerstore/peerstore_common.h
src/peerstore/test_peerstore_api_watch.c

index 65359aefd9ef7a3df251a2d5a3f7b47210c6c5d2..1d731f2cc78e531119376f467c32b1cfdda4f50f 100644 (file)
@@ -72,15 +72,15 @@ struct GNUNET_PEERSTORE_PluginFunctions
    */
   int
   (*store_record) (void *cls,
-      const char *sub_system,
-      const struct GNUNET_PeerIdentity *peer,
-      const char *key,
-      const void *value,
-      size_t size,
-      struct GNUNET_TIME_Absolute expiry,
-      enum GNUNET_PEERSTORE_StoreOption options,
-      GNUNET_PEERSTORE_Continuation cont,
-      void *cont_cls);
+                   const char *sub_system,
+                   const struct GNUNET_PeerIdentity *peer,
+                   const char *key,
+                   const void *value,
+                   size_t size,
+                   struct GNUNET_TIME_Absolute expiry,
+                   enum GNUNET_PEERSTORE_StoreOption options,
+                   GNUNET_PEERSTORE_Continuation cont,
+                   void *cont_cls);
 
   /**
    * Iterate over the records given an optional peer id
@@ -98,10 +98,11 @@ struct GNUNET_PEERSTORE_PluginFunctions
    */
   int
   (*iterate_records) (void *cls,
-      const char *sub_system,
-      const struct GNUNET_PeerIdentity *peer,
-      const char *key,
-      GNUNET_PEERSTORE_Processor iter, void *iter_cls);
+                      const char *sub_system,
+                      const struct GNUNET_PeerIdentity *peer,
+                      const char *key,
+                      GNUNET_PEERSTORE_Processor iter,
+                      void *iter_cls);
 
   /**
    * Delete expired records (expiry < now)
@@ -115,9 +116,9 @@ struct GNUNET_PEERSTORE_PluginFunctions
    */
   int
   (*expire_records) (void *cls,
-      struct GNUNET_TIME_Absolute now,
-      GNUNET_PEERSTORE_Continuation cont,
-      void *cont_cls);
+                     struct GNUNET_TIME_Absolute now,
+                     GNUNET_PEERSTORE_Continuation cont,
+                     void *cont_cls);
 
 };
 
index 202e0fd1a0ae3f3cde5ff2c11573a8c64a2f3916..3cafe70b855f45651fd783d461dad9f242ef3464 100644 (file)
@@ -109,9 +109,10 @@ struct GNUNET_PEERSTORE_Record
   struct GNUNET_TIME_Absolute *expiry;
 
   /**
-   * Client from which this record originated
+   * Client from which this record originated.
+   * NOTE: This is internal to the service.
    */
-  struct GNUNET_SERVER_Client *client;
+  struct GNUNET_SERVICE_Client *client;
 };
 
 
index a074d132aa08b8aaaecc8d376994e858493ba16c..07cf78daccd6a864dbcfaf54bd02efccf475c88f 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2014, 2015 GNUnet e.V.
+     Copyright (C) 2014, 2015, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 #include "gnunet_peerstore_plugin.h"
 #include "peerstore_common.h"
 
-/**
- * Connected client entry
- */
-struct ClientEntry
-{
-  /**
-   * DLL.
-   */
-  struct ClientEntry *next;
-
-  /**
-   * DLL.
-   */
-  struct ClientEntry *prev;
-
-  /**
-   * Corresponding server handle.
-   */
-  struct GNUNET_SERVER_Client *client;
-};
 
 /**
  * Interval for expired records cleanup (in seconds)
@@ -75,21 +55,6 @@ static struct GNUNET_PEERSTORE_PluginFunctions *db;
  */
 static struct GNUNET_CONTAINER_MultiHashMap *watchers;
 
-/**
- * Our notification context.
- */
-static struct GNUNET_SERVER_NotificationContext *nc;
-
-/**
- * Head of linked list of connected clients
- */
-static struct ClientEntry *client_head;
-
-/**
- * Tail of linked list of connected clients
- */
-static struct ClientEntry *client_tail;
-
 /**
  * Task run to clean up expired records.
  */
@@ -100,6 +65,12 @@ static struct GNUNET_SCHEDULER_Task *expire_task;
  */
 static int in_shutdown;
 
+/**
+ * Number of connected clients.
+ */
+static unsigned int num_clients;
+
+
 /**
  * Perform the actual shutdown operations
  */
@@ -108,15 +79,12 @@ do_shutdown ()
 {
   if (NULL != db_lib_name)
   {
-    GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db));
+    GNUNET_break (NULL ==
+                  GNUNET_PLUGIN_unload (db_lib_name,
+                                        db));
     GNUNET_free (db_lib_name);
     db_lib_name = NULL;
   }
-  if (NULL != nc)
-  {
-    GNUNET_SERVER_notification_context_destroy (nc);
-    nc = NULL;
-  }
   if (NULL != watchers)
   {
     GNUNET_CONTAINER_multihashmap_destroy (watchers);
@@ -140,14 +108,15 @@ static void
 shutdown_task (void *cls)
 {
   in_shutdown = GNUNET_YES;
-  if (NULL == client_head)      /* Only when no connected clients. */
+  if (0 == num_clients)      /* Only when no connected clients. */
     do_shutdown ();
 }
 
 
 /* Forward declaration */
 static void
-expire_records_continuation (void *cls, int success);
+expire_records_continuation (void *cls,
+                             int success);
 
 
 /**
@@ -160,15 +129,18 @@ cleanup_expired_records (void *cls)
 
   expire_task = NULL;
   GNUNET_assert (NULL != db);
-  ret = db->expire_records (db->cls, GNUNET_TIME_absolute_get (),
-                           &expire_records_continuation, NULL);
+  ret = db->expire_records (db->cls,
+                            GNUNET_TIME_absolute_get (),
+                           &expire_records_continuation,
+                            NULL);
   if (GNUNET_OK != ret)
   {
     GNUNET_assert (NULL == expire_task);
     expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                                (GNUNET_TIME_UNIT_SECONDS,
                                                 EXPIRED_RECORDS_CLEANUP_INTERVAL),
-                                               &cleanup_expired_records, NULL);
+                                               &cleanup_expired_records,
+                                                NULL);
   }
 }
 
@@ -191,23 +163,49 @@ expire_records_continuation (void *cls,
   expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                              (GNUNET_TIME_UNIT_SECONDS,
                                               EXPIRED_RECORDS_CLEANUP_INTERVAL),
-                                             &cleanup_expired_records, NULL);
+                                             &cleanup_expired_records,
+                                              NULL);
+}
+
+
+/**
+ * A client disconnected.  Remove all of its data structure entries.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ * @param mq the message queue
+ * @return
+ */
+static void *
+client_connect_cb (void *cls,
+                   struct GNUNET_SERVICE_Client *client,
+                   struct GNUNET_MQ_Handle *mq)
+{
+  num_clients++;
+  return client;
 }
 
 
 /**
  * Search for a disconnected client and remove it
  *
- * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param cls closuer, a `struct GNUNET_SERVICE_Client`
  * @param key hash of record key
- * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
  * @return #GNUNET_OK to continue iterating
  */
 static int
-client_disconnect_it (void *cls, const struct GNUNET_HashCode *key, void *value)
+client_disconnect_it (void *cls,
+                      const struct GNUNET_HashCode *key,
+                      void *value)
 {
-  if (cls == value)
-    GNUNET_CONTAINER_multihashmap_remove (watchers, key, value);
+  if (value == cls)
+  {
+    GNUNET_CONTAINER_multihashmap_remove (watchers,
+                                          key,
+                                          value);
+    num_clients++;
+  }
   return GNUNET_OK;
 }
 
@@ -219,26 +217,19 @@ client_disconnect_it (void *cls, const struct GNUNET_HashCode *key, void *value)
  * @param client identification of the client
  */
 static void
-handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+client_disconnect_cb (void *cls,
+                      struct GNUNET_SERVICE_Client *client,
+                      void *app_cls)
 {
-  struct ClientEntry *ce;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "A client disconnected, cleaning up.\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "A client disconnected, cleaning up.\n");
   if (NULL != watchers)
-    GNUNET_CONTAINER_multihashmap_iterate (watchers, &client_disconnect_it,
+    GNUNET_CONTAINER_multihashmap_iterate (watchers,
+                                           &client_disconnect_it,
                                            client);
-  ce = client_head;
-  while (ce != NULL)
-  {
-    if (ce->client == client)
-    {
-      GNUNET_CONTAINER_DLL_remove (client_head, client_tail, ce);
-      GNUNET_free (ce);
-      break;
-    }
-    ce = ce->next;
-  }
-  if (NULL == client_head && in_shutdown)
+  num_clients--;
+  if ( (0 == num_clients) &&
+       in_shutdown)
     do_shutdown ();
 }
 
@@ -252,36 +243,40 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
  * @return #GNUNET_YES to continue iteration
  */
 static void
-record_iterator (void *cls, const struct GNUNET_PEERSTORE_Record *record,
+record_iterator (void *cls,
+                 const struct GNUNET_PEERSTORE_Record *record,
                  const char *emsg)
 {
   struct GNUNET_PEERSTORE_Record *cls_record = cls;
-  struct StoreRecordMessage *srm;
+  struct GNUNET_MQ_Envelope *env;
 
   if (NULL == record)
   {
     /* No more records */
-    struct GNUNET_MessageHeader endmsg;
-
-    endmsg.size = htons (sizeof (struct GNUNET_MessageHeader));
-    endmsg.type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
-    GNUNET_SERVER_notification_context_unicast (nc, cls_record->client, &endmsg,
-                                                GNUNET_NO);
-    GNUNET_SERVER_receive_done (cls_record->client,
-                                NULL == emsg ? GNUNET_OK : GNUNET_SYSERR);
+    struct GNUNET_MessageHeader *endmsg;
+
+    env = GNUNET_MQ_msg (endmsg,
+                         GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client),
+                    env);
+    if (NULL == emsg)
+      GNUNET_SERVICE_client_continue (cls_record->client);
+    else
+      GNUNET_SERVICE_client_drop (cls_record->client);
     PEERSTORE_destroy_record (cls_record);
     return;
   }
 
-  srm =
-      PEERSTORE_create_record_message (record->sub_system, record->peer,
-                                       record->key, record->value,
-                                       record->value_size, record->expiry,
-                                       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
-  GNUNET_SERVER_notification_context_unicast (nc, cls_record->client,
-                                              (struct GNUNET_MessageHeader *)
-                                              srm, GNUNET_NO);
-  GNUNET_free (srm);
+  env = PEERSTORE_create_record_mq_envelope (record->sub_system,
+                                             record->peer,
+                                             record->key,
+                                             record->value,
+                                             record->value_size,
+                                             record->expiry,
+                                             0,
+                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client),
+                  env);
 }
 
 
@@ -289,28 +284,32 @@ record_iterator (void *cls, const struct GNUNET_PEERSTORE_Record *record,
  * Iterator over all watcher clients
  * to notify them of a new record
  *
- * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param cls closure, a `struct GNUNET_PEERSTORE_Record *`
  * @param key hash of record key
- * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
  * @return #GNUNET_YES to continue iterating
  */
 static int
-watch_notifier_it (void *cls, const struct GNUNET_HashCode *key, void *value)
+watch_notifier_it (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
 {
   struct GNUNET_PEERSTORE_Record *record = cls;
-  struct GNUNET_SERVER_Client *client = value;
-  struct StoreRecordMessage *srm;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
-  srm =
-      PEERSTORE_create_record_message (record->sub_system, record->peer,
-                                       record->key, record->value,
-                                       record->value_size, record->expiry,
-                                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
-  GNUNET_SERVER_notification_context_unicast (nc, client,
-                                              (const struct GNUNET_MessageHeader
-                                               *) srm, GNUNET_NO);
-  GNUNET_free (srm);
+  struct GNUNET_SERVICE_Client *client = value;
+  struct GNUNET_MQ_Envelope *env;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Found a watcher to update.\n");
+  env = PEERSTORE_create_record_mq_envelope (record->sub_system,
+                                             record->peer,
+                                             record->key,
+                                             record->value,
+                                             record->value_size,
+                                             record->expiry,
+                                             0,
+                                             GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
+                  env);
   return GNUNET_YES;
 }
 
@@ -325,96 +324,127 @@ watch_notifier (struct GNUNET_PEERSTORE_Record *record)
 {
   struct GNUNET_HashCode keyhash;
 
-  PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
-  GNUNET_CONTAINER_multihashmap_get_multiple (watchers, &keyhash,
-                                              &watch_notifier_it, record);
+  PEERSTORE_hash_key (record->sub_system,
+                      record->peer,
+                      record->key,
+                      &keyhash);
+  GNUNET_CONTAINER_multihashmap_get_multiple (watchers,
+                                              &keyhash,
+                                              &watch_notifier_it,
+                                              record);
 }
 
 
 /**
  * Handle a watch cancel request from client
  *
- * @param cls unused
- * @param client identification of the client
- * @param message the actual message
+ * @param cls identification of the client
+ * @param hm the actual message
  */
 static void
-handle_watch_cancel (void *cls, struct GNUNET_SERVER_Client *client,
-                     const struct GNUNET_MessageHeader *message)
+handle_watch_cancel (void *cls,
+                     const struct StoreKeyHashMessage *hm)
 {
-  struct StoreKeyHashMessage *hm;
+  struct GNUNET_SERVICE_Client *client = cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request.\n");
-  hm = (struct StoreKeyHashMessage *) message;
-  GNUNET_CONTAINER_multihashmap_remove (watchers, &hm->keyhash, client);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received a watch cancel request.\n");
+  if (GNUNET_OK !=
+      GNUNET_CONTAINER_multihashmap_remove (watchers,
+                                            &hm->keyhash,
+                                            client))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
+  num_clients++;
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
 /**
  * Handle a watch request from client
  *
- * @param cls unused
- * @param client identification of the client
- * @param message the actual message
+ * @param cls identification of the client
+ * @param hm the actual message
  */
 static void
-handle_watch (void *cls, struct GNUNET_SERVER_Client *client,
-              const struct GNUNET_MessageHeader *message)
+handle_watch (void *cls,
+              const struct StoreKeyHashMessage *hm)
 {
-  struct StoreKeyHashMessage *hm;
+  struct GNUNET_SERVICE_Client *client = cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch request.\n");
-  hm = (struct StoreKeyHashMessage *) message;
-  GNUNET_SERVER_client_mark_monitor (client);
-  GNUNET_SERVER_notification_context_add (nc, client);
-  GNUNET_CONTAINER_multihashmap_put (watchers, &hm->keyhash, client,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received a watch request.\n");
+  num_clients--; /* do not count watchers */
+  GNUNET_SERVICE_client_mark_monitor (client);
+  GNUNET_CONTAINER_multihashmap_put (watchers,
+                                     &hm->keyhash,
+                                     client,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVICE_client_continue (client);
 }
 
 
 /**
- * Handle an iterate request from client
+ * Check an iterate request from client
  *
- * @param cls unused
- * @param client identification of the client
- * @param message the actual message
+ * @param cls client identification of the client
+ * @param srm the actual message
+ * @return #GNUNET_OK if @a srm is well-formed
  */
-static void
-handle_iterate (void *cls, struct GNUNET_SERVER_Client *client,
-                const struct GNUNET_MessageHeader *message)
+static int
+check_iterate (void *cls,
+               const struct StoreRecordMessage *srm)
 {
   struct GNUNET_PEERSTORE_Record *record;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request.\n");
-  record = PEERSTORE_parse_record_message (message);
+  record = PEERSTORE_parse_record_message (srm);
   if (NULL == record)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Malformed iterate request.\n"));
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   if (NULL == record->sub_system)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Sub system not supplied in client iterate request.\n"));
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_break (0);
     PEERSTORE_destroy_record (record);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle an iterate request from client
+ *
+ * @param cls identification of the client
+ * @param srm the actual message
+ */
+static void
+handle_iterate (void *cls,
+                const struct StoreRecordMessage *srm)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+  struct GNUNET_PEERSTORE_Record *record;
+
+  record = PEERSTORE_parse_record_message (srm);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Iterate request: ss `%s', peer `%s', key `%s'\n",
               record->sub_system,
               (NULL == record->peer) ? "NULL" : GNUNET_i2s (record->peer),
               (NULL == record->key) ? "NULL" : record->key);
-  GNUNET_SERVER_notification_context_add (nc, client);
   record->client = client;
   if (GNUNET_OK !=
-      db->iterate_records (db->cls, record->sub_system, record->peer,
-                           record->key, &record_iterator, record))
+      db->iterate_records (db->cls,
+                           record->sub_system,
+                           record->peer,
+                           record->key,
+                           &record_iterator,
+                           record))
   {
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     PEERSTORE_destroy_record (record);
   }
 }
@@ -427,186 +457,174 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client,
  * @param success result
  */
 static void
-store_record_continuation (void *cls, int success)
+store_record_continuation (void *cls,
+                           int success)
 {
   struct GNUNET_PEERSTORE_Record *record = cls;
 
-  GNUNET_SERVER_receive_done (record->client, success);
   if (GNUNET_OK == success)
   {
     watch_notifier (record);
+    GNUNET_SERVICE_client_continue (record->client);
+  }
+  else
+  {
+    GNUNET_SERVICE_client_drop (record->client);
   }
   PEERSTORE_destroy_record (record);
 }
 
 
 /**
- * Handle a store request from client
+ * Check a store request from client
  *
- * @param cls unused
- * @param client identification of the client
- * @param message the actual message
+ * @param cls client identification of the client
+ * @param srm the actual message
+ * @return #GNUNET_OK if @a srm is well-formed
  */
-static void
-handle_store (void *cls, struct GNUNET_SERVER_Client *client,
-              const struct GNUNET_MessageHeader *message)
+static int
+check_store (void *cls,
+              const struct StoreRecordMessage *srm)
 {
   struct GNUNET_PEERSTORE_Record *record;
-  struct StoreRecordMessage *srm;
 
-  record = PEERSTORE_parse_record_message (message);
+  record = PEERSTORE_parse_record_message (srm);
   if (NULL == record)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Malformed store request from client\n"));
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-  srm = (struct StoreRecordMessage *) message;
-  if (NULL == record->sub_system || NULL == record->peer || NULL == record->key)
+  if ( (NULL == record->sub_system) ||
+       (NULL == record->peer) ||
+       (NULL == record->key) )
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Full key not supplied in client store request\n"));
+    GNUNET_break (0);
     PEERSTORE_destroy_record (record);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return;
+    return GNUNET_SYSERR;
   }
+  PEERSTORE_destroy_record (record);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a store request from client
+ *
+ * @param cls client identification of the client
+ * @param srm the actual message
+ */
+static void
+handle_store (void *cls,
+              const struct StoreRecordMessage *srm)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+  struct GNUNET_PEERSTORE_Record *record;
+
+  record = PEERSTORE_parse_record_message (srm);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Received a store request (size: %u). Sub system `%s' Peer `%s Key `%s' Options: %d.\n",
-             (unsigned int) record->value_size,
+              "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %d.\n",
              record->sub_system,
               GNUNET_i2s (record->peer),
              record->key,
               ntohl (srm->options));
   record->client = client;
   if (GNUNET_OK !=
-      db->store_record (db->cls, record->sub_system, record->peer, record->key,
-                        record->value, record->value_size, *record->expiry,
-                        ntohl (srm->options), store_record_continuation,
+      db->store_record (db->cls,
+                        record->sub_system,
+                        record->peer,
+                        record->key,
+                        record->value,
+                        record->value_size,
+                        *record->expiry,
+                        ntohl (srm->options),
+                        &store_record_continuation,
                         record))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Failed to store requested value, database error."));
     PEERSTORE_destroy_record (record);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVICE_client_drop (client);
     return;
   }
 }
 
 
-/**
- * Creates an entry for a new client or returns it if it already exists.
- *
- * @param client Client handle
- * @return Client entry struct
- */
-static struct ClientEntry *
-make_client_entry (struct GNUNET_SERVER_Client *client)
-{
-  struct ClientEntry *ce;
-
-  ce = client_head;
-  while (NULL != ce)
-  {
-    if (ce->client == client)
-      return ce;
-    ce = ce->next;
-  }
-  if (GNUNET_YES == in_shutdown)
-  {
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    return NULL;
-  }
-  ce = GNUNET_new (struct ClientEntry);
-  ce->client = client;
-  GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ce);
-  return ce;
-}
-
-
-/**
- * Callback on a new client connection
- *
- * @param cls closure (unused)
- * @param client identification of the client
- */
-static void
-handle_client_connect (void *cls, struct GNUNET_SERVER_Client *client)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New client connection created.\n");
-  make_client_entry (client);
-}
-
-
 /**
  * Peerstore service runner.
  *
  * @param cls closure
- * @param server the initialized server
  * @param c configuration to use
+ * @param service the initialized service
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server,
-     const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *c,
+     struct GNUNET_SERVICE_Handle *service)
 {
-  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
-    {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
-    {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
-     sizeof (struct StoreKeyHashMessage)},
-    {&handle_watch_cancel, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
-     sizeof (struct StoreKeyHashMessage)},
-    {NULL, NULL, 0, 0}
-  };
   char *database;
 
   in_shutdown = GNUNET_NO;
   cfg = c;
   if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_string (cfg, "peerstore", "DATABASE",
+      GNUNET_CONFIGURATION_get_value_string (cfg,
+                                             "peerstore",
+                                             "DATABASE",
                                              &database))
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("No database backend configured\n"));
-
-  else
   {
-    GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_peerstore_%s", database);
-    db = GNUNET_PLUGIN_load (db_lib_name, (void *) cfg);
-    GNUNET_free (database);
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+                               "peerstore",
+                               "DATABASE");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
   }
+  GNUNET_asprintf (&db_lib_name,
+                   "libgnunet_plugin_peerstore_%s",
+                   database);
+  db = GNUNET_PLUGIN_load (db_lib_name,
+                           (void *) cfg);
+  GNUNET_free (database);
   if (NULL == db)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("Could not load database backend `%s'\n"),
                db_lib_name);
-    GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+    GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  nc = GNUNET_SERVER_notification_context_create (server, 16);
-  watchers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
+  watchers = GNUNET_CONTAINER_multihashmap_create (10,
+                                                   GNUNET_NO);
   expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records,
                                          NULL);
-  GNUNET_SERVER_add_handlers (server, handlers);
-  GNUNET_SERVER_connect_notify (server, &handle_client_connect, NULL);
-  GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                 NULL);
 }
 
 
 /**
- * The main function for the peerstore service.
- *
- * @param argc number of arguments from the command line
- * @param argv command line arguments
- * @return 0 ok, 1 on error
+ * Define "main" method using service macro.
  */
-int
-main (int argc, char *const *argv)
-{
-  return (GNUNET_OK ==
-          GNUNET_SERVICE_run (argc, argv, "peerstore",
-                              GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, &run,
-                              NULL)) ? 0 : 1;
-}
+GNUNET_SERVICE_MAIN
+("peerstore",
+ GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
+ &run,
+ &client_connect_cb,
+ &client_disconnect_cb,
+ NULL,
+ GNUNET_MQ_hd_var_size (store,
+                        GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
+                        struct StoreRecordMessage,
+                        NULL),
+ GNUNET_MQ_hd_var_size (iterate,
+                        GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE,
+                        struct StoreRecordMessage,
+                        NULL),
+ GNUNET_MQ_hd_fixed_size (watch,
+                         GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
+                         struct StoreKeyHashMessage,
+                         NULL),
+ GNUNET_MQ_hd_fixed_size (watch_cancel,
+                         GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
+                         struct StoreKeyHashMessage,
+                         NULL),
+ GNUNET_MQ_handler_end ());
+
 
 /* end of gnunet-service-peerstore.c */
index f5e2cd19ca02571a709714a08a74cd3788357c64..8b3c4dd92bd3004497b83ca838e177f9f6540b4c 100644 (file)
@@ -33,7 +33,7 @@ GNUNET_NETWORK_STRUCT_BEGIN
 /**
  * Message carrying a PEERSTORE record message
  */
-    struct StoreRecordMessage
+struct StoreRecordMessage
 {
 
   /**
@@ -78,8 +78,9 @@ GNUNET_NETWORK_STRUCT_BEGIN
    * Options, needed only in case of a
    * store operation
    */
-             uint32_t /* enum GNUNET_PEERSTORE_StoreOption */ options
-             GNUNET_PACKED;
+  uint32_t /* enum GNUNET_PEERSTORE_StoreOption */ options GNUNET_PACKED;
+
+  /* Followed by key and value */
 
 };
 
index f6910c017a1382a87399a3da384f97ba6268cfe0..47bf7775e73e246d7c7a922addb7c5a5432f7f3b 100644 (file)
@@ -579,7 +579,7 @@ handle_iterate_end (void *cls,
  */
 static int
 check_iterate_result (void *cls,
-                      const struct GNUNET_MessageHeader *msg)
+                      const struct StoreRecordMessage *msg)
 {
   /* we defer validation to #handle_iterate_result */
   return GNUNET_OK;
@@ -594,7 +594,7 @@ check_iterate_result (void *cls,
  */
 static void
 handle_iterate_result (void *cls,
-                       const struct GNUNET_MessageHeader *msg)
+                       const struct StoreRecordMessage *msg)
 {
   struct GNUNET_PEERSTORE_Handle *h = cls;
   struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -725,7 +725,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
  */
 static int
 check_watch_record (void *cls,
-                    const struct GNUNET_MessageHeader *msg)
+                    const struct StoreRecordMessage *msg)
 {
   /* we defer validation to #handle_watch_result */
   return GNUNET_OK;
@@ -740,7 +740,7 @@ check_watch_record (void *cls,
  */
 static void
 handle_watch_record (void *cls,
-                     const struct GNUNET_MessageHeader *msg)
+                     const struct StoreRecordMessage *msg)
 {
   struct GNUNET_PEERSTORE_Handle *h = cls;
   struct GNUNET_PEERSTORE_Record *record;
@@ -793,11 +793,11 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
                              h),
     GNUNET_MQ_hd_var_size (iterate_result,
                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
-                           struct GNUNET_MessageHeader,
+                           struct StoreRecordMessage,
                            h),
     GNUNET_MQ_hd_var_size (watch_record,
                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
-                           struct GNUNET_MessageHeader,
+                           struct StoreRecordMessage,
                            h),
     GNUNET_MQ_handler_end ()
   };
@@ -936,18 +936,20 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
   wc->h = h;
   wc->keyhash = hm->keyhash;
   if (NULL == h->watches)
-    h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
+    h->watches = GNUNET_CONTAINER_multihashmap_create (5,
+                                                       GNUNET_NO);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (h->watches,
                                                     &wc->keyhash,
                                                     wc,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
+       "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
        sub_system,
        GNUNET_i2s (peer),
        key);
-  GNUNET_MQ_send (h->mq, ev);
+  GNUNET_MQ_send (h->mq,
+                  ev);
   return wc;
 }
 
index 07d43db2e5b9b379ced922c6bbf51352dd8af3ff..d12c4e21eed17ee3b03ef3ff309442690c559058 100644 (file)
@@ -31,7 +31,8 @@
  */
 void
 PEERSTORE_hash_key (const char *sub_system,
-                    const struct GNUNET_PeerIdentity *peer, const char *key,
+                    const struct GNUNET_PeerIdentity *peer,
+                    const char *key,
                     struct GNUNET_HashCode *ret)
 {
   size_t sssize;
@@ -57,64 +58,6 @@ PEERSTORE_hash_key (const char *sub_system,
 }
 
 
-/**
- * Creates a record message ready to be sent
- *
- * @param sub_system sub system string
- * @param peer Peer identity (can be NULL)
- * @param key record key string (can be NULL)
- * @param value record value BLOB (can be NULL)
- * @param value_size record value size in bytes (set to 0 if value is NULL)
- * @param expiry absolute time after which the record expires
- * @param msg_type message type to be set in header
- * @return pointer to record message struct
- */
-struct StoreRecordMessage *
-PEERSTORE_create_record_message (const char *sub_system,
-                                 const struct GNUNET_PeerIdentity *peer,
-                                 const char *key, const void *value,
-                                 size_t value_size,
-                                 struct GNUNET_TIME_Absolute *expiry,
-                                 uint16_t msg_type)
-{
-  struct StoreRecordMessage *srm;
-  size_t ss_size;
-  size_t key_size;
-  size_t request_size;
-  void *dummy;
-
-  ss_size = strlen (sub_system) + 1;
-  if (NULL == key)
-    key_size = 0;
-  else
-    key_size = strlen (key) + 1;
-  request_size =
-      sizeof (struct StoreRecordMessage) + ss_size + key_size + value_size;
-  srm = GNUNET_malloc (request_size);
-  srm->header.size = htons (request_size);
-  srm->header.type = htons (msg_type);
-  srm->key_size = htons (key_size);
-  if (NULL != expiry)
-    srm->expiry = *expiry;
-  if (NULL == peer)
-    srm->peer_set = htons (GNUNET_NO);
-  else
-  {
-    srm->peer_set = htons (GNUNET_YES);
-    srm->peer = *peer;
-  }
-  srm->sub_system_size = htons (ss_size);
-  srm->value_size = htons (value_size);
-  dummy = &srm[1];
-  GNUNET_memcpy (dummy, sub_system, ss_size);
-  dummy += ss_size;
-  GNUNET_memcpy (dummy, key, key_size);
-  dummy += key_size;
-  GNUNET_memcpy (dummy, value, value_size);
-  return srm;
-}
-
-
 /**
  * Creates a MQ envelope for a single record
  *
@@ -131,7 +74,8 @@ PEERSTORE_create_record_message (const char *sub_system,
 struct GNUNET_MQ_Envelope *
 PEERSTORE_create_record_mq_envelope (const char *sub_system,
                                      const struct GNUNET_PeerIdentity *peer,
-                                     const char *key, const void *value,
+                                     const char *key,
+                                     const void *value,
                                      size_t value_size,
                                      struct GNUNET_TIME_Absolute *expiry,
                                      enum GNUNET_PEERSTORE_StoreOption options,
@@ -178,13 +122,12 @@ PEERSTORE_create_record_mq_envelope (const char *sub_system,
 /**
  * Parses a message carrying a record
  *
- * @param message the actual message
+ * @param srm the actual message
  * @return Pointer to record or NULL if error
  */
 struct GNUNET_PEERSTORE_Record *
-PEERSTORE_parse_record_message (const struct GNUNET_MessageHeader *message)
+PEERSTORE_parse_record_message (const struct StoreRecordMessage *srm)
 {
-  struct StoreRecordMessage *srm;
   struct GNUNET_PEERSTORE_Record *record;
   uint16_t req_size;
   uint16_t ss_size;
@@ -192,37 +135,20 @@ PEERSTORE_parse_record_message (const struct GNUNET_MessageHeader *message)
   uint16_t value_size;
   char *dummy;
 
-  req_size = ntohs (message->size);
-  if (req_size < sizeof (struct StoreRecordMessage))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Received message with invalid size: (%d < %d).\n",
-                (int) req_size,
-                (int) sizeof (struct StoreRecordMessage));
-    return NULL;
-  }
-  srm = (struct StoreRecordMessage *) message;
+  req_size = ntohs (srm->header.size) - sizeof (*srm);
   ss_size = ntohs (srm->sub_system_size);
   key_size = ntohs (srm->key_size);
   value_size = ntohs (srm->value_size);
-  if (ss_size + key_size + value_size + sizeof (struct StoreRecordMessage) !=
-      req_size)
+  if (ss_size + key_size + value_size != req_size)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Received message with invalid sizes: (%d + %d + %d + %d != %d).\n",
-                ss_size,
-                key_size,
-                value_size,
-                (int) sizeof (struct StoreRecordMessage),
-                req_size);
+    GNUNET_break (0);
     return NULL;
   }
   record = GNUNET_new (struct GNUNET_PEERSTORE_Record);
   if (GNUNET_YES == ntohs (srm->peer_set))
   {
     record->peer = GNUNET_new (struct GNUNET_PeerIdentity);
-
-    GNUNET_memcpy (record->peer, &srm->peer, sizeof (struct GNUNET_PeerIdentity));
+    *record->peer = srm->peer;
   }
   record->expiry = GNUNET_new (struct GNUNET_TIME_Absolute);
 
index 4b806bf912d3cc81801297728ec4b75c816314e4..3d938b5da8600e1d12f2eaf91edad35f0146816c 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C)
+      Copyright (C) 2013-2016 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -23,7 +23,6 @@
  * @brief Helper peerstore functions
  * @author Omar Tarabai
  */
-
 #include "platform.h"
 #include "peerstore.h"
 
  */
 void
 PEERSTORE_hash_key (const char *sub_system,
-                    const struct GNUNET_PeerIdentity *peer, const char *key,
+                    const struct GNUNET_PeerIdentity *peer,
+                    const char *key,
                     struct GNUNET_HashCode *ret);
 
-/**
- * Creates a record message ready to be sent
- *
- * @param sub_system sub system string
- * @param peer Peer identity (can be NULL)
- * @param key record key string (can be NULL)
- * @param value record value BLOB (can be NULL)
- * @param value_size record value size in bytes (set to 0 if value is NULL)
- * @param expiry absolute time after which the record expires
- * @param msg_type message type to be set in header
- * @return pointer to record message struct
- */
-struct StoreRecordMessage *
-PEERSTORE_create_record_message (const char *sub_system,
-                                 const struct GNUNET_PeerIdentity *peer,
-                                 const char *key, const void *value,
-                                 size_t value_size,
-                                 struct GNUNET_TIME_Absolute *expiry,
-                                 uint16_t msg_type);
 
 /**
  * Creates a MQ envelope for a single record
@@ -72,20 +53,23 @@ PEERSTORE_create_record_message (const char *sub_system,
 struct GNUNET_MQ_Envelope *
 PEERSTORE_create_record_mq_envelope (const char *sub_system,
                                      const struct GNUNET_PeerIdentity *peer,
-                                     const char *key, const void *value,
+                                     const char *key,
+                                     const void *value,
                                      size_t value_size,
                                      struct GNUNET_TIME_Absolute *expiry,
                                      enum GNUNET_PEERSTORE_StoreOption options,
                                      uint16_t msg_type);
 
+
 /**
  * Parses a message carrying a record
  *
- * @param message the actual message
- * @return Pointer to record or NULL if error
+ * @param srm the actual message
+ * @return Pointer to record or NULL on error
  */
 struct GNUNET_PEERSTORE_Record *
-PEERSTORE_parse_record_message (const struct GNUNET_MessageHeader *message);
+PEERSTORE_parse_record_message (const struct StoreRecordMessage *srm);
+
 
 /**
  * Free any memory allocated for this record
@@ -94,3 +78,5 @@ PEERSTORE_parse_record_message (const struct GNUNET_MessageHeader *message);
  */
 void
 PEERSTORE_destroy_record (struct GNUNET_PEERSTORE_Record *record);
+
+/* end of peerstore_common.h */
index 91902ba9eb84752bce1b74df50ea8619329eada5..1c9995c317d4f478bb7faba8a1c414de8fc0a8c3 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C)
+     Copyright (C) 2013-2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 #include "gnunet_testing_lib.h"
 #include "gnunet_peerstore_service.h"
 
+
 static int ok = 1;
 
 static struct GNUNET_PEERSTORE_Handle *h;
 
 static char *ss = "test_peerstore_api_watch";
-static struct GNUNET_PeerIdentity p;
+
 static char *k = "test_peerstore_api_watch_key";
+
 static char *val = "test_peerstore_api_watch_val";
 
+
 static void
-watch_cb (void *cls, const struct GNUNET_PEERSTORE_Record *record,
+watch_cb (void *cls,
+          const struct GNUNET_PEERSTORE_Record *record,
           const char *emsg)
 {
   GNUNET_assert (NULL == emsg);
-  GNUNET_assert (0 == strcmp (val, (char *) record->value));
+  GNUNET_assert (0 == strcmp (val,
+                              (char *) record->value));
   ok = 0;
-  GNUNET_PEERSTORE_disconnect (h, GNUNET_NO);
+  GNUNET_PEERSTORE_disconnect (h,
+                               GNUNET_NO);
   GNUNET_SCHEDULER_shutdown ();
 }
 
 
 static void
-run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg,
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *cfg,
      struct GNUNET_TESTING_Peer *peer)
 {
+  struct GNUNET_PeerIdentity p;
+
   h = GNUNET_PEERSTORE_connect (cfg);
   GNUNET_assert (NULL != h);
-  memset (&p, 4, sizeof (p));
-  GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
-  GNUNET_PEERSTORE_store (h, ss, &p, k, val, strlen (val) + 1,
+  memset (&p,
+          4,
+          sizeof (p));
+  GNUNET_PEERSTORE_watch (h,
+                          ss,
+                          &p,
+                          k,
+                          &watch_cb,
+                          NULL);
+  GNUNET_PEERSTORE_store (h,
+                          ss,
+                          &p,
+                          k,
+                          val,
+                          strlen (val) + 1,
                           GNUNET_TIME_UNIT_FOREVER_ABS,
-                          GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
+                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                          NULL,
+                          NULL);
 }
 
 
 int
-main (int argc, char *argv[])
+main (int argc,
+      char *argv[])
 {
   if (0 !=
-      GNUNET_TESTING_service_run ("test-gnunet-peerstore", "peerstore",
-                                  "test_peerstore_api_data.conf", &run, NULL))
+      GNUNET_TESTING_service_run ("test-gnunet-peerstore",
+                                  "peerstore",
+                                  "test_peerstore_api_data.conf",
+                                  &run,
+                                  NULL))
     return 1;
   return ok;
 }