- moved timeout handling responsibility from for nat tests from caller to the library
[oweals/gnunet.git] / src / peerstore / gnunet-service-peerstore.c
index f568609714a4e301a788795f7dd7a8dcbface317..140db80d8795714b5dbbfa534c696be9bf575066 100644 (file)
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "peerstore.h"
+#include "gnunet_peerstore_plugin.h"
+#include "peerstore_common.h"
+
+/**
+ * Interval for expired records cleanup (in seconds)
+ */
+#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
 
 /**
  * Our configuration.
@@ -42,6 +49,16 @@ char *db_lib_name;
  */
 static struct GNUNET_PEERSTORE_PluginFunctions *db;
 
+/**
+ * Hashmap with all watch requests
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *watchers;
+
+/**
+ * Our notification context.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
 /**
  * Task run during shutdown.
  *
@@ -58,8 +75,47 @@ shutdown_task (void *cls,
     GNUNET_free (db_lib_name);
     db_lib_name = NULL;
   }
+  GNUNET_SERVER_notification_context_destroy(nc);
+  GNUNET_CONTAINER_multihashmap_destroy(watchers);
+  watchers = NULL;
+  GNUNET_SCHEDULER_shutdown();
+}
+
+/**
+ * Deletes any expired records from storage
+ */
+static void
+cleanup_expired_records(void *cls,
+    const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  int deleted;
+
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+  GNUNET_assert(NULL != db);
+  deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get());
+  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
+  GNUNET_SCHEDULER_add_delayed(
+      GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL),
+      &cleanup_expired_records, NULL);
 }
 
