get FS test with CADET to finally pass again
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_client.c
index 193fe2263f3c7173d85f92eb129d203ef007c59d..55e0cbc24629a686bba7c96aa04bf14c95e10cdf 100644 (file)
@@ -77,7 +77,7 @@ struct GSF_CadetRequest
   GSF_CadetReplyProcessor proc;
 
   /**
-   * Closure for 'proc'
+   * Closure for @e proc
    */
   void *proc_cls;
 
@@ -125,11 +125,6 @@ struct CadetHandle
    */
   struct GNUNET_CADET_Channel *channel;
 
-  /**
-   * Handle for active write operation, or NULL.
-   */
-  struct GNUNET_CADET_TransmitHandle *wh;
-
   /**
    * Which peer does this cadet go to?
    */
@@ -140,14 +135,14 @@ struct CadetHandle
    * a few seconds to give the application a chance to give
    * us another query).
    */
-  struct GNUNET_SCHEDULER_Task * timeout_task;
+  struct GNUNET_SCHEDULER_Task *timeout_task;
 
   /**
    * Task to reset cadets that had errors (asynchronously,
    * as we may not be able to do it immediately during a
    * callback from the cadet API).
    */
-  struct GNUNET_SCHEDULER_Task * reset_task;
+  struct GNUNET_SCHEDULER_Task *reset_task;
 
 };
 
@@ -170,10 +165,10 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
 /**
  * Transmit pending requests via the cadet.
  *
- * @param mh cadet to process
+ * @param cls `struct CadetHandle` to process
  */
 static void
-transmit_pending (struct CadetHandle *mh);
+transmit_pending (void *cls);
 
 
 /**
@@ -206,65 +201,19 @@ move_to_pending (void *cls,
 
 
 /**
- * We had a serious error, tear down and re-create cadet from scratch.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet (struct CadetHandle *mh)
-{
-  struct GNUNET_CADET_Channel *channel = mh->channel;
-  struct GNUNET_HashCode port;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Resetting cadet channel to %s\n",
-             GNUNET_i2s (&mh->target));
-  mh->channel = NULL;
-
-  if (NULL != channel)
-  {
-    /* Avoid loop */
-    if (NULL != mh->wh)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
-      mh->wh = NULL;
-    }
-    GNUNET_CADET_channel_destroy (channel);
-  }
-  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
-                                        &move_to_pending,
-                                        mh);
-  GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
-                      strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
-                      &port);
-  mh->channel = GNUNET_CADET_channel_create (cadet_handle,
-                                             mh,
-                                             &mh->target,
-                                             &port,
-                                             GNUNET_CADET_OPTION_RELIABLE);
-  transmit_pending (mh);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive cadet channel.
+ * Functions with this signature are called whenever a complete reply
+ * is received.
  *
- * @param cls the `struct CadetHandle` to tear down
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
  */
-static void
-cadet_timeout (void *cls)
+static int
+check_reply (void *cls,
+             const struct CadetReplyMessage *srm)
 {
-  struct CadetHandle *mh = cls;
-  struct GNUNET_CADET_Channel *tun;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Timeout on cadet channel to %s\n",
-             GNUNET_i2s (&mh->target));
-  mh->timeout_task = NULL;
-  tun = mh->channel;
-  mh->channel = NULL;
-  if(NULL != tun)
-       GNUNET_CADET_channel_destroy (tun);
+  /* We check later... */
+  return GNUNET_OK;
 }
 
 
@@ -274,13 +223,7 @@ cadet_timeout (void *cls)
  * @param cls the `struct CadetHandle` to tear down
  */
 static void
