fix
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
index 6d0f29671211c240121b9cf0763353f0f37a77a0..40ea153de0ab3efb64e9a79ebb5b7952c3e412c2 100644 (file)
@@ -26,7 +26,6 @@
 
 #include "platform.h"
 #include "gnunet_util_lib.h"
-#include "gnunet_arm_service.h"
 #include "gnunet_protocols.h"
 #include "gnunet_statistics_service.h"
 #include "plugin_datastore.h"
@@ -125,11 +124,6 @@ static struct ReservationList *reservations;
  */
 static struct GNUNET_CONTAINER_BloomFilter *filter;
 
-/**
- * Static counter to produce reservation identifiers.
- */
-static int reservation_gen;
-
 /**
  * How much space are we allowed to use?
  */
@@ -242,7 +236,7 @@ static struct TransmitCallbackContext *tcc_head;
 static struct TransmitCallbackContext *tcc_tail;
 
 /**
- * Have we already clean ed up the TCCs and are hence no longer
+ * Have we already cleaned up the TCCs and are hence no longer
  * willing (or able) to transmit anything to anyone?
  */
 static int cleaning_done;
@@ -316,7 +310,9 @@ expired_processor (void *cls,
   plugin->api->next_request (next_cls, GNUNET_NO);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Deleting content that expired %llu ms ago\n",
+             "Deleting content `%s' of type %u that expired %llu ms ago\n",
+             GNUNET_h2s (key),
+             type,
              (unsigned long long) (now.value - expiration.value));
 #endif
   GNUNET_STATISTICS_update (stats,
@@ -397,8 +393,10 @@ manage (void *cls,
                             (0 == *need) ? GNUNET_YES : GNUNET_NO);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
-             size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
+             "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
+             (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
+             GNUNET_h2s (key),
+             type,
              *need);
 #endif
   GNUNET_STATISTICS_update (stats,
@@ -524,6 +522,11 @@ transmit (struct GNUNET_SERVER_Client *client,
 
   if (GNUNET_YES == cleaning_done)
     {
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Shutdown in progress, aborting transmission.\n");
+#endif
+      GNUNET_free (msg);
       if (NULL != tc)
        tc (tc_cls, GNUNET_SYSERR);
       return;
@@ -683,8 +686,10 @@ transmit_item (void *cls,
   memcpy (&dm[1], data, size);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting `%s' message\n",
-             "DATA");
+             "Transmitting `%s' message for `%s' of type %u\n",
+             "DATA",
+             GNUNET_h2s (key),
+             type);
 #endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# results found"),
@@ -707,6 +712,11 @@ handle_reserve (void *cls,
                struct GNUNET_SERVER_Client *client,
                const struct GNUNET_MessageHeader *message)
 {
+  /**
+   * Static counter to produce reservation identifiers.
+   */
+  static int reservation_gen;
+
   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
   struct ReservationList *e;
   unsigned long long used;
@@ -755,6 +765,10 @@ handle_reserve (void *cls,
       return;      
     }
   reserved += req;
+  GNUNET_STATISTICS_set (stats,
+                        gettext_noop ("# reserved"),
+                        reserved,
+                        GNUNET_NO);
   e = GNUNET_malloc (sizeof(struct ReservationList));
   e->next = reservations;
   reservations = e;
@@ -806,6 +820,10 @@ handle_release_reserve (void *cls,
          rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
          GNUNET_assert (reserved >= rem);
          reserved -= rem;
+         GNUNET_STATISTICS_set (stats,
+                        gettext_noop ("# reserved"),
+                                reserved,
+                                GNUNET_NO);
 #if DEBUG_DATASTORE
          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                      "Returning %llu remaining reserved bytes to storage pool\n",
@@ -851,6 +869,142 @@ check_data (const struct GNUNET_MessageHeader *message)
 }
 
 
+/**
+ * Context for a put request used to see if the content is
+ * already present.
+ */
+struct PutContext
+{
+  /**
+   * Client to notify on completion.
+   */
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Did we find the data already in the database?
+   */
+  int is_present;
+  
+  /* followed by the 'struct DataMessage' */
+};
+
+
+/**
+ * Actually put the data message.
+ */
+static void
+execute_put (struct GNUNET_SERVER_Client *client,
+            const struct DataMessage *dm)
+{
+  uint32_t size;
+  char *msg;
+  int ret;
+
+  size = ntohl(dm->size);
+  msg = NULL;
+  ret = plugin->api->put (plugin->api->cls,
+                         &dm->key,
+                         size,
+                         &dm[1],
+                         ntohl(dm->type),
+                         ntohl(dm->priority),
+                         ntohl(dm->anonymity),
+                         GNUNET_TIME_absolute_ntoh(dm->expiration),
+                         &msg);
+  if (GNUNET_OK == ret)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# bytes stored"),
+                               size,
+                               GNUNET_YES);
+      GNUNET_CONTAINER_bloomfilter_add (filter,
+                                       &dm->key);
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Successfully stored %u bytes of type %u under key `%s'\n",
+                 size,
+                 ntohl(dm->type),
+                 GNUNET_h2s (&dm->key));
+#endif
+    }
+  transmit_status (client, 
+                  (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
+                  msg);
+  GNUNET_free_non_null (msg);
+  if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
+                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
+                 (unsigned long long) (quota - reserved - cache_size),
+                 (unsigned long long) plugin->api->get_size (plugin->api->cls));
+      manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+    }
+}
+
+
+
+/**
+ * Function that will check if the given datastore entry
+ * matches the put and if none match executes the put.
+ *
+ * @param cls closure, pointer to the client (of type 'struct PutContext').
+ * @param next_cls closure to use to ask for the next item
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
+ *         GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+check_present (void *cls,
+              void *next_cls,
+              const GNUNET_HashCode * key,
+              uint32_t size,
+              const void *data,
+              enum GNUNET_BLOCK_Type type,
+              uint32_t priority,
+              uint32_t anonymity,
+              struct GNUNET_TIME_Absolute
+              expiration, uint64_t uid)
+{
+  struct PutContext *pc = cls;
+  const struct DataMessage *dm;
+
+  dm = (const struct DataMessage*) &pc[1];
+  if (key == NULL)
+    {
+      if (pc->is_present == GNUNET_YES)        
+       transmit_status (pc->client, GNUNET_OK, NULL);
+      else
+       execute_put (pc->client, dm);
+      GNUNET_SERVER_client_drop (pc->client);
+      GNUNET_free (pc);
+      return GNUNET_SYSERR;
+    }
+  if ( (size == ntohl(dm->size)) &&
+       (0 == memcmp (&dm[1],
+                    data,
+                    size)) )
+    {
+      pc->is_present = GNUNET_YES;
+      plugin->api->next_request (next_cls, GNUNET_YES);
+    }
+  else
+    {
+      plugin->api->next_request (next_cls, GNUNET_NO);
+    }
+  return GNUNET_OK;
+}
+
+
 /**
  * Handle PUT-message.
  *
@@ -864,10 +1018,9 @@ handle_put (void *cls,
            const struct GNUNET_MessageHeader *message)
 {
   const struct DataMessage *dm = check_data (message);
-  char *msg;
-  int ret;
   int rid;
   struct ReservationList *pos;
+  struct PutContext *pc;
   uint32_t size;
 
   if ( (dm == NULL) ||
@@ -879,9 +1032,10 @@ handle_put (void *cls,
     }
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Processing `%s' request for `%s'\n",
+             "Processing `%s' request for `%s' of type %u\n",
              "PUT",
-             GNUNET_h2s (&dm->key));
+             GNUNET_h2s (&dm->key),
+             ntohl (dm->type));
 #endif
   rid = ntohl(dm->rid);
   size = ntohl(dm->size);
@@ -895,43 +1049,32 @@ handle_put (void *cls,
       if (NULL != pos)
        {
          GNUNET_break (pos->entries > 0);
-         GNUNET_break (pos->amount > size);
+         GNUNET_break (pos->amount >= size);
          pos->entries--;
          pos->amount -= size;
          reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+         GNUNET_STATISTICS_set (stats,
+                                gettext_noop ("# reserved"),
+                                reserved,
+                                GNUNET_NO);
        }
     }
-  msg = NULL;
-  ret = plugin->api->put (plugin->api->cls,
-                         &dm->key,
-                         size,
-                         &dm[1],
-                         ntohl(dm->type),
-                         ntohl(dm->priority),
-                         ntohl(dm->anonymity),
-                         GNUNET_TIME_absolute_ntoh(dm->expiration),
-                         &msg);
-  if (GNUNET_OK == ret)
+  if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
+                                                      &dm->key))
     {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# bytes stored"),
-                               size,
-                               GNUNET_YES);
-      GNUNET_CONTAINER_bloomfilter_add (filter,
-                                       &dm->key);
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Successfully stored %u bytes under key `%s'\n",
-                 size,
-                 GNUNET_h2s (&dm->key));
-#endif
+      pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
+      pc->client = client;
+      GNUNET_SERVER_client_keep (client);
+      memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
+      plugin->api->get (plugin->api->cls,
+                       &dm->key,
+                       NULL,
+                       ntohl (dm->type),
+                       &check_present,
+                       pc);      
+      return;
     }
-  transmit_status (client, 
-                  (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
-                  msg);
-  GNUNET_free_non_null (msg);
-  if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
-    manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+  execute_put (client, dm);
 }
 
 
@@ -978,7 +1121,7 @@ handle_get (void *cls,
       /* don't bother database... */
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Empty result set for `%s' request for `%s'.\n",
+                 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
                  "GET",
                  GNUNET_h2s (&msg->key));
 #endif 