+/**
+ * Search for a disconnected client and remove it
+ *
+ * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param key hash of record key
+ * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @return #GNUNET_OK to continue iterating
+ */
+int client_disconnect_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  if(cls == value)
+    GNUNET_CONTAINER_multihashmap_remove(watchers, key, value);
+  return GNUNET_OK;
+}
 
 /**
  * A client disconnected.  Remove all of its data structure entries.
@@ -72,6 +128,180 @@ handle_client_disconnect (void *cls,
                          struct GNUNET_SERVER_Client
                          * client)
 {
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "A client was disconnected, cleaning up.\n");
+  if(NULL != watchers)
+    GNUNET_CONTAINER_multihashmap_iterate(watchers,
+        &client_disconnect_it, client);
+}
+
+/**
+ * Function called by for each matching record.
+ *
+ * @param cls closure
+ * @param peer peer identity
+ * @param sub_system name of the GNUnet sub system responsible
+ * @param value stored value
+ * @param size size of stored value
+ */
+int record_iterator(void *cls,
+    struct GNUNET_PEERSTORE_Record *record,
+    char *emsg)
+{
+  struct GNUNET_SERVER_Client *client = cls;
+  struct StoreRecordMessage *srm;
+
+  srm = PEERSTORE_create_record_message(record->sub_system,
+      record->peer,
+      record->key,
+      record->value,
+      record->value_size,
+      record->expiry,
+      GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
+  GNUNET_SERVER_notification_context_unicast(nc, client, (struct GNUNET_MessageHeader *)srm, GNUNET_NO);
+  GNUNET_free(srm);
+  return GNUNET_YES;
+}
+
+/**
+ * Iterator over all watcher clients
+ * to notify them of a new record
+ *
+ * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param key hash of record key
+ * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @return #GNUNET_YES to continue iterating
+ */
+int watch_notifier_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  struct GNUNET_PEERSTORE_Record *record = cls;
+  struct GNUNET_SERVER_Client *client = value;
+  struct StoreRecordMessage *srm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
+  srm = PEERSTORE_create_record_message(record->sub_system,
+      record->peer,
+      record->key,
+      record->value,
+      record->value_size,
+      record->expiry,
+      GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
+  GNUNET_SERVER_notification_context_unicast(nc, client,
+      (const struct GNUNET_MessageHeader *)srm, GNUNET_NO);
+  GNUNET_free(srm);
+  return GNUNET_YES;
+}
+
+/**
+ * Given a new record, notifies watchers
+ *
+ * @param record changed record to update watchers with
+ */
+void watch_notifier (struct GNUNET_PEERSTORE_Record *record)
+{
+  struct GNUNET_HashCode keyhash;
+
+  PEERSTORE_hash_key(record->sub_system,
+      record->peer,
+      record->key,
+      &keyhash);
+  GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record);
+}
+
+/**
+ * Handle a watch cancel request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_watch_cancel (void *cls,
+    struct GNUNET_SERVER_Client *client,
+    const struct GNUNET_MessageHeader *message)
+{
+  struct StoreKeyHashMessage *hm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
+  hm = (struct StoreKeyHashMessage *) message;
+  GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
+/**
+ * Handle a watch request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_watch (void *cls,
+    struct GNUNET_SERVER_Client *client,
+    const struct GNUNET_MessageHeader *message)
+{
+  struct StoreKeyHashMessage *hm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
+  hm = (struct StoreKeyHashMessage *) message;
+  GNUNET_SERVER_client_mark_monitor(client);
+  GNUNET_SERVER_notification_context_add(nc, client);
+  GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
+     client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
+/**
+ * Handle an iterate request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_iterate (void *cls,
+    struct GNUNET_SERVER_Client *client,
+    const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_PEERSTORE_Record *record;
+  struct GNUNET_MessageHeader *endmsg;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n");
+  record = PEERSTORE_parse_record_message(message);
+  if(NULL == record)
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Malformed iterate request from client\n");
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
+  }
+  if(NULL == record->sub_system)
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Sub system not supplied in client iterate request\n");
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Iterate request: ss `%s', peer `%s', key `%s'\n",
+      record->sub_system,
+      (NULL == record->peer) ? "NULL" : GNUNET_i2s(record->peer),
+      (NULL == record->key) ? "NULL" : record->key);
+  GNUNET_SERVER_notification_context_add(nc, client);
+  if(GNUNET_OK == db->iterate_records(db->cls,
+      record->sub_system,
+      record->peer,
+      record->key,
+      &record_iterator,
+      client))
+  {
+    endmsg = GNUNET_new(struct GNUNET_MessageHeader);
+    endmsg->size = htons(sizeof(struct GNUNET_MessageHeader));
+    endmsg->type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+    GNUNET_SERVER_notification_context_unicast(nc, client, endmsg, GNUNET_NO);
+    GNUNET_free(endmsg);
+    GNUNET_SERVER_receive_done(client, GNUNET_OK);
+  }
+  else
+  {
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+  }
+  PEERSTORE_destroy_record(record);
 }
 
 /**
@@ -85,32 +315,48 @@ void handle_store (void *cls,
     struct GNUNET_SERVER_Client *client,
     const struct GNUNET_MessageHeader *message)
 {
-  struct StoreRequestMessage *sreqm;
-  struct GNUNET_SERVER_TransmitContext *tc;
-  struct StoreResponseMessage *sresm;
-  uint16_t msg_size;
-  char *sub_system;
-
-  msg_size = ntohs(message->size);
-  GNUNET_break_op(msg_size > sizeof(struct GNUNET_MessageHeader) + sizeof(struct StoreRequestMessage));
-  sreqm = (struct StoreRequestMessage *)&message[1];
-  sub_system = (char *)&sreqm[1];
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a store request (size: %lu) for sub system `%s' and peer `%s'\n",
-      msg_size,
-      sub_system,
-      GNUNET_i2s (&sreqm->peer));
-  //TODO: do the actual storage
-  //create a fake response for testing
-  char *response = "This is a response";
-  tc = GNUNET_SERVER_transmit_context_create (client);
-  sresm = malloc(sizeof(struct StoreResponseMessage) + strlen(response));
-  sresm->header.type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT);
-  sresm->header.size = htons(sizeof(struct StoreResponseMessage) + strlen(response));
-  sresm->success = htons(GNUNET_NO);
-  sresm->emsg_size = htons(strlen(response));
-  memcpy(&sresm[1], response, strlen(response));
-  GNUNET_SERVER_transmit_context_append_message(tc, (struct GNUNET_MessageHeader *)sresm);
-  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+  struct GNUNET_PEERSTORE_Record *record;
+  struct StoreRecordMessage *srm;
+
+  record = PEERSTORE_parse_record_message(message);
+  if(NULL == record)
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Malformed store request from client\n");
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
+  }
+  srm = (struct StoreRecordMessage *)message;
+  if(NULL == record->sub_system
+      || NULL == record->peer
+      || NULL == record->key)
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n");
+    PEERSTORE_destroy_record(record);
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a store request (size: %lu) for sub system `%s', peer `%s', key `%s'\n",
+      record->value_size,
+      record->sub_system,
+      GNUNET_i2s (record->peer),
+      record->key);
+  if(GNUNET_OK != db->store_record(db->cls,
+      record->sub_system,
+      record->peer,
+      record->key,
+      record->value,
+      record->value_size,
+      *record->expiry,
+      srm->options))
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error.");
+    PEERSTORE_destroy_record(record);
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+  watch_notifier(record);
+  PEERSTORE_destroy_record(record);
 }
 
 /**
@@ -127,6 +373,9 @@ run (void *cls,
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
       {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
+      {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
+      {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, sizeof(struct StoreKeyHashMessage)},
+      {&handle_watch_cancel, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, sizeof(struct StoreKeyHashMessage)},
       {NULL, NULL, 0, 0}
   };
   char *database;
@@ -147,6 +396,9 @@ run (void *cls,
          GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name);
   else
   {
+    nc = GNUNET_SERVER_notification_context_create (server, 16);
+    watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
+    GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
     GNUNET_SERVER_add_handlers (server, handlers);
     GNUNET_SERVER_disconnect_notify (server,
              &handle_client_disconnect,