de-experimentalizing
[oweals/gnunet.git] / src / datastore / datastore_api.c
index 6c817a84042e8ad0a9277c5a3701f236542c96f5..ef373695063ed11e94ed89c7b2239b9486571fae 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
+     (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -166,7 +166,13 @@ struct GNUNET_DATASTORE_QueueEntry
    * Note that the overall struct should end at a 
    * multiple of 64 bits.
    */
-  int32_t was_transmitted;
+  int was_transmitted;
+  
+  /**
+   * Are we expecting a single message in response to this
+   * request (and, if it is data, no 'END' message)?
+   */
+  int one_shot; 
   
 };
 
@@ -181,10 +187,6 @@ struct GNUNET_DATASTORE_Handle
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
-  /**
-   * Our scheduler.
-   */
-  struct GNUNET_SCHEDULER_Handle *sched;
 
   /**
    * Current connection to the datastore service.
@@ -247,30 +249,24 @@ struct GNUNET_DATASTORE_Handle
  * Connect to the datastore service.
  *
  * @param cfg configuration to use
- * @param sched scheduler to use
  * @return handle to use to access the service
  */
 struct GNUNET_DATASTORE_Handle *
 GNUNET_DATASTORE_connect (const struct
                          GNUNET_CONFIGURATION_Handle
-                         *cfg,
-                         struct
-                         GNUNET_SCHEDULER_Handle
-                         *sched)
+                         *cfg)
 {
   struct GNUNET_CLIENT_Connection *c;
   struct GNUNET_DATASTORE_Handle *h;
   
-  c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
+  c = GNUNET_CLIENT_connect ("datastore", cfg);
   if (c == NULL)
     return NULL; /* oops */
   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
                     GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
   h->client = c;
   h->cfg = cfg;
-  h->sched = sched;
-  h->stats = GNUNET_STATISTICS_create (sched,
-                                      "datastore-api", 
+  h->stats = GNUNET_STATISTICS_create ("datastore-api",
                                       cfg);
   return h;
 }
@@ -315,8 +311,9 @@ transmit_drop (void *cls,
  * @param h handle to the datastore
  * @param drop set to GNUNET_YES to delete all data in datastore (!)
  */
-void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
-                                 int drop)
+void
+GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
+                            int drop)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
@@ -327,8 +324,7 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
     }
   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              h->reconnect_task);
+      GNUNET_SCHEDULER_cancel (h->reconnect_task);
       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
     }
   while (NULL != (qe = h->queue_head))
@@ -338,7 +334,7 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
     }
   if (GNUNET_YES == drop) 
     {
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+      h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
       if (h->client != NULL)
        {
          if (NULL != 
@@ -459,8 +455,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
       response_proc (ret, NULL);
       return NULL;
     }
-  ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                           timeout,
+  ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
                                            &timeout_queue_entry,
                                            ret);
   pos = ret->next;
@@ -507,7 +502,7 @@ try_reconnect (void *cls,
   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+  h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
   if (h->client == NULL)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -551,8 +546,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 #endif
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
   h->client = NULL;
-  h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                                   h->retry_time,
+  h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
                                                    &try_reconnect,
                                                    h);      
 }
@@ -601,8 +595,7 @@ transmit_request (void *cls,
 #endif
   memcpy (buf, &qe[1], msize);
   qe->was_transmitted = GNUNET_YES;
-  GNUNET_SCHEDULER_cancel (h->sched,
-                          qe->task);
+  GNUNET_SCHEDULER_cancel (qe->task);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
   h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
@@ -682,7 +675,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
  * @param emsg error message
  */
 static void
-drop_status_cont (void *cls, int result, const char *emsg)
+drop_status_cont (void *cls, int32_t result, const char *emsg)
 {
   /* do nothing */
 }
@@ -698,8 +691,7 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
                               qe);
   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              qe->task);
+      GNUNET_SCHEDULER_cancel (qe->task);
       qe->task = GNUNET_SCHEDULER_NO_TASK;
     }
   h->queue_size--;
