tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pe.c
index b338c1a13127b51666853815fc13b25aa68e4ee4..5e85bfdb7c470bd9246920d833c6f4c16101a353 100644 (file)
@@ -2,20 +2,20 @@
      This file is part of GNUnet.
      Copyright (C) 2011 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
+     SPDX-License-Identifier: AGPL3.0-or-later
 */
 
 /**
@@ -188,11 +188,6 @@ struct PeerPlan
    */
   struct GNUNET_CONTAINER_MultiHashMap *plan_map;
 
-  /**
-   * Current transmission request handle.
-   */
-  struct GSF_PeerTransmitHandle *pth;
-
   /**
    * Peer for which this is the plan.
    */
@@ -202,6 +197,12 @@ struct PeerPlan
    * Current task for executing the plan.
    */
   struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * Current message under transmission for the plan.
+   */
+  struct GNUNET_MQ_Envelope *env;
+
 };
 
 
@@ -240,15 +241,6 @@ get_rp_key (struct GSF_RequestPlan *rp)
 }
 
 
-/**
- * Figure out when and how to transmit to the given peer.
- *
- * @param cls the `struct GSF_ConnectedPeer` for transmission
- */
-static void
-schedule_peer_transmission (void *cls);
-
-
 /**
  * Insert the given request plan into the heap with the appropriate weight.
  *
@@ -329,21 +321,22 @@ plan (struct PeerPlan *pp,
   rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Earliest (re)transmission for `%s' in %us\n",
-              GNUNET_h2s (&prd->query), rp->transmission_counter);
+              GNUNET_h2s (&prd->query),
+             rp->transmission_counter);
   GNUNET_assert (rp->hn == NULL);
   if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
-    rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
+    rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+                                          rp,
+                                          rp->priority);
   else
     rp->hn =
-        GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
+        GNUNET_CONTAINER_heap_insert (pp->delay_heap,
+                                     rp,
                                       rp->earliest_transmission.abs_value_us);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
                                                                get_rp_key (rp),
                                                                rp));
-  if (NULL != pp->task)
-    GNUNET_SCHEDULER_cancel (pp->task);
-  pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
 #undef N
 }
 
@@ -382,75 +375,6 @@ get_latest (const struct GSF_RequestPlan *rp)
 }
 
 
-/**
- * Function called to get a message for transmission.
- *
- * @param cls closure
- * @param buf_size number of bytes available in @a buf
- * @param buf where to copy the message, NULL on error (peer disconnect)
- * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
- */
-static size_t
-transmit_message_callback (void *cls,
-                           size_t buf_size,
-                           void *buf)
-{
-  struct PeerPlan *pp = cls;
-  struct GSF_RequestPlan *rp;
-  size_t msize;
-
-  pp->pth = NULL;
-  if (NULL == buf)
-  {
-    /* failed, try again... */
-    if (NULL != pp->task)
-      GNUNET_SCHEDULER_cancel (pp->task);
-
-    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
-    GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop
-                              ("# transmission failed (core has no bandwidth)"),
-                              1, GNUNET_NO);
-    return 0;
-  }
-  rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
-  if (NULL == rp)
-  {
-    if (NULL != pp->task)
-      GNUNET_SCHEDULER_cancel (pp->task);
-    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
-    return 0;
-  }
-  msize = GSF_pending_request_get_message_ (get_latest (rp),
-                                            buf_size,
-                                            buf);
-  if (msize > buf_size)
-  {
-    if (NULL != pp->task)
-      GNUNET_SCHEDULER_cancel (pp->task);
-    /* buffer to small (message changed), try again */
-    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
-    return 0;
-  }
-  /* remove from root, add again elsewhere... */
-  GNUNET_assert (rp ==
-                 GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
-  rp->hn = NULL;
-  rp->last_transmission = GNUNET_TIME_absolute_get ();
-  rp->transmission_counter++;
-  total_delay++;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Executing plan %p executed %u times, planning retransmission\n",
-              rp, rp->transmission_counter);
-  plan (pp, rp);
-  GNUNET_STATISTICS_update (GSF_stats,
-                            gettext_noop ("# query messages sent to other peers"),
-                            1,
-                            GNUNET_NO);
-  return msize;
-}
-
-
 /**
  * Figure out when and how to transmit to the given peer.
  *
@@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls)
 {
   struct PeerPlan *pp = cls;
   struct GSF_RequestPlan *rp;
-  size_t msize;
   struct GNUNET_TIME_Relative delay;
 
-  pp->task = NULL;
-  if (NULL != pp->pth)
+  if (NULL != pp->task)
   {
-    GSF_peer_transmit_cancel_ (pp->pth);
-    pp->pth = NULL;
+    pp->task = NULL;
+  }
+  else
+  {
+    GNUNET_assert (NULL != pp->env);
+    pp->env = NULL;
   }
   /* move ready requests to priority queue */
   while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
@@ -501,30 +427,47 @@ schedule_peer_transmission (void *cls)
                            gettext_noop ("# delay heap timeout (ms)"),
                            delay.rel_value_us / 1000LL, GNUNET_NO);
 
-    pp->task =
-        GNUNET_SCHEDULER_add_delayed (delay,
-                                      &schedule_peer_transmission,
-                                      pp);
+    pp->task
+      = GNUNET_SCHEDULER_add_at (rp->earliest_transmission,
+                                 &schedule_peer_transmission,
+                                 pp);
     return;
   }
 #if INSANE_STATISTICS
-  GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
-                            1, GNUNET_NO);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# query plans executed"),
+                            1,
+                           GNUNET_NO);
 #endif
   /* process from priority heap */
-  rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+  rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Executing query plan %p\n",
               rp);
   GNUNET_assert (NULL != rp);
-  msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
-  pp->pth =
-      GSF_peer_transmit_ (pp->cp, GNUNET_YES,
-                          rp->priority,
-                          GNUNET_TIME_UNIT_FOREVER_REL,
-                          msize,
-                          &transmit_message_callback, pp);
-  GNUNET_assert (NULL != pp->pth);
+  rp->hn = NULL;
+  rp->last_transmission = GNUNET_TIME_absolute_get ();
+  rp->transmission_counter++;
+  total_delay++;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Executing plan %p executed %u times, planning retransmission\n",
+              rp,
+             rp->transmission_counter);
+  GNUNET_assert (NULL == pp->env);
+  pp->env = GSF_pending_request_get_message_ (get_latest (rp));
+  GNUNET_MQ_notify_sent (pp->env,
+                        &schedule_peer_transmission,
+                        pp);
+  GSF_peer_transmit_ (pp->cp,
+                     GNUNET_YES,
+                     rp->priority,
+                     pp->env);
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# query messages sent to other peers"),
+                            1,
+                            GNUNET_NO);
+  plan (pp,
+       rp);
 }
 
 
@@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
                                                       id,
                                                       pp,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
+                                        pp);
   }
   mpc.merged = GNUNET_NO;
   mpc.pr = pr;
@@ -710,11 +655,6 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (plans, id,
                                                        pp));
-  if (NULL != pp->pth)
-  {
-    GSF_peer_transmit_cancel_ (pp->pth);
-    pp->pth = NULL;
-  }
   if (NULL != pp->task)
   {
     GNUNET_SCHEDULER_cancel (pp->task);