adding configure code for --enable-benchmarks, --enable-expensive-tests, some clean up
[oweals/gnunet.git] / src / datastore / datastore_api.c
index 76d9e5f3c8363d4fe804863c61105c8a02fdfd83..a3196530eab54a1f65c0dc9bf65e8370b7719f7f 100644 (file)
@@ -315,6 +315,11 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
+  if (NULL != h->th)
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
+      h->th = NULL;
+    }
   if (h->client != NULL)
     {
       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
@@ -344,11 +349,13 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
                                                   h))
            return;
          GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+         h->client = NULL;
        }
       GNUNET_break (0);
     }
   GNUNET_STATISTICS_destroy (h->stats,
                             GNUNET_NO);
+  h->stats = NULL;
   GNUNET_free (h);
 }
 
@@ -370,7 +377,7 @@ timeout_queue_entry (void *cls,
                            1,
                            GNUNET_NO);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_assert (qe->was_transmitted == GNUNET_NO);
+  GNUNET_assert (qe->was_transmitted == GNUNET_NO); 
   qe->response_proc (qe->h, NULL);
 }
 
@@ -458,7 +465,8 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
   pos = ret->next;
   while (pos != NULL) 
     {
-      if (pos->max_queue < h->queue_size)
+      if ( (pos->max_queue < h->queue_size) &&
+          (pos->was_transmitted == GNUNET_NO) )
        {
          GNUNET_assert (pos->response_proc != NULL);
          /* move 'pos' element to head so that it will be 
@@ -469,6 +477,10 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
          GNUNET_CONTAINER_DLL_insert (h->queue_head,
                                       h->queue_tail,
                                       pos);
+         GNUNET_STATISTICS_update (h->stats,
+                                   gettext_noop ("# Requests dropped from datastore queue"),
+                                   1,
+                                   GNUNET_NO);
          pos->response_proc (h, NULL);
          break;
        }
@@ -558,6 +570,37 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 }
 
 
+/**
+ * Function called whenever we receive a message from
+ * the service.  Calls the appropriate handler.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param msg the received message
+ */
+static void 
+receive_cb (void *cls,
+           const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  h->in_receive = GNUNET_NO;
+  if (h->skip_next_messages > 0)
+    {
+      h->skip_next_messages--;
+      process_queue (h);
+      return;
+   } 
+  if (NULL == (qe = h->queue_head))
+    {
+      GNUNET_break (0);
+      process_queue (h);
+      return; 
+    }
+  qe->response_proc (h, msg);
+}
+
+
 /**
  * Transmit request from queue to datastore service.
  *
@@ -603,9 +646,10 @@ transmit_request (void *cls,
   qe->was_transmitted = GNUNET_YES;
   GNUNET_SCHEDULER_cancel (qe->task);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (GNUNET_NO == h->in_receive);
   h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
-                        qe->response_proc,
+                        &receive_cb,
                         h,
                         GNUNET_TIME_absolute_get_remaining (qe->timeout));
   GNUNET_STATISTICS_update (h->stats,
@@ -675,6 +719,8 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
                                               GNUNET_YES,
                                               &transmit_request,
                                               h);
+  GNUNET_assert (GNUNET_NO == h->in_receive);
+  GNUNET_break (NULL != h->th);
 }
 
 
@@ -692,6 +738,13 @@ drop_status_cont (void *cls, int32_t result, const char *emsg)
 }
 
 
+/**
+ * Free a queue entry.  Removes the given entry from the
+ * queue and releases associated resources.  Does NOT
+ * call the callback.
+ * 
+ * @param qe entry to free.
+ */
 static void
 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
@@ -710,6 +763,7 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
   GNUNET_free (qe);
 }
 
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -730,13 +784,6 @@ process_status_message (void *cls,
   int32_t status;
   int was_transmitted;
 
-  h->in_receive = GNUNET_NO;
-  if (h->skip_next_messages > 0)
-    {
-      h->skip_next_messages--;
-      process_queue (h);
-      return;
-   } 
   if (NULL == (qe = h->queue_head))
     {
       GNUNET_break (0);
@@ -756,6 +803,8 @@ process_status_message (void *cls,
                 _("Failed to receive status response from database."));
       if (was_transmitted == GNUNET_YES)
        do_disconnect (h);
+      else
+       process_queue (h);
       return;
     }
   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1203,13 +1252,6 @@ process_result_message (void *cls,
   struct ResultContext rc;
   const struct DataMessage *dm;
 
-  h->in_receive = GNUNET_NO;
-  if (h->skip_next_messages > 0)
-    {
-      h->skip_next_messages--;
-      process_queue (h);
-      return;
-    }
   if (msg == NULL)
     {
       qe = h->queue_head;
@@ -1226,6 +1268,8 @@ process_result_message (void *cls,
                     NULL, 0, NULL, 0, 0, 0, 
                     GNUNET_TIME_UNIT_ZERO_ABS, 0);    
        }
+      else
+       process_queue (h);
       return;
     }
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
@@ -1281,15 +1325,17 @@ process_result_message (void *cls,
 #endif
   free_queue_entry (qe);
   h->retry_time.rel_value = 0;
-  rc.proc (rc.proc_cls,
-          &dm->key,
-          ntohl(dm->size),
-          &dm[1],
-          ntohl(dm->type),
-          ntohl(dm->priority),
-          ntohl(dm->anonymity),
-          GNUNET_TIME_absolute_ntoh(dm->expiration),   
-          GNUNET_ntohll(dm->uid));
+  process_queue (h);
+  if (rc.proc != NULL)
+    rc.proc (rc.proc_cls,
+            &dm->key,
+            ntohl(dm->size),
+            &dm[1],
+            ntohl(dm->type),
+            ntohl(dm->priority),
+            ntohl(dm->anonymity),
+            GNUNET_TIME_absolute_ntoh(dm->expiration), 
+            GNUNET_ntohll(dm->uid));
 }