make more robust to disconnect
authorChristian Grothoff <christian@grothoff.org>
Sun, 4 Apr 2010 22:02:08 +0000 (22:02 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sun, 4 Apr 2010 22:02:08 +0000 (22:02 +0000)
src/datastore/datastore_api.c
src/datastore/gnunet-service-datastore.c

index cb70cc1f3ef726d3eecbf4ab642c1928f1e79750..fb1939cd91cef94ee34645d50f241700da4ca180 100644 (file)
@@ -143,23 +143,26 @@ transmit_drop (void *cls,
 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
                                  int drop)
 {
-  GNUNET_assert (0 == h->message_size);
-  GNUNET_assert (NULL == h->response_proc);
-  if ( (GNUNET_YES == drop) &&
-       (h->client != NULL) )
+  if (h->client != NULL)
+    GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+  h->client = NULL;
+  if (GNUNET_YES == drop) 
     {
-      if (NULL != 
-         GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                              sizeof(struct GNUNET_MessageHeader),
-                                              GNUNET_TIME_UNIT_MINUTES,
-                                              GNUNET_YES,
-                                              &transmit_drop,
-                                              h))
-       return;
+      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+      if (h->client != NULL)
+       {
+         if (NULL != 
+             GNUNET_CLIENT_notify_transmit_ready (h->client,
+                                                  sizeof(struct GNUNET_MessageHeader),
+                                                  GNUNET_TIME_UNIT_MINUTES,
+                                                  GNUNET_YES,
+                                                  &transmit_drop,
+                                                  h))
+           return;
+         GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+       }
       GNUNET_break (0);
     }
-  if (h->client != NULL)
-    GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
   GNUNET_ARM_stop_services (h->cfg, h->sched, "datastore", NULL);
   GNUNET_free (h);
 }
@@ -445,6 +448,22 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
 }
 
 
+/**
+ * Helper function that will initiate the transmission of a message to
+ * the datastore service.  The message must already be prepared and
+ * stored in the buffer at the end of the handle.  The message must be
+ * of a type that expects a "DataMessage" in response.
+ *
+ * @param h handle to the service with prepared message
+ * @param cont function to call with result
+ * @param cont_cls closure
+ * @param timeout timeout for the operation
+ */
+static void
+transmit_for_result (struct GNUNET_DATASTORE_Handle *h,
+                    GNUNET_DATASTORE_Iterator cont,
+                    void *cont_cls,
+                    struct GNUNET_TIME_Relative timeout);
 
 
 /**
@@ -464,18 +483,35 @@ with_result_response_handler (void *cls,
   GNUNET_DATASTORE_Iterator cont = h->response_proc;
   const struct DataMessage *dm;
   size_t msize;
+  struct GNUNET_TIME_Relative remaining;
 
-  h->message_size = 0;
   if (msg == NULL)
     {
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Got disconnected from datastore\n");
+#endif
       h->response_proc = NULL;
       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      cont (h->response_proc_cls, 
-           NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      remaining = GNUNET_TIME_absolute_get_remaining (h->timeout);
+      if (remaining.value > 0)
+       {
+         transmit_for_result (h,
+                              cont,
+                              h->response_proc_cls,
+                              remaining);
+       }
+      else
+       {
+         h->message_size = 0;
+         cont (h->response_proc_cls, 
+               NULL, 0, NULL, 0, 0, 0, 
+               GNUNET_TIME_UNIT_ZERO_ABS, 0);
+       }
       return;
     }
+  h->message_size = 0;
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
index 9b23f61054577242f671588398c64c736b673591..48b45894bd8ac0c809452be4e8af10e4e152eecf 100644 (file)
@@ -870,22 +870,19 @@ handle_put (void *cls,
   struct ReservationList *pos;
   uint32_t size;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Processing `%s' request\n",
-             "PUT");
-#endif
-  if (ntohl(dm->type) == 0) 
-    {
-      GNUNET_break (0);
-      dm = NULL;
-    }
-  if (dm == NULL)
+  if ( (dm == NULL) ||
+       (ntohl(dm->type) == 0) ) 
     {
       GNUNET_break (0);
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Processing `%s' request for `%s'\n",
+             "PUT",
+             GNUNET_h2s (&dm->key));
+#endif
   rid = ntohl(dm->rid);
   size = ntohl(dm->size);
   if (rid > 0)
@@ -947,17 +944,12 @@ handle_put (void *cls,
  */
 static void
 handle_get (void *cls,
-            struct GNUNET_SERVER_Client *client,
-            const struct GNUNET_MessageHeader *message)
+           struct GNUNET_SERVER_Client *client,
+           const struct GNUNET_MessageHeader *message)
 {
   const struct GetMessage *msg;
   uint16_t size;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Processing `%s' request\n",
-             "GET");
-#endif
   size = ntohs(message->size);
   if ( (size != sizeof(struct GetMessage)) &&
        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
@@ -966,12 +958,19 @@ handle_get (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+  msg = (const struct GetMessage*) message;
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Processing `%s' request for `%s' of type %u\n",
+             "GET",
+             GNUNET_h2s (&msg->key),
+             ntohl (msg->type));
+#endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# GET requests received"),
                            1,
                            GNUNET_NO);
   GNUNET_SERVER_client_keep (client);
-  msg = (const struct GetMessage*) message;
   if ( (size == sizeof(struct GetMessage)) &&
        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
                                                         &msg->key)) )
@@ -1017,17 +1016,18 @@ handle_update (void *cls,
   int ret;
   char *emsg;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Processing `%s' request\n",
-             "UPDATE");
-#endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# UPDATE requests received"),
                            1,
                            GNUNET_NO);
   msg = (const struct UpdateMessage*) message;
   emsg = NULL;
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Processing `%s' request for %llu\n",
+             "UPDATE",
+             (unsigned long long) GNUNET_ntohll (msg->uid));
+#endif
   ret = plugin->api->update (plugin->api->cls,
                             GNUNET_ntohll(msg->uid),
                             (int32_t) ntohl(msg->priority),
@@ -1120,9 +1120,10 @@ remove_callback (void *cls,
   rc->found = GNUNET_YES;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Item %llu matches `%s' request.\n",
+             "Item %llu matches `%s' request for key `%s'.\n",
              (unsigned long long) uid,
-             "REMOVE");
+             "REMOVE",
+             GNUNET_h2s (key));
 #endif 
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# bytes removed (explicit request)"),
@@ -1151,17 +1152,18 @@ handle_remove (void *cls,
   GNUNET_HashCode vhash;
   struct RemoveContext *rc;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Processing `%s' request\n",
-             "REMOVE");
-#endif
   if (dm == NULL)
     {
       GNUNET_break (0);
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Processing `%s' request for `%s'\n",
+             "REMOVE",
+             GNUNET_h2s (&dm->key));
+#endif
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# REMOVE requests received"),
                            1,