@@ -807,6 +799,7 @@ process_status_message (void *cls,
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication how often should the content be replicated to other peers?
  * @param expiration expiration time for the content
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
@@ -820,13 +813,14 @@ process_status_message (void *cls,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
-                     int rid,
+                     uint32_t rid,
                       const GNUNET_HashCode * key,
                       size_t size,
                       const void *data,
                       enum GNUNET_BLOCK_Type type,
                       uint32_t priority,
                       uint32_t anonymity,
+                     uint32_t replication,
                       struct GNUNET_TIME_Absolute expiration,
                      unsigned int queue_priority,
                      unsigned int max_queue_size,
@@ -972,7 +966,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
-                                 int rid,
+                                 uint32_t rid,
                                  unsigned int queue_priority,
                                  unsigned int max_queue_size,
                                  struct GNUNET_TIME_Relative timeout,
@@ -1035,7 +1029,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
-                        unsigned long long uid,
+                        uint64_t uid,
                         uint32_t priority,
                         struct GNUNET_TIME_Absolute expiration,
                         unsigned int queue_priority,
@@ -1212,11 +1206,12 @@ process_result_message (void *cls,
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
+      free_queue_entry (qe);
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received end of result set\n");
+                 "Received end of result set, new queue size is %u\n",
+                 h->queue_size);
 #endif
-      free_queue_entry (qe);
       if (rc.iter != NULL)
        rc.iter (rc.iter_cls,
                 NULL, 0, NULL, 0, 0, 0, 
@@ -1262,7 +1257,10 @@ process_result_message (void *cls,
          do_disconnect (h);      
          return;
        }
-      GNUNET_DATASTORE_get_next (h, GNUNET_NO);
+      if (GNUNET_YES == qe->one_shot)
+       free_queue_entry (qe);
+      else
+       GNUNET_DATASTORE_iterate_get_next (h);
       return;
     }
   dm = (const struct DataMessage*) msg;
@@ -1274,6 +1272,8 @@ process_result_message (void *cls,
              ntohl(dm->size),
              GNUNET_h2s(&dm->key));
 #endif
+  if (GNUNET_YES == qe->one_shot)
+    free_queue_entry (qe);
   h->retry_time.rel_value = 0;
   rc.iter (rc.iter_cls,
           &dm->key,
@@ -1288,7 +1288,11 @@ process_result_message (void *cls,
 
 
 /**
- * Get a random value from the datastore.
+ * Get a random value from the datastore for content replication.
+ * Returns a single, random value among those with the highest
+ * replication score, lowering positive replication scores by one for
+ * the chosen value (if only content with a replication score exists,
+ * a random value is returned and replication scores are not changed).
  *
  * @param h handle to the datastore
  * @param queue_priority ranking of this request in the priority queue
@@ -1304,12 +1308,12 @@ process_result_message (void *cls,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
-                            unsigned int queue_priority,
-                            unsigned int max_queue_size,
-                            struct GNUNET_TIME_Relative timeout,
-                             GNUNET_DATASTORE_Iterator iter, 
-                            void *iter_cls)
+GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
+                                     unsigned int queue_priority,
+                                     unsigned int max_queue_size,
+                                     struct GNUNET_TIME_Relative timeout,
+                                     GNUNET_DATASTORE_Iterator iter, 
+                                     void *iter_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_MessageHeader *m;
@@ -1317,7 +1321,7 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
 
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to get random entry in %llu ms\n",
+             "Asked to get replication entry in %llu ms\n",
              (unsigned long long) timeout.rel_value);
 #endif
   qc.rc.iter = iter;
@@ -1329,16 +1333,17 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
     {
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Could not create queue entry for GET RANDOM\n");
+                 "Could not create queue entry for GET REPLICATION\n");
 #endif
       return NULL;    
     }
+  qe->one_shot = GNUNET_YES;
   GNUNET_STATISTICS_update (h->stats,
-                           gettext_noop ("# GET RANDOM requests executed"),
+                           gettext_noop ("# GET REPLICATION requests executed"),
                            1,
                            GNUNET_NO);
   m = (struct GNUNET_MessageHeader*) &qe[1];
-  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
+  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
   m->size = htons(sizeof (struct GNUNET_MessageHeader));
   process_queue (h);
   return qe;
@@ -1363,18 +1368,19 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
-                                    unsigned int queue_priority,
-                                    unsigned int max_queue_size,
-                                    struct GNUNET_TIME_Relative timeout,
-                                    enum GNUNET_BLOCK_Type type,
-                                    GNUNET_DATASTORE_Iterator iter, 
-                                    void *iter_cls)
+GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+                                        unsigned int queue_priority,
+                                        unsigned int max_queue_size,
+                                        struct GNUNET_TIME_Relative timeout,
+                                        enum GNUNET_BLOCK_Type type,
+                                        GNUNET_DATASTORE_Iterator iter, 
+                                        void *iter_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GetZeroAnonymityMessage *m;
   union QueueContext qc;
 
+  GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to get zero-anonymity entry in %llu ms\n",
@@ -1412,7 +1418,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  * in the datastore.  The iterator will only be called
  * once initially; if the first call did contain a
  * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_get_next" with the given argument.
+ * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
  *
  * @param h handle to the datastore
  * @param key maybe NULL (to match all entries)
@@ -1429,14 +1435,14 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
-                      const GNUNET_HashCode * key,
-                     enum GNUNET_BLOCK_Type type,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
-                     struct GNUNET_TIME_Relative timeout,
-                      GNUNET_DATASTORE_Iterator iter, 
-                     void *iter_cls)
+GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
+                             const GNUNET_HashCode * key,
+                             enum GNUNET_BLOCK_Type type,
+                             unsigned int queue_priority,
+                             unsigned int max_queue_size,
+                             struct GNUNET_TIME_Relative timeout,
+                             GNUNET_DATASTORE_Iterator iter, 
+                             void *iter_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GetMessage *gm;
@@ -1488,26 +1494,13 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
  * from the datastore.
  * 
  * @param h handle to the datastore
- * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
- *        iteration (with a final call to "iter" with key/data == NULL).
  */
 void 
-GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
-                          int more)
+GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
-  struct ResultContext rc = qe->qc.rc;
 
   GNUNET_assert (&process_result_message == qe->response_proc);
-  if (GNUNET_YES != more)
-    {
-      qe->qc.rc.iter = NULL;
-      qe->qc.rc.iter_cls = NULL;
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
-                NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
-    }
   h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
                         qe->response_proc,
@@ -1526,7 +1519,6 @@ void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
   struct GNUNET_DATASTORE_Handle *h;
-  int reconnect;
 
   h = qe->h;
 #if DEBUG_DATASTORE
@@ -1536,19 +1528,11 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
               qe->was_transmitted,
               h->queue_head == qe);
 #endif
-  reconnect = GNUNET_NO;
   if (GNUNET_YES == qe->was_transmitted) 
     {
-      if (qe->response_proc == &process_result_message)        
-       {
-         qe->qc.rc.iter = NULL;    
-         if (GNUNET_YES != h->in_receive)
-           GNUNET_DATASTORE_get_next (h, GNUNET_YES);
-       }
-      else
-       {
-         qe->qc.sc.cont = NULL;
-       }
+      free_queue_entry (qe);
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
+      do_disconnect (h);
       return;
     }
   free_queue_entry (qe);