@@ -1120,10 +1263,11 @@ remove_callback (void *cls,
   rc->found = GNUNET_YES;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Item %llu matches `%s' request for key `%s'.\n",
+             "Item %llu matches `%s' request for key `%s' and type %u.\n",
              (unsigned long long) uid,
              "REMOVE",
-             GNUNET_h2s (key));
+             GNUNET_h2s (key),
+             type);
 #endif 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# bytes removed (explicit request)"),
@@ -1160,9 +1304,10 @@ handle_remove (void *cls,
     }
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Processing `%s' request for `%s'\n",
+             "Processing `%s' request for `%s' of type %u\n",
              "REMOVE",
-             GNUNET_h2s (&dm->key));
+             GNUNET_h2s (&dm->key),
+             ntohl (dm->type));
 #endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# REMOVE requests received"),
@@ -1205,29 +1350,6 @@ handle_drop (void *cls,
 }
 
 
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
-   sizeof(struct ReserveMessage) }, 
-  {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
-   sizeof(struct ReleaseReserveMessage) }, 
-  {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
-  {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
-   sizeof (struct UpdateMessage) }, 
-  {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
-  {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
-   sizeof(struct GNUNET_MessageHeader) }, 
-  {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
-  {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
-   sizeof(struct GNUNET_MessageHeader) }, 
-  {NULL, NULL, 0, 0}
-};
-
-
-
 /**
  * Load the datastore plugin.
  */
@@ -1304,7 +1426,6 @@ unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       GNUNET_CONTAINER_bloomfilter_free (filter);
       filter = NULL;
     }
-  GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
   if (stats != NULL)
     {
       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -1333,7 +1454,7 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
          GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
          GNUNET_SERVER_client_drop (tcc->client);
        }
-   if (NULL != tcc->tc)
+      if (NULL != tcc->tc)
        tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
       GNUNET_free (tcc->msg);
       GNUNET_free (tcc);
@@ -1390,6 +1511,10 @@ cleanup_reservations (void *cls,
        }
       pos = next;
     }
