de-experimentalizing
[oweals/gnunet.git] / src / datastore / datastore_api.c
index f9b3db81bdf8c7646594d46fad9331e0181ee21c..ef373695063ed11e94ed89c7b2239b9486571fae 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
+     (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -166,7 +166,13 @@ struct GNUNET_DATASTORE_QueueEntry
    * Note that the overall struct should end at a 
    * multiple of 64 bits.
    */
-  int32_t was_transmitted;
+  int was_transmitted;
+  
+  /**
+   * Are we expecting a single message in response to this
+   * request (and, if it is data, no 'END' message)?
+   */
+  int one_shot; 
   
 };
 
@@ -305,8 +311,9 @@ transmit_drop (void *cls,
  * @param h handle to the datastore
  * @param drop set to GNUNET_YES to delete all data in datastore (!)
  */
-void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
-                                 int drop)
+void
+GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
+                            int drop)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
@@ -668,7 +675,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
  * @param emsg error message
  */
 static void
-drop_status_cont (void *cls, int result, const char *emsg)
+drop_status_cont (void *cls, int32_t result, const char *emsg)
 {
   /* do nothing */
 }
@@ -792,6 +799,7 @@ process_status_message (void *cls,
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication how often should the content be replicated to other peers?
  * @param expiration expiration time for the content
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
@@ -805,13 +813,14 @@ process_status_message (void *cls,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
-                     int rid,
+                     uint32_t rid,
                       const GNUNET_HashCode * key,
                       size_t size,
                       const void *data,
                       enum GNUNET_BLOCK_Type type,
                       uint32_t priority,
                       uint32_t anonymity,
+                     uint32_t replication,
                       struct GNUNET_TIME_Absolute expiration,
                      unsigned int queue_priority,
                      unsigned int max_queue_size,
@@ -957,7 +966,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
-                                 int rid,
+                                 uint32_t rid,
                                  unsigned int queue_priority,
                                  unsigned int max_queue_size,
                                  struct GNUNET_TIME_Relative timeout,
@@ -1020,7 +1029,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
-                        unsigned long long uid,
+                        uint64_t uid,
                         uint32_t priority,
                         struct GNUNET_TIME_Absolute expiration,
                         unsigned int queue_priority,
@@ -1197,11 +1206,12 @@ process_result_message (void *cls,
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
+      free_queue_entry (qe);
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received end of result set\n");
+                 "Received end of result set, new queue size is %u\n",
+                 h->queue_size);
 #endif
-      free_queue_entry (qe);
       if (rc.iter != NULL)
        rc.iter (rc.iter_cls,
                 NULL, 0, NULL, 0, 0, 0, 
@@ -1247,7 +1257,10 @@ process_result_message (void *cls,
          do_disconnect (h);      
          return;
        }
-      GNUNET_DATASTORE_get_next (h, GNUNET_NO);
+      if (GNUNET_YES == qe->one_shot)
+       free_queue_entry (qe);
+      else
+       GNUNET_DATASTORE_iterate_get_next (h);
       return;
     }
   dm = (const struct DataMessage*) msg;
@@ -1259,6 +1272,8 @@ process_result_message (void *cls,
              ntohl(dm->size),
              GNUNET_h2s(&dm->key));
 #endif
+  if (GNUNET_YES == qe->one_shot)
+    free_queue_entry (qe);
   h->retry_time.rel_value = 0;
   rc.iter (rc.iter_cls,
           &dm->key,
@@ -1273,7 +1288,11 @@ process_result_message (void *cls,
 
 
 /**
- * Get a random value from the datastore.
+ * Get a random value from the datastore for content replication.
+ * Returns a single, random value among those with the highest
+ * replication score, lowering positive replication scores by one for
+ * the chosen value (if only content with a replication score exists,
+ * a random value is returned and replication scores are not changed).
  *
  * @param h handle to the datastore
  * @param queue_priority ranking of this request in the priority queue
@@ -1289,12 +1308,12 @@ process_result_message (void *cls,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
-                            unsigned int queue_priority,
-                            unsigned int max_queue_size,
-                            struct GNUNET_TIME_Relative timeout,
-                             GNUNET_DATASTORE_Iterator iter, 
-                            void *iter_cls)
+GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
+                                     unsigned int queue_priority,
+                                     unsigned int max_queue_size,
+                                     struct GNUNET_TIME_Relative timeout,
+                                     GNUNET_DATASTORE_Iterator iter, 
+                                     void *iter_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_MessageHeader *m;
@@ -1302,7 +1321,7 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
 
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to get random entry in %llu ms\n",
+             "Asked to get replication entry in %llu ms\n",
              (unsigned long long) timeout.rel_value);
 #endif
   qc.rc.iter = iter;
@@ -1314,16 +1333,17 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
     {
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Could not create queue entry for GET RANDOM\n");
+                 "Could not create queue entry for GET REPLICATION\n");
 #endif
       return NULL;    
     }
+  qe->one_shot = GNUNET_YES;
   GNUNET_STATISTICS_update (h->stats,
-                           gettext_noop ("# GET RANDOM requests executed"),
+                           gettext_noop ("# GET REPLICATION requests executed"),
                            1,
                            GNUNET_NO);
   m = (struct GNUNET_MessageHeader*) &qe[1];
-  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
+  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
   m->size = htons(sizeof (struct GNUNET_MessageHeader));
   process_queue (h);
   return qe;
@@ -1348,18 +1368,19 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
-                                    unsigned int queue_priority,
-                                    unsigned int max_queue_size,
-                                    struct GNUNET_TIME_Relative timeout,
-                                    enum GNUNET_BLOCK_Type type,
-                                    GNUNET_DATASTORE_Iterator iter, 
-                                    void *iter_cls)
+GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+                                        unsigned int queue_priority,
+                                        unsigned int max_queue_size,
+                                        struct GNUNET_TIME_Relative timeout,
+                                        enum GNUNET_BLOCK_Type type,
+                                        GNUNET_DATASTORE_Iterator iter, 
+                                        void *iter_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GetZeroAnonymityMessage *m;
   union QueueContext qc;
 
+  GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to get zero-anonymity entry in %llu ms\n",
@@ -1397,7 +1418,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  * in the datastore.  The iterator will only be called
  * once initially; if the first call did contain a
  * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_get_next" with the given argument.
+ * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
  *
  * @param h handle to the datastore
  * @param key maybe NULL (to match all entries)
@@ -1414,14 +1435,14 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
-                      const GNUNET_HashCode * key,
-                     enum GNUNET_BLOCK_Type type,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
-                     struct GNUNET_TIME_Relative timeout,
-                      GNUNET_DATASTORE_Iterator iter, 
-                     void *iter_cls)
+GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
+                             const GNUNET_HashCode * key,
+                             enum GNUNET_BLOCK_Type type,
+                             unsigned int queue_priority,
+                             unsigned int max_queue_size,
+                             struct GNUNET_TIME_Relative timeout,
+                             GNUNET_DATASTORE_Iterator iter, 
+                             void *iter_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GetMessage *gm;
@@ -1473,26 +1494,13 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
  * from the datastore.
  * 
  * @param h handle to the datastore
- * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
- *        iteration (with a final call to "iter" with key/data == NULL).
  */
 void 
-GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
-                          int more)
+GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
-  struct ResultContext rc = qe->qc.rc;
 
   GNUNET_assert (&process_result_message == qe->response_proc);
-  if (GNUNET_YES != more)
-    {
-      qe->qc.rc.iter = NULL;
-      qe->qc.rc.iter_cls = NULL;
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
-                NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
-    }
   h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
                         qe->response_proc,
@@ -1511,7 +1519,6 @@ void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
   struct GNUNET_DATASTORE_Handle *h;
-  int reconnect;
 
   h = qe->h;
 #if DEBUG_DATASTORE
@@ -1521,19 +1528,11 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
               qe->was_transmitted,
               h->queue_head == qe);
 #endif
-  reconnect = GNUNET_NO;
   if (GNUNET_YES == qe->was_transmitted) 
     {
-      if (qe->response_proc == &process_result_message)        
-       {
-         qe->qc.rc.iter = NULL;    
-         if (GNUNET_YES != h->in_receive)
-           GNUNET_DATASTORE_get_next (h, GNUNET_YES);
-       }
-      else
-       {
-         qe->qc.sc.cont = NULL;
-       }
+      free_queue_entry (qe);
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
+      do_disconnect (h);
       return;
     }
   free_queue_entry (qe);