added support for bulk operations
authorMatthias Wachs <wachs@net.in.tum.de>
Tue, 2 Jul 2013 14:54:42 +0000 (14:54 +0000)
committerMatthias Wachs <wachs@net.in.tum.de>
Tue, 2 Jul 2013 14:54:42 +0000 (14:54 +0000)
src/ats/gnunet-service-ats-solver_mlp.c
src/ats/gnunet-service-ats-solver_mlp.h
src/ats/gnunet-service-ats-solver_proportional.c
src/ats/gnunet-service-ats_addresses.c

index 27dcc21cfc759f58a76f2ec5b450527f5ada4e1c..5dbbee4e8b94186125ea1f76c9381df2b9017c31 100644 (file)
@@ -958,7 +958,7 @@ GAS_mlp_solve_problem (void *solver, struct GNUNET_CONTAINER_MultiHashMap * addr
                LOG (GNUNET_ERROR_TYPE_DEBUG, "No changes to problem\n");
                return GNUNET_OK;
        }
-
+       mlp->addresses = addresses;
        if (GNUNET_YES == mlp->mlp_prob_changed)
        {
                        LOG (GNUNET_ERROR_TYPE_DEBUG, "Problem size changed, rebuilding\n");
@@ -1061,7 +1061,7 @@ GAS_mlp_address_add (void *solver,
   GNUNET_assert (NULL != addresses);
   GNUNET_assert (NULL != address);
 
-
+       mlp->addresses = addresses;
   if (NULL == address->solver_information)
   {
                address->solver_information = GNUNET_malloc (sizeof (struct MLP_information));
@@ -1338,6 +1338,7 @@ GAS_mlp_address_update (void *solver,
        GNUNET_assert (NULL != address);
        GNUNET_assert ((NULL != prev_atsi) || (0 == prev_atsi_count));
 
+       mlp->addresses = addresses;
   if (NULL == mlpi)
   {
       LOG (GNUNET_ERROR_TYPE_ERROR, _("Updating address for peer `%s' not added before\n"), GNUNET_i2s(&address->peer));
@@ -1386,7 +1387,7 @@ GAS_mlp_address_delete (void *solver,
        GNUNET_assert (NULL != address);
 
        mlpi = address->solver_information;
-
+       mlp->addresses = addresses;
        if (NULL != mlpi)
        {
                        GNUNET_free (mlpi);
@@ -1480,7 +1481,7 @@ GAS_mlp_get_preferred_address (void *solver,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Getting preferred address for `%s'\n",
                GNUNET_i2s (peer));
-
+       mlp->addresses = addresses;
   /* Is this peer included in the problem? */
   if (NULL == (p = GNUNET_CONTAINER_multihashmap_get (mlp->peers, &peer->hashPubKey)))
   {
@@ -1514,13 +1515,34 @@ GAS_mlp_get_preferred_address (void *solver,
 void
 GAS_mlp_bulk_start (void *solver)
 {
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Locking solver for bulk operation ...\n");
+  struct GAS_MLP_Handle *s = (struct GAS_MLP_Handle *) solver;
+
+  GNUNET_assert (NULL != solver);
 
+  s->bulk_lock ++;
 }
 
 void
 GAS_mlp_bulk_stop (void *solver)
 {
+       LOG (GNUNET_ERROR_TYPE_DEBUG, "Unlocking solver from bulk operation ...\n");
 
+  struct GAS_MLP_Handle *s = (struct GAS_MLP_Handle *) solver;
+  GNUNET_assert (NULL != solver);
+
+  if (s->bulk_lock < 1)
+  {
+       GNUNET_break (0);
+       return;
+  }
+  s->bulk_lock --;
+
+  if (0 < s->bulk_changes)
+  {
+       GAS_mlp_solve_problem (solver, s->addresses);
+       s->bulk_changes = 0;
+  }
 }
 
 
@@ -1575,10 +1597,9 @@ GAS_mlp_address_change_preference (void *solver,
                GNUNET_i2s(peer));
 
   GNUNET_STATISTICS_update (mlp->stats,"# LP address preference changes", 1, GNUNET_NO);
-
+       mlp->addresses = addresses;
   /* Update the constraints with changed preferences */
 
-
   /* Update quality constraint c7 */
 
   /* Update relativity constraint c9 */
@@ -1588,11 +1609,16 @@ GAS_mlp_address_change_preference (void *solver,
        return;
   }
   p->f = get_peer_pref_value (mlp, peer);
-  mlp_create_problem_set_value (&mlp->p, p->r_c9, mlp->p.c_r, -p->f, __LINE__);
-
+  /* FXIME: cannot use set_value mlp_create_problem_set_value (&mlp->p, p->r_c9, mlp->p.c_r, -p->f, __LINE__);*/
 
        /* Problem size changed: new address for peer with pending request */
        mlp->mlp_prob_updated = GNUNET_YES;
+       if (GNUNET_YES == mlp->bulk_lock)
+       {
+               mlp->bulk_changes++;
+               return;
+       }
+
        if (GNUNET_YES == mlp->mlp_auto_solve)
                GAS_mlp_solve_problem (solver, addresses);
   return;
index 3d762819d794cd489db904bbc577f6fd609634c4..efdd3d42000e6119544308855c83ae3acce852c1 100644 (file)
@@ -110,6 +110,7 @@ struct MLP_Problem
 #else
   void *prob;
 #endif
+
   /* Number of addresses in problem */
   unsigned int num_addresses;
   /* Number of peers in problem */
@@ -150,6 +151,7 @@ struct MLP_Problem
   int *ja;
   /* Column index value */
   double *ar;
+
 };
 
 struct MLP_Variables
@@ -218,6 +220,11 @@ struct GAS_MLP_Handle
    */
   struct GNUNET_STATISTICS_Handle *stats;
 
+  /**
+   * Address hashmap for lookups
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *addresses;
+
   /**
    * Addresses' bandwidth changed callback
    */
@@ -238,6 +245,17 @@ struct GAS_MLP_Handle
 
   struct MLP_Solution ps;
 
+  /**
+   * Bulk lock
+   */
+
+  int bulk_lock;
+
+  /**
+   * Number of changes while solver was locked
+   */
+  int bulk_changes;
+
   /**
    * GLPK LP control parameter
    */
index bafb552881a15acbad3eaa553d84cf75aec39d0d..b9b0d9a4ef603d2afe9d2443ce52669d26bb05f7 100644 (file)
@@ -221,51 +221,59 @@ struct GAS_PROPORTIONAL_Handle
   /**
    * Statistics handle
    */
-
   struct GNUNET_STATISTICS_Handle *stats;
 
   /**
-   * Total number of addresses for solver
+   * Bandwidth changed callback
    */
-  unsigned int total_addresses;
+  GAS_bandwidth_changed_cb bw_changed;
 
   /**
-   * Number of active addresses for solver
+   * Bandwidth changed callback cls
    */
-  unsigned int active_addresses;
+  void *bw_changed_cls;
 
   /**
-   * Networks array
+   * ATS function to get preferences
    */
-  struct Network *network_entries;
+  GAS_get_preferences get_preferences;
 
   /**
-   * Number of networks
+   * Closure for ATS function to get preferences
    */
-  unsigned int networks;
+  void *get_preferences_cls;
 
   /**
-   * Callback
+   * Bulk lock
    */
-  GAS_bandwidth_changed_cb bw_changed;
+  int bulk_lock;
 
   /**
-   * Callback cls
+   * Number of changes while solver was locked
    */
-  void *bw_changed_cls;
+  int bulk_changes;
+
 
   /**
-   * ATS function to get preferences
+   * Total number of addresses for solver
    */
-  GAS_get_preferences get_preferences;
+  unsigned int total_addresses;
 
   /**
-   * Closure for ATS function to get preferences
+   * Number of active addresses for solver
    */
-  void *get_preferences_cls;
+  unsigned int active_addresses;
+
+  /**
+   * Networks array
+   */
+  struct Network *network_entries;
+
+  /**
+   * Number of networks
+   */
+  unsigned int networks;
 
-  struct PreferenceClient *pc_head;
-  struct PreferenceClient *pc_tail;
 };
 
 
@@ -856,7 +864,11 @@ GAS_proportional_address_change_preference (void *solver,
   struct GAS_PROPORTIONAL_Handle *s = solver;
   GNUNET_assert (NULL != solver);
   GNUNET_assert (NULL != peer);
-  distribute_bandwidth_in_all_networks (s);
+
+  if (GNUNET_NO == s->bulk_lock)
+       distribute_bandwidth_in_all_networks (s);
+  else
+       s->bulk_changes ++;
 }
 
 /**
@@ -922,7 +934,10 @@ GAS_proportional_get_preferred_address (void *solver,
       s->bw_changed (s->bw_changed_cls, prev); /* notify about bw change, REQUIRED? */
       if (GNUNET_SYSERR == addresse_decrement (s, net_prev, GNUNET_NO, GNUNET_YES))
         GNUNET_break (0);
-      distribute_bandwidth_in_network (s, net_prev, NULL);
+      if (GNUNET_NO == s->bulk_lock)
+       distribute_bandwidth_in_network (s, net_prev, NULL);
+      else
+       s->bulk_changes ++;
   }
 
   if (GNUNET_NO == (is_bandwidth_available_in_network (cur->solver_information)))
@@ -933,7 +948,10 @@ GAS_proportional_get_preferred_address (void *solver,
 
   cur->active = GNUNET_YES;
   addresse_increment(s, net_cur, GNUNET_NO, GNUNET_YES);
-  distribute_bandwidth_in_network (s, net_cur, cur);
+  if (GNUNET_NO == s->bulk_lock)
+       distribute_bandwidth_in_network (s, net_cur, cur);
+  else
+       s->bulk_changes ++;
 
   return cur;
 }
@@ -1019,7 +1037,10 @@ GAS_proportional_address_delete (void *solver,
       address->active = GNUNET_NO;
       if (GNUNET_SYSERR == addresse_decrement (s, net, GNUNET_NO, GNUNET_YES))
         GNUNET_break (0);
-      distribute_bandwidth_in_network (s, net, NULL);
+      if (GNUNET_NO == s->bulk_lock)
+       distribute_bandwidth_in_network (s, net, NULL);
+      else
+       s->bulk_changes ++;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "After deleting address now total %u and active %u addresses in network `%s'\n",
       net->total_addresses,
@@ -1037,7 +1058,11 @@ GAS_proportional_address_delete (void *solver,
 void
 GAS_proportional_bulk_start (void *solver)
 {
+  LOG (GNUNET_ERROR_TYPE_ERROR, "Locking solver for bulk operation ...\n");
+  struct GAS_PROPORTIONAL_Handle *s = (struct GAS_PROPORTIONAL_Handle *) solver;
 
+  GNUNET_assert (NULL != solver);
+  s->bulk_lock ++;
 }
 
 /**
@@ -1046,7 +1071,23 @@ GAS_proportional_bulk_start (void *solver)
 void
 GAS_proportional_bulk_stop (void *solver)
 {
+       LOG (GNUNET_ERROR_TYPE_ERROR, "Unlocking solver from bulk operation ...\n");
+
+  struct GAS_PROPORTIONAL_Handle *s = (struct GAS_PROPORTIONAL_Handle *) solver;
+  GNUNET_assert (NULL != solver);
 
+  if (s->bulk_lock < 1)
+  {
+       GNUNET_break (0);
+       return;
+  }
+  s->bulk_lock --;
+  if ((0 == s->bulk_lock) && (s->bulk_changes))
+  {
+       LOG (GNUNET_ERROR_TYPE_ERROR, "No lock pending, recalculating\n");
+       distribute_bandwidth_in_all_networks (s);
+       s->bulk_changes = 0;
+  }
 }
 
 
@@ -1153,7 +1194,10 @@ GAS_proportional_address_update (void *solver,
               /* Suggest updated address */
               address->active = GNUNET_YES;
               addresse_increment (s, new_net, GNUNET_NO, GNUNET_YES);
-              distribute_bandwidth_in_network (solver, new_net, NULL);
+              if (GNUNET_NO == s->bulk_lock)
+               distribute_bandwidth_in_network (solver, new_net, NULL);
+              else
+               s->bulk_changes ++;
           }
           else
           {
@@ -1294,6 +1338,7 @@ GAS_proportional_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
   s->network_entries = GNUNET_malloc (dest_length * sizeof (struct Network));
   s->active_addresses = 0;
   s->total_addresses = 0;
+  s->bulk_lock = GNUNET_NO;
 
   for (c = 0; c < dest_length; c++)
   {
index c446d19c7e66a24f96db71070749ae144a86b2b7..4574383ab6f523e082169269ffcfb32071bac644 100644 (file)
@@ -819,7 +819,9 @@ GAS_addresses_add (struct GAS_Addresses_Handle *handle,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Added new address for peer `%s' session id %u, %p\n",
                 GNUNET_i2s (peer), session_id, aa);
     /* Tell solver about new address */
+    handle->s_bulk_start (handle->solver);
     GAS_normalization_normalize_property (handle->addresses, aa, atsi, atsi_count);
+    handle->s_bulk_stop (handle->solver);
     handle->s_add (handle->solver, handle->addresses, aa, addr_net);
     /* Notify performance clients about new address */
     GAS_performance_notify_all_clients (&aa->peer,
@@ -863,7 +865,9 @@ GAS_addresses_add (struct GAS_Addresses_Handle *handle,
   }
 
   /* Notify solver about update with atsi information and session */
+  handle->s_bulk_start (handle->solver);
   GAS_normalization_normalize_property (handle->addresses, ea, atsi, atsi_count);
+  handle->s_bulk_stop (handle->solver);
   handle->s_update (handle->solver, handle->addresses, ea, session_id, ea->used, atsi_delta, atsi_delta_count);
   GNUNET_free_non_null (atsi_delta);
 
@@ -1415,8 +1419,9 @@ GAS_addresses_change_preference (struct GAS_Addresses_Handle *handle,
                   GNUNET_i2s (peer), client);
       return;
   }
-  /* Tell normalization about change, normalization will call callback if preference changed */
+
   handle->s_bulk_start (handle->solver);
+  /* Tell normalization about change, normalization will call callback if preference changed */
   GAS_normalization_normalize_preference (client, peer, kind, score_abs);
   handle->s_bulk_stop (handle->solver);
 }