use v2
[oweals/gnunet.git] / src / datastore / datastore_api.c
index f3db9446940ae8f20bf8838dd9b75bffd4bffae1..12ec8bf0535bc2bfe7bf6c76f62a9005965f2e79 100644 (file)
@@ -243,19 +243,20 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   if (h->client != NULL)
-    GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+    {
+      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+      h->client = NULL;
+    }
   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (h->sched,
-                            h->reconnect_task);
-  h->client = NULL;
+    {
+      GNUNET_SCHEDULER_cancel (h->sched,
+                              h->reconnect_task);
+      h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   while (NULL != (qe = h->queue_head))
     {
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      if (NULL != qe->response_proc)
-       qe->response_proc (qe, NULL);
-      GNUNET_free (qe);
+      GNUNET_assert (NULL != qe->response_proc);
+      qe->response_proc (qe, NULL);
     }
   if (GNUNET_YES == drop) 
     {
@@ -332,8 +333,15 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
       c++;
       pos = pos->next;
     }
-  if (c >= max_queue_size)
-    return NULL;
+  ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
+  ret->h = h;
+  ret->response_proc = response_proc;
+  ret->client_ctx = client_ctx;
+  ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+  ret->priority = queue_priority;
+  ret->max_queue = max_queue_size;
+  ret->message_size = msize;
+  ret->was_transmitted = GNUNET_NO;
   if (pos == NULL)
     {
       /* append at the tail */
@@ -348,39 +356,29 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
           (h->queue_head->was_transmitted) )
        pos = h->queue_head;
     }
-  ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
+  c++;
   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
                                     h->queue_tail,
                                     pos,
                                     ret);
-  ret->h = h;
-  ret->response_proc = response_proc;
-  ret->client_ctx = client_ctx;
+  h->queue_size++;
+  if (c > max_queue_size)
+    {
+      response_proc (ret, NULL);
+      GNUNET_free (ret);
+      return NULL;
+    }
   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
                                            timeout,
                                            &timeout_queue_entry,
                                            ret);
-  ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  ret->priority = queue_priority;
-  ret->max_queue = max_queue_size;
-  ret->message_size = msize;
-  ret->was_transmitted = GNUNET_NO;
-  h->queue_size++;
-  c++;
   pos = ret->next;
   while (pos != NULL) 
     {
       if (pos->max_queue < h->queue_size)
        {
-         GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                      h->queue_tail,
-                                      pos);
-         GNUNET_SCHEDULER_cancel (h->sched,
-                                  pos->task);
-         if (pos->response_proc != NULL)
-           pos->response_proc (pos, NULL);
-         GNUNET_free (pos);
-         h->queue_size--;
+         GNUNET_assert (pos->response_proc != NULL);
+         pos->response_proc (pos, NULL);
          break;
        }
       pos = pos->next;
@@ -429,12 +427,13 @@ try_reconnect (void *cls,
  * Disconnect from the service and then try reconnecting to the datastore service
  * after some delay.
  *
- * @param cls the 'struct GNUNET_DATASTORE_Handle'
- * @param tc scheduler context
+ * @param h handle to datastore to disconnect and reconnect
  */
 static void
 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 {
+  if (h->client == NULL)
+    return;
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
   h->client = NULL;
   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
@@ -551,6 +550,24 @@ drop_status_cont (void *cls, int result, const char *emsg)
 }
 
 
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                              h->queue_tail,
+                              qe);
+  if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (h->sched,
+                              qe->task);
+      qe->task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  h->queue_size--;
+  GNUNET_free (qe);
+}
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -570,12 +587,11 @@ process_status_message (void *cls,
   const char *emsg;
   int32_t status;
 
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  GNUNET_free (qe);
+  free_queue_entry (qe);
   if (msg == NULL)
-    {
+    {      
+      if (NULL == h->client)
+       return; /* forced disconnect */
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  _("Failed to receive response from database.\n"));
       do_disconnect (h);
@@ -989,16 +1005,14 @@ process_result_message (void *cls,
   struct ResultContext *rc = qe->client_ctx;
   const struct DataMessage *dm;
 
+  GNUNET_assert (h->queue_head == qe);
   if (msg == NULL)
     {
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  _("Failed to receive response from datastore\n"));
 #endif
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
+      free_queue_entry (qe);
       do_disconnect (h);
       rc->iter (rc->iter_cls,
                NULL, 0, NULL, 0, 0, 0, 
@@ -1013,10 +1027,7 @@ process_result_message (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Received end of result set\n");
 #endif
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
+      free_queue_entry (qe);
       rc->iter (rc->iter_cls,
                NULL, 0, NULL, 0, 0, 0, 
                GNUNET_TIME_UNIT_ZERO_ABS, 0);  
@@ -1029,10 +1040,7 @@ process_result_message (void *cls,
        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
     {
       GNUNET_break (0);
-      GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                                  h->queue_tail,
-                                  qe);
-      GNUNET_free (qe);
+      free_queue_entry (qe);
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
       rc->iter (rc->iter_cls,
@@ -1102,7 +1110,7 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
                         queue_priority, max_queue_size, timeout,
                         &process_result_message, rcont);
   if (qe == NULL)
-    return NULL;
+    return NULL;    
   m = (struct GNUNET_MessageHeader*) &qe[1];
   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
   m->size = htons(sizeof (struct GNUNET_MessageHeader));
@@ -1203,10 +1211,7 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
                             GNUNET_TIME_absolute_get_remaining (qe->timeout));
       return;
     }
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  GNUNET_free (qe);
+  free_queue_entry (qe);
   h->retry_time = GNUNET_TIME_UNIT_ZERO;
   do_disconnect (h);
   rc->iter (rc->iter_cls,
@@ -1230,13 +1235,8 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 
   h = qe->h;
   reconnect = qe->was_transmitted;
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  if (qe->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (h->sched,
-                            qe->task);
-  GNUNET_free (qe);
+  free_queue_entry (qe);
+  h->queue_size--;
   if (reconnect)
     {
       h->retry_time = GNUNET_TIME_UNIT_ZERO;