-reset_cadet_task (void *cls)
-{
-  struct CadetHandle *mh = cls;
-
-  mh->reset_task = NULL;
-  reset_cadet (mh);
-}
+reset_cadet_task (void *cls);
 
 
 /**
@@ -299,83 +242,6 @@ reset_cadet_async (struct CadetHandle *mh)
 }
 
 
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a cadet.
- *
- * @param cls the struct CadetHandle for which we did the write call
- * @param size the number of bytes that can be written to @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_sqm (void *cls,
-             size_t size,
-             void *buf)
-{
-  struct CadetHandle *mh = cls;
-  struct CadetQueryMessage sqm;
-  struct GSF_CadetRequest *sr;
-
-  mh->wh = NULL;
-  if (NULL == buf)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Cadet channel to %s failed during transmission attempt, rebuilding\n",
-               GNUNET_i2s (&mh->target));
-    reset_cadet_async (mh);
-    return 0;
-  }
-  sr = mh->pending_head;
-  if (NULL == sr)
-    return 0;
-  GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
-  GNUNET_CONTAINER_DLL_remove (mh->pending_head,
-                              mh->pending_tail,
-                              sr);
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
-                                                   &sr->query,
-                                                   sr,
-                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  sr->was_transmitted = GNUNET_YES;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Sending query for %s via cadet to %s\n",
-             GNUNET_h2s (&sr->query),
-             GNUNET_i2s (&mh->target));
-  sqm.header.size = htons (sizeof (sqm));
-  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
-  sqm.type = htonl (sr->type);
-  sqm.query = sr->query;
-  GNUNET_memcpy (buf, &sqm, sizeof (sqm));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Successfully transmitted %u bytes via cadet to %s\n",
-             (unsigned int) size,
-             GNUNET_i2s (&mh->target));
-  transmit_pending (mh);
-  return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh)
-{
-  if (NULL == mh->channel)
-    return;
-  if (NULL != mh->wh)
-    return;
-  mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             sizeof (struct CadetQueryMessage),
-                                             &transmit_sqm, mh);
-}
-
-
 /**
  * Closure for handle_reply().
  */
@@ -393,7 +259,7 @@ struct HandleReplyClosure
   struct GNUNET_TIME_Absolute expiration;
 
   /**
-   * Number of bytes in 'data'.
+   * Number of bytes in @e data.
    */
   size_t data_size;
 
