- moved timeout handling responsibility from for nat tests from caller to the library
[oweals/gnunet.git] / src / peerstore / gnunet-service-peerstore.c
index b7827692242733db2641eb1d30ffcfe459fe6131..140db80d8795714b5dbbfa534c696be9bf575066 100644 (file)
 #include "gnunet_peerstore_plugin.h"
 #include "peerstore_common.h"
 
-//TODO: GNUNET_SERVER_receive_done() ?
-
 /**
  * Interval for expired records cleanup (in seconds)
  */
-#define CLEANUP_INTERVAL 300 /* 5mins */
+#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
 
 /**
  * Our configuration.
@@ -51,6 +49,16 @@ char *db_lib_name;
  */
 static struct GNUNET_PEERSTORE_PluginFunctions *db;
 
+/**
+ * Hashmap with all watch requests
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *watchers;
+
+/**
+ * Our notification context.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
 /**
  * Task run during shutdown.
  *
@@ -67,6 +75,10 @@ shutdown_task (void *cls,
     GNUNET_free (db_lib_name);
     db_lib_name = NULL;
   }
+  GNUNET_SERVER_notification_context_destroy(nc);
+  GNUNET_CONTAINER_multihashmap_destroy(watchers);
+  watchers = NULL;
+  GNUNET_SCHEDULER_shutdown();
 }
 
 /**
@@ -78,14 +90,32 @@ cleanup_expired_records(void *cls,
 {
   int deleted;
 
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
   GNUNET_assert(NULL != db);
   deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get());
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
   GNUNET_SCHEDULER_add_delayed(
-      GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, CLEANUP_INTERVAL),
+      GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL),
       &cleanup_expired_records, NULL);
 }
 
+/**
+ * Search for a disconnected client and remove it
+ *
+ * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param key hash of record key
+ * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @return #GNUNET_OK to continue iterating
+ */
+int client_disconnect_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  if(cls == value)
+    GNUNET_CONTAINER_multihashmap_remove(watchers, key, value);
+  return GNUNET_OK;
+}
 
 /**
  * A client disconnected.  Remove all of its data structure entries.
@@ -98,6 +128,10 @@ handle_client_disconnect (void *cls,
                          struct GNUNET_SERVER_Client
                          * client)
 {
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "A client was disconnected, cleaning up.\n");
+  if(NULL != watchers)
+    GNUNET_CONTAINER_multihashmap_iterate(watchers,
+        &client_disconnect_it, client);
 }
 
 /**
@@ -113,7 +147,7 @@ int record_iterator(void *cls,
     struct GNUNET_PEERSTORE_Record *record,
     char *emsg)
 {
-  struct GNUNET_SERVER_TransmitContext *tc = cls;
+  struct GNUNET_SERVER_Client *client = cls;
   struct StoreRecordMessage *srm;
 
   srm = PEERSTORE_create_record_message(record->sub_system,
@@ -123,10 +157,99 @@ int record_iterator(void *cls,
       record->value_size,
       record->expiry,
       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
-  GNUNET_SERVER_transmit_context_append_message(tc, (const struct GNUNET_MessageHeader *)srm);
+  GNUNET_SERVER_notification_context_unicast(nc, client, (struct GNUNET_MessageHeader *)srm, GNUNET_NO);
+  GNUNET_free(srm);
   return GNUNET_YES;
 }
 
+/**
+ * Iterator over all watcher clients
+ * to notify them of a new record
+ *
+ * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param key hash of record key
+ * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @return #GNUNET_YES to continue iterating
+ */
+int watch_notifier_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  struct GNUNET_PEERSTORE_Record *record = cls;
+  struct GNUNET_SERVER_Client *client = value;
+  struct StoreRecordMessage *srm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
+  srm = PEERSTORE_create_record_message(record->sub_system,
+      record->peer,
+      record->key,
+      record->value,
+      record->value_size,
+      record->expiry,
+      GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
+  GNUNET_SERVER_notification_context_unicast(nc, client,
+      (const struct GNUNET_MessageHeader *)srm, GNUNET_NO);
+  GNUNET_free(srm);
+  return GNUNET_YES;
+}
+
+/**
+ * Given a new record, notifies watchers
+ *
+ * @param record changed record to update watchers with
+ */
+void watch_notifier (struct GNUNET_PEERSTORE_Record *record)
+{
+  struct GNUNET_HashCode keyhash;
+
+  PEERSTORE_hash_key(record->sub_system,
+      record->peer,
+      record->key,
+      &keyhash);
+  GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record);
+}
+
+/**
+ * Handle a watch cancel request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_watch_cancel (void *cls,
+    struct GNUNET_SERVER_Client *client,
+    const struct GNUNET_MessageHeader *message)
+{
+  struct StoreKeyHashMessage *hm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
+  hm = (struct StoreKeyHashMessage *) message;
+  GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
+/**
+ * Handle a watch request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_watch (void *cls,
+    struct GNUNET_SERVER_Client *client,
+    const struct GNUNET_MessageHeader *message)
+{
+  struct StoreKeyHashMessage *hm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
+  hm = (struct StoreKeyHashMessage *) message;
+  GNUNET_SERVER_client_mark_monitor(client);
+  GNUNET_SERVER_notification_context_add(nc, client);
+  GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
+     client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
 /**
  * Handle an iterate request from client
  *
@@ -139,7 +262,7 @@ void handle_iterate (void *cls,
     const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_PEERSTORE_Record *record;
-  struct GNUNET_SERVER_TransmitContext *tc;
+  struct GNUNET_MessageHeader *endmsg;
 
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n");
   record = PEERSTORE_parse_record_message(message);
@@ -155,23 +278,30 @@ void handle_iterate (void *cls,
     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
     return;
   }
-  tc = GNUNET_SERVER_transmit_context_create (client);
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Iterate request: ss `%s', peer `%s', key `%s'\n",
+      record->sub_system,
+      (NULL == record->peer) ? "NULL" : GNUNET_i2s(record->peer),
+      (NULL == record->key) ? "NULL" : record->key);
+  GNUNET_SERVER_notification_context_add(nc, client);
   if(GNUNET_OK == db->iterate_records(db->cls,
       record->sub_system,
       record->peer,
       record->key,
       &record_iterator,
-      tc))
+      client))
   {
-    GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
-    GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+    endmsg = GNUNET_new(struct GNUNET_MessageHeader);
+    endmsg->size = htons(sizeof(struct GNUNET_MessageHeader));
+    endmsg->type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+    GNUNET_SERVER_notification_context_unicast(nc, client, endmsg, GNUNET_NO);
+    GNUNET_free(endmsg);
+    GNUNET_SERVER_receive_done(client, GNUNET_OK);
   }
   else
   {
-    GNUNET_free(tc);
     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
   }
-  GNUNET_free(record);
+  PEERSTORE_destroy_record(record);
 }
 
 /**
@@ -186,8 +316,7 @@ void handle_store (void *cls,
     const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_PEERSTORE_Record *record;
-  uint16_t response_type;
-  struct GNUNET_SERVER_TransmitContext *tc;
+  struct StoreRecordMessage *srm;
 
   record = PEERSTORE_parse_record_message(message);
   if(NULL == record)
@@ -196,11 +325,13 @@ void handle_store (void *cls,
     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
     return;
   }
+  srm = (struct StoreRecordMessage *)message;
   if(NULL == record->sub_system
       || NULL == record->peer
       || NULL == record->key)
   {
     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n");
+    PEERSTORE_destroy_record(record);
     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
     return;
   }
@@ -209,26 +340,23 @@ void handle_store (void *cls,
       record->sub_system,
       GNUNET_i2s (record->peer),
       record->key);
-  if(GNUNET_OK == db->store_record(db->cls,
+  if(GNUNET_OK != db->store_record(db->cls,
       record->sub_system,
       record->peer,
       record->key,
       record->value,
       record->value_size,
-      *record->expiry))
-  {
-    response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK;
-  }
-  else
+      *record->expiry,
+      srm->options))
   {
     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error.");
-    response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL;
+    PEERSTORE_destroy_record(record);
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
   }
-
-  tc = GNUNET_SERVER_transmit_context_create (client);
-  GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type);
-  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
-
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+  watch_notifier(record);
+  PEERSTORE_destroy_record(record);
 }
 
 /**
@@ -246,6 +374,8 @@ run (void *cls,
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
       {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
       {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
+      {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, sizeof(struct StoreKeyHashMessage)},
+      {&handle_watch_cancel, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, sizeof(struct StoreKeyHashMessage)},
       {NULL, NULL, 0, 0}
   };
   char *database;
@@ -266,7 +396,9 @@ run (void *cls,
          GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name);
   else
   {
-    cleanup_expired_records(NULL, NULL);
+    nc = GNUNET_SERVER_notification_context_create (server, 16);
+    watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
+    GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
     GNUNET_SERVER_add_handlers (server, handlers);
     GNUNET_SERVER_disconnect_notify (server,
              &handle_client_disconnect,