fix
[oweals/gnunet.git] / src / datastore / datastore_api.c
index da63058e4569105c3aa81c6d6db986b579c910a4..dcafeb839d659ab6461edcddbae7775e538438ab 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -214,6 +214,11 @@ struct GNUNET_DATASTORE_Handle
    */
   unsigned int queue_size;
 
+  /**
+   * Are we currently trying to receive from the service?
+   */
+  int in_receive;
+
 };
 
 
@@ -240,7 +245,7 @@ GNUNET_DATASTORE_connect (const struct
   if (c == NULL)
     return NULL; /* oops */
   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
-                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
+                    GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
   h->client = c;
   h->cfg = cfg;
   h->sched = sched;
@@ -467,7 +472,15 @@ try_reconnect (void *cls,
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
   if (h->client == NULL)
-    return;
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "DATASTORE reconnect failed (fatally)\n");
+      return;
+    }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Reconnected to DATASTORE\n");
+#endif
   process_queue (h);
 }
 
@@ -482,10 +495,16 @@ static void
 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 {
   if (h->client == NULL)
-    return;
+    {
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "client NULL in disconnect, will not try to reconnect\n");
+#endif
+      return;
+    }
 #if 0
   GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# reconnected to datastore"),
+                           gettext_noop ("# reconnected to DATASTORE"),
                            1,
                            GNUNET_NO);
 #endif
@@ -521,7 +540,7 @@ transmit_request (void *cls,
   if (buf == NULL)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to transmit request to database.\n"));
+                 _("Failed to transmit request to DATASTORE.\n"));
       do_disconnect (h);
       return 0;
     }
@@ -530,11 +549,17 @@ transmit_request (void *cls,
       process_queue (h);
       return 0;
     }
+ #if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitting %u byte request to DATASTORE\n",
+             msize);
+#endif
   memcpy (buf, &qe[1], msize);
   qe->was_transmitted = GNUNET_YES;
   GNUNET_SCHEDULER_cancel (h->sched,
                           qe->task);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
+  h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
                         qe->response_proc,
                         qe,
@@ -555,13 +580,42 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   if (NULL == (qe = h->queue_head))
-    return; /* no entry in queue */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Queue empty\n");
+#endif
+      return; /* no entry in queue */
+    }
   if (qe->was_transmitted == GNUNET_YES)
-    return; /* waiting for replies */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Head request already transmitted\n");
+#endif
+      return; /* waiting for replies */
+    }
   if (h->th != NULL)
-    return; /* request pending */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Pending transmission request\n");
+#endif
+      return; /* request pending */
+    }
   if (h->client == NULL)
-    return; /* waiting for reconnect */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Not connected\n");
+#endif
+      return; /* waiting for reconnect */
+    }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Queueing %u byte request to DATASTORE\n",
+             qe->message_size);
+#endif
   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
                                               qe->message_size,
                                               GNUNET_TIME_absolute_get_remaining (qe->timeout),
@@ -621,18 +675,26 @@ process_status_message (void *cls,
   const struct StatusMessage *sm;
   const char *emsg;
   int32_t status;
+  int was_transmitted;
 
-  free_queue_entry (qe);
+  h->in_receive = GNUNET_NO;
+  was_transmitted = qe->was_transmitted;
   if (msg == NULL)
     {      
+      free_queue_entry (qe);
       if (NULL == h->client)
        return; /* forced disconnect */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to receive response from database.\n"));
-      do_disconnect (h);
+      if (was_transmitted == GNUNET_YES)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Failed to receive response from database.\n"));
+         do_disconnect (h);
+       }
       return;
     }
-
+  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+  GNUNET_assert (h->queue_head == qe);
+  free_queue_entry (qe);
   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
     {
@@ -728,7 +790,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
              GNUNET_h2s (key));
 #endif
   msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qe = make_queue_entry (h, msize,
@@ -975,7 +1037,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   qe = make_queue_entry (h, msize,
                         queue_priority, max_queue_size, timeout,
                         &process_status_message, &qc);
@@ -1015,16 +1077,17 @@ process_result_message (void *cls,
   const struct DataMessage *dm;
   int was_transmitted;
 
+  h->in_receive = GNUNET_NO;
   if (msg == NULL)
     {
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Failed to receive response from datastore or queue full\n");
-#endif
       was_transmitted = qe->was_transmitted;
       free_queue_entry (qe);
-      if (GNUNET_YES == was_transmitted)       
-       do_disconnect (h);
+      if (was_transmitted == GNUNET_YES)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Failed to receive response from database.\n"));
+         do_disconnect (h);
+       }
       if (rc.iter != NULL)
        rc.iter (rc.iter_cls,
                 NULL, 0, NULL, 0, 0, 0, 
@@ -1065,6 +1128,12 @@ process_result_message (void *cls,
   if (rc.iter == NULL)
     {
       /* abort iteration */
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Aborting iteration via disconnect (client has cancelled)\n");
+#endif
+      free_queue_entry (qe);
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
       return;
     }
@@ -1218,10 +1287,10 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
   struct ResultContext rc = qe->qc.rc;
 
-  GNUNET_assert (NULL != qe);
   GNUNET_assert (&process_result_message == qe->response_proc);
   if (GNUNET_YES == more)
     {     
+      h->in_receive = GNUNET_YES;
       GNUNET_CLIENT_receive (h->client,
                             qe->response_proc,
                             qe,
@@ -1250,22 +1319,35 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
   int reconnect;
 
   h = qe->h;
+#if DEBUG_DATASTORE
+  GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
+              "Pending DATASTORE request %p cancelled (%d, %d)\n",
+              qe,
+              qe->was_transmitted,
+              h->queue_head == qe);
+#endif
   reconnect = GNUNET_NO;
   if (GNUNET_YES == qe->was_transmitted) 
     {
       if (qe->response_proc == &process_result_message)        
-       qe->qc.rc.iter = NULL;    
-      else
-       reconnect = GNUNET_YES;
+       {
+         qe->qc.rc.iter = NULL;    
+         if (GNUNET_YES != h->in_receive)
+           GNUNET_DATASTORE_get_next (h, GNUNET_YES);
+         return;
+       }
+      reconnect = GNUNET_YES;
     }
-
   free_queue_entry (qe);
-  h->queue_size--;
   if (reconnect)
     {
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
     }
+  else
+    {
+      process_queue (h);
+    }
 }