@@ -439,19 +305,24 @@ process_reply (void *cls,
 
 
 /**
- * Functions with this signature are called whenever a complete reply
- * is received.
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
  *
- * @param cls closure with the `struct CadetHandle`
- * @param srm the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
  */
 static int
-check_reply (void *cls,
-             const struct CadetReplyMessage *srm)
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
 {
-  /* We check later... */
-  return GNUNET_OK;
+  struct GSF_CadetRequest *sr = value;
+
+  GSF_cadet_query_cancel (sr);
+  return GNUNET_YES;
 }
 
 
@@ -516,28 +387,6 @@ handle_reply (void *cls,
 }
 
 
-/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
-{
-  struct GSF_CadetRequest *sr = value;
-
-  GSF_cadet_query_cancel (sr);
-  return GNUNET_YES;
-}
-
-
 /**
  * Function called by cadet when a client disconnects.
  * Cleans up our `struct CadetClient` of that channel.
@@ -569,8 +418,6 @@ disconnect_cb (void *cls,
   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
                                         &free_waiting_entry,
                                         mh);
-  if (NULL != mh->wh)
-    GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
   if (NULL != mh->timeout_task)
     GNUNET_SCHEDULER_cancel (mh->timeout_task);
   if (NULL != mh->reset_task)
@@ -602,6 +449,133 @@ window_change_cb (void *cls,
                   int window_size)
 {
   /* FIXME: for flow control, implement? */
+#if 0
+  /* Something like this instead of the GNUNET_MQ_notify_sent() in
+     transmit_pending() might be good (once the window change CB works...) */
+  if (0 < window_size) /* test needed? */
+    transmit_pending (mh);
+#endif
+}
+
+
+/**
+ * We had a serious error, tear down and re-create cadet from scratch.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet (struct CadetHandle *mh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Resetting cadet channel to %s\n",
+             GNUNET_i2s (&mh->target));
+  GNUNET_CADET_channel_destroy (mh->channel);
+  mh->channel = NULL;
+  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+                                        &move_to_pending,
+                                        mh);
+  {
+    struct GNUNET_MQ_MessageHandler handlers[] = {
+      GNUNET_MQ_hd_var_size (reply,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+                             struct CadetReplyMessage,
+                             mh),
+      GNUNET_MQ_handler_end ()
+    };
+    struct GNUNET_HashCode port;
+
+    GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+                        strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+                        &port);
+    mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+                                               mh,
+                                               &mh->target,
+                                               &port,
+                                               GNUNET_CADET_OPTION_RELIABLE,
+                                               &window_change_cb,
+                                               &disconnect_cb,
+                                               handlers);
+  }
+  transmit_pending (mh);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive cadet channel.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+cadet_timeout (void *cls)
+{
+  struct CadetHandle *mh = cls;
+  struct GNUNET_CADET_Channel *tun;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Timeout on cadet channel to %s\n",
+             GNUNET_i2s (&mh->target));
+  mh->timeout_task = NULL;
+  tun = mh->channel;
+  mh->channel = NULL;
+  if (NULL != tun)
+    GNUNET_CADET_channel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an cadet.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+reset_cadet_task (void *cls)
+{
+  struct CadetHandle *mh = cls;
+
+  mh->reset_task = NULL;
+  reset_cadet (mh);
+}
+
+
+/**
+ * Transmit pending requests via the cadet.
+ *
+ * @param cls `struct CadetHandle` to process
+ */
+static void
+transmit_pending (void *cls)
+{
+  struct CadetHandle *mh = cls;
+  struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
+  struct GSF_CadetRequest *sr;
+  struct GNUNET_MQ_Envelope *env;
+  struct CadetQueryMessage *sqm;
+
+  if ( (0 != GNUNET_MQ_get_length (mq)) ||
+       (NULL == (sr = mh->pending_head)) )
+    return;
+  GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+                              mh->pending_tail,
+                              sr);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+                                                   &sr->query,
+                                                   sr,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  sr->was_transmitted = GNUNET_YES;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending query for %s via cadet to %s\n",
+             GNUNET_h2s (&sr->query),
+             GNUNET_i2s (&mh->target));
+  env = GNUNET_MQ_msg (sqm,
+                       GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+  sqm->type = htonl (sr->type);
+  sqm->query = sr->query;
+  GNUNET_MQ_notify_sent (env,
+                         &transmit_pending,
+                         mh);
+  GNUNET_MQ_send (mq,
+                  env);
 }
 
 
@@ -614,7 +588,6 @@ static struct CadetHandle *
 get_cadet (const struct GNUNET_PeerIdentity *target)
 {
   struct CadetHandle *mh;
-  struct GNUNET_HashCode port;
 
   mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
                                          target);
@@ -641,10 +614,6 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
                                                    &mh->target,
                                                    mh,
                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
-                      strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
-                      &port);
-
   {
     struct GNUNET_MQ_MessageHandler handlers[] = {
       GNUNET_MQ_hd_var_size (reply,
@@ -653,7 +622,11 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
                              mh),
       GNUNET_MQ_handler_end ()
     };
+    struct GNUNET_HashCode port;
 
+    GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+                        strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+                        &port);
     mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
                                                mh,
                                                &mh->target,
@@ -679,9 +652,10 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
  */
 struct GSF_CadetRequest *
 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
-               const struct GNUNET_HashCode *query,
-               enum GNUNET_BLOCK_Type type,
-               GSF_CadetReplyProcessor proc, void *proc_cls)
+                 const struct GNUNET_HashCode *query,
+                 enum GNUNET_BLOCK_Type type,
+                 GSF_CadetReplyProcessor proc,
+                 void *proc_cls)
 {
   struct CadetHandle *mh;
   struct GSF_CadetRequest *sr;