fixing reconnect issues
[oweals/gnunet.git] / src / datastore / datastore_api.c
index c91884aa0d57f5b0c091387faa375efded085ae7..60a5b7479dd97c43c6350109046843a55e40d71e 100644 (file)
@@ -415,7 +415,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
   if (c > max_queue_size)
     {
       response_proc (ret, NULL);
-      GNUNET_free (ret);
       return NULL;
     }
   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
@@ -484,6 +483,12 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 {
   if (h->client == NULL)
     return;
+#if 0
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# reconnected to datastore"),
+                           1,
+                           GNUNET_NO);
+#endif
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
   h->client = NULL;
   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
@@ -616,18 +621,25 @@ process_status_message (void *cls,
   const struct StatusMessage *sm;
   const char *emsg;
   int32_t status;
+  int was_transmitted;
 
-  free_queue_entry (qe);
+  was_transmitted = qe->was_transmitted;
   if (msg == NULL)
     {      
+      free_queue_entry (qe);
       if (NULL == h->client)
        return; /* forced disconnect */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to receive response from database.\n"));
-      do_disconnect (h);
+      if (was_transmitted == GNUNET_YES)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Failed to receive response from database.\n"));
+         do_disconnect (h);
+       }
       return;
     }
-
+  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+  GNUNET_assert (h->queue_head == qe);
+  free_queue_entry (qe);
   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
     {
@@ -1008,21 +1020,26 @@ process_result_message (void *cls,
   struct GNUNET_DATASTORE_Handle *h = qe->h;
   struct ResultContext rc = qe->qc.rc;
   const struct DataMessage *dm;
+  int was_transmitted;
 
-  GNUNET_assert (h->queue_head == qe);
   if (msg == NULL)
     {
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to receive response from datastore\n"));
-#endif
+      was_transmitted = qe->was_transmitted;
       free_queue_entry (qe);
-      do_disconnect (h);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
+      if (was_transmitted == GNUNET_YES)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Failed to receive response from database.\n"));
+         do_disconnect (h);
+       }
+      if (rc.iter != NULL)
+       rc.iter (rc.iter_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       return;
     }
+  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+  GNUNET_assert (h->queue_head == qe);
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
@@ -1031,9 +1048,10 @@ process_result_message (void *cls,
                  "Received end of result set\n");
 #endif
       free_queue_entry (qe);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
+      if (rc.iter != NULL)
+       rc.iter (rc.iter_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       process_queue (h);
       return;
     }
@@ -1045,9 +1063,16 @@ process_result_message (void *cls,
       free_queue_entry (qe);
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
+      if (rc.iter != NULL)
+       rc.iter (rc.iter_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
+      return;
+    }
+  if (rc.iter == NULL)
+    {
+      /* abort iteration */
+      do_disconnect (h);
       return;
     }
   dm = (const struct DataMessage*) msg;
@@ -1232,7 +1257,16 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
   int reconnect;
 
   h = qe->h;
-  reconnect = qe->was_transmitted;
+  reconnect = GNUNET_NO;
+  if (GNUNET_YES == qe->was_transmitted) 
+    {
+      if (qe->response_proc == &process_result_message)        
+       {
+         qe->qc.rc.iter = NULL;    
+         return;
+       }
+      reconnect = GNUNET_YES;
+    }
   free_queue_entry (qe);
   h->queue_size--;
   if (reconnect)