+  GNUNET_STATISTICS_set (stats,
+                        gettext_noop ("# reserved"),
+                        reserved,
+                        GNUNET_NO);
 }
 
 
@@ -1407,6 +1532,22 @@ run (void *cls,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
+     sizeof(struct ReserveMessage) }, 
+    {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
+     sizeof(struct ReleaseReserveMessage) }, 
+    {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
+    {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
+     sizeof (struct UpdateMessage) }, 
+    {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
+    {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
+     sizeof(struct GNUNET_MessageHeader) }, 
+    {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
+    {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
+     sizeof(struct GNUNET_MessageHeader) }, 
+    {NULL, NULL, 0, 0}
+  };
   char *fn;
   unsigned int bf_size;
 
@@ -1423,7 +1564,15 @@ run (void *cls,
       return;
     }
   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
+  GNUNET_STATISTICS_set (stats,
+                        gettext_noop ("# quota"),
+                        quota,
+                        GNUNET_NO);
   cache_size = quota / 8; /* Or should we make this an option? */
+  GNUNET_STATISTICS_set (stats,
+                        gettext_noop ("# cache size"),
+                        cache_size,
+                        GNUNET_NO);
   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
   fn = NULL;
   if ( (GNUNET_OK !=
@@ -1453,13 +1602,11 @@ run (void *cls,
        }
       return;
     }
-  GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
   plugin = load_plugin ();
   if (NULL == plugin)
     {
       GNUNET_CONTAINER_bloomfilter_free (filter);
       filter = NULL;
-      GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
       if (stats != NULL)
        {
          GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -1476,7 +1623,6 @@ run (void *cls,
   GNUNET_SCHEDULER_add_delayed (sched,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &cleaning_task, NULL);
-  
 }