train hacks
[oweals/gnunet.git] / src / datastore / datastore_api.c
index c91884aa0d57f5b0c091387faa375efded085ae7..21675b0b7c0fecb1b54858cf676ff1e793f3e9bb 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -214,6 +214,11 @@ struct GNUNET_DATASTORE_Handle
    */
   unsigned int queue_size;
 
+  /**
+   * Are we currently trying to receive from the service?
+   */
+  int in_receive;
+
 };
 
 
@@ -240,7 +245,7 @@ GNUNET_DATASTORE_connect (const struct
   if (c == NULL)
     return NULL; /* oops */
   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
-                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
+                    GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
   h->client = c;
   h->cfg = cfg;
   h->sched = sched;
@@ -415,7 +420,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
   if (c > max_queue_size)
     {
       response_proc (ret, NULL);
-      GNUNET_free (ret);
       return NULL;
     }
   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
@@ -468,7 +472,15 @@ try_reconnect (void *cls,
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
   if (h->client == NULL)
-    return;
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "DATASTORE reconnect failed (fatally)\n");
+      return;
+    }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Reconnected to DATASTORE\n");
+#endif
   process_queue (h);
 }
 
@@ -483,7 +495,19 @@ static void
 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 {
   if (h->client == NULL)
-    return;
+    {
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "client NULL in disconnect, will not try to reconnect\n");
+#endif
+      return;
+    }
+#if 0
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# reconnected to DATASTORE"),
+                           1,
+                           GNUNET_NO);
+#endif
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
   h->client = NULL;
   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
@@ -516,7 +540,7 @@ transmit_request (void *cls,
   if (buf == NULL)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to transmit request to database.\n"));
+                 _("Failed to transmit request to DATASTORE.\n"));
       do_disconnect (h);
       return 0;
     }
@@ -525,11 +549,17 @@ transmit_request (void *cls,
       process_queue (h);
       return 0;
     }
+ #if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Transmitting %u byte request to DATASTORE\n",
+             msize);
+#endif
   memcpy (buf, &qe[1], msize);
   qe->was_transmitted = GNUNET_YES;
   GNUNET_SCHEDULER_cancel (h->sched,
                           qe->task);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
+  h->in_receive = GNUNET_YES;
   GNUNET_CLIENT_receive (h->client,
                         qe->response_proc,
                         qe,
@@ -550,13 +580,42 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   if (NULL == (qe = h->queue_head))
-    return; /* no entry in queue */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Queue empty\n");
+#endif
+      return; /* no entry in queue */
+    }
   if (qe->was_transmitted == GNUNET_YES)
-    return; /* waiting for replies */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Head request already transmitted\n");
+#endif
+      return; /* waiting for replies */
+    }
   if (h->th != NULL)
-    return; /* request pending */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Pending transmission request\n");
+#endif
+      return; /* request pending */
+    }
   if (h->client == NULL)
-    return; /* waiting for reconnect */
+    {
+#if DEBUG_DATASTORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Not connected\n");
+#endif
+      return; /* waiting for reconnect */
+    }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Queueing %u byte request to DATASTORE\n",
+             qe->message_size);
+#endif
   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
                                               qe->message_size,
                                               GNUNET_TIME_absolute_get_remaining (qe->timeout),
@@ -616,18 +675,25 @@ process_status_message (void *cls,
   const struct StatusMessage *sm;
   const char *emsg;
   int32_t status;
+  int was_transmitted;
 
-  free_queue_entry (qe);
+  h->in_receive = GNUNET_NO;
+  was_transmitted = qe->was_transmitted;
   if (msg == NULL)
     {      
+      free_queue_entry (qe);
       if (NULL == h->client)
        return; /* forced disconnect */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to receive response from database.\n"));
-      do_disconnect (h);
+      rc.cont (rc.cont_cls, 
+              GNUNET_SYSERR,
+              _("Failed to receive response from database."));
+      if (was_transmitted == GNUNET_YES)
+       do_disconnect (h);
       return;
     }
-
+  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+  GNUNET_assert (h->queue_head == qe);
+  free_queue_entry (qe);
   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
     {
@@ -723,7 +789,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
              GNUNET_h2s (key));
 #endif
   msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qe = make_queue_entry (h, msize,
@@ -970,7 +1036,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   qe = make_queue_entry (h, msize,
                         queue_priority, max_queue_size, timeout,
                         &process_status_message, &qc);
@@ -1008,21 +1074,27 @@ process_result_message (void *cls,
   struct GNUNET_DATASTORE_Handle *h = qe->h;
   struct ResultContext rc = qe->qc.rc;
   const struct DataMessage *dm;
+  int was_transmitted;
 
-  GNUNET_assert (h->queue_head == qe);
+  h->in_receive = GNUNET_NO;
   if (msg == NULL)
-    {
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to receive response from datastore\n"));
-#endif
+   {
+      was_transmitted = qe->was_transmitted;
       free_queue_entry (qe);
-      do_disconnect (h);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
+      if (was_transmitted == GNUNET_YES)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Failed to receive response from database.\n"));
+         do_disconnect (h);
+       }
+      if (rc.iter != NULL)
+       rc.iter (rc.iter_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       return;
     }
+  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+  GNUNET_assert (h->queue_head == qe);
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
@@ -1031,9 +1103,10 @@ process_result_message (void *cls,
                  "Received end of result set\n");
 #endif
       free_queue_entry (qe);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
+      if (rc.iter != NULL)
+       rc.iter (rc.iter_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       process_queue (h);
       return;
     }
@@ -1045,9 +1118,22 @@ process_result_message (void *cls,
       free_queue_entry (qe);
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
+      if (rc.iter != NULL)
+       rc.iter (rc.iter_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
+      return;
+    }
+  if (rc.iter == NULL)
+    {
+      /* abort iteration */
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Aborting iteration via disconnect (client has cancelled)\n");
+#endif
+      free_queue_entry (qe);
+      h->retry_time = GNUNET_TIME_UNIT_ZERO;
+      do_disconnect (h);
       return;
     }
   dm = (const struct DataMessage*) msg;
@@ -1200,10 +1286,10 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
   struct ResultContext rc = qe->qc.rc;
 
-  GNUNET_assert (NULL != qe);
   GNUNET_assert (&process_result_message == qe->response_proc);
   if (GNUNET_YES == more)
     {     
+      h->in_receive = GNUNET_YES;
       GNUNET_CLIENT_receive (h->client,
                             qe->response_proc,
                             qe,
@@ -1232,14 +1318,35 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
   int reconnect;
 
   h = qe->h;
-  reconnect = qe->was_transmitted;
+#if DEBUG_DATASTORE
+  GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
+              "Pending DATASTORE request %p cancelled (%d, %d)\n",
+              qe,
+              qe->was_transmitted,
+              h->queue_head == qe);
+#endif
+  reconnect = GNUNET_NO;
+  if (GNUNET_YES == qe->was_transmitted) 
+    {
+      if (qe->response_proc == &process_result_message)        
+       {
+         qe->qc.rc.iter = NULL;    
+         if (GNUNET_YES != h->in_receive)
+           GNUNET_DATASTORE_get_next (h, GNUNET_YES);
+         return;
+       }
+      reconnect = GNUNET_YES;
+    }
   free_queue_entry (qe);
-  h->queue_size--;
   if (reconnect)
     {
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
     }
+  else
+    {
+      process_queue (h);
+    }
 }