authentication of ciphertexts (+ seed)
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 9e53d2fd56138a1fa59902f1ebf23c67312ac950..2bfdeb6743fe7270783640ee28f11dff10d6cbe4 100644 (file)
@@ -28,8 +28,6 @@
  * - consider more precise latency estimation (per-peer & request) -- again load API?
  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
  * - introduce random latency in processing
- * - tell other peers to stop migration if our PUTs fail (or if
- *   we don't support migration per configuration?)
  * - more statistics
  */
 #include "platform.h"
@@ -189,6 +187,12 @@ struct ConnectedPeer
    */
   struct GNUNET_TIME_Absolute migration_blocked;
 
+  /**
+   * Time until when we blocked this peer from migrating
+   * data to us.
+   */
+  struct GNUNET_TIME_Absolute last_migration_block;
+
   /**
    * Handle for an active request for transmission to this
    * peer, or NULL.
@@ -752,9 +756,14 @@ static int active_migration;
 static double current_priorities;
 
 /**
- * Datastore load tracking.
+ * Datastore 'GET' load tracking.
  */
-static struct GNUNET_LOAD_Value *datastore_load;
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
 
 
 /**
@@ -769,7 +778,7 @@ update_datastore_delays (struct GNUNET_TIME_Absolute start)
   struct GNUNET_TIME_Relative delay;
 
   delay = GNUNET_TIME_absolute_get_duration (start);
-  GNUNET_LOAD_update (datastore_load,
+  GNUNET_LOAD_update (datastore_get_load,
                      delay.value);
 }
 
@@ -1126,12 +1135,20 @@ destroy_pending_message (struct PendingMessage *pm,
   TransmissionContinuation cont;
   void *cont_cls;
 
-  GNUNET_assert (pml->pm == pm);
-  GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
-  cont = pm->cont;
-  cont_cls = pm->cont_cls;
-  destroy_pending_message_list_entry (pml);
-  cont (cont_cls, tpid);  
+  if (pml != NULL)
+    {
+      GNUNET_assert (pml->pm == pm);
+      GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+      cont = pm->cont;
+      cont_cls = pm->cont_cls;
+      destroy_pending_message_list_entry (pml);
+    }
+  else
+    {
+      GNUNET_free (pm);
+    }
+  if (cont != NULL)
+    cont (cont_cls, tpid);  
 }
 
 
@@ -1636,8 +1653,10 @@ shutdown_task (void *cls,
   GNUNET_assert (0 == mig_size);
   GNUNET_DHT_disconnect (dht_handle);
   dht_handle = NULL;
-  GNUNET_LOAD_value_free (datastore_load);
-  datastore_load = NULL;
+  GNUNET_LOAD_value_free (datastore_get_load);
+  datastore_get_load = NULL;
+  GNUNET_LOAD_value_free (datastore_put_load);
+  datastore_put_load = NULL;
   GNUNET_BLOCK_context_destroy (block_ctx);
   block_ctx = NULL;
   GNUNET_CONFIGURATION_destroy (block_cfg);
@@ -1793,14 +1812,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
 
   GNUNET_assert (pm->next == NULL);
   GNUNET_assert (pm->pml == NULL);    
-  pml = GNUNET_malloc (sizeof (struct PendingMessageList));
-  pml->req = pr;
-  pml->target = cp;
-  pml->pm = pm;
-  pm->pml = pml;  
-  GNUNET_CONTAINER_DLL_insert (pr->pending_head,
-                              pr->pending_tail,
-                              pml);
+  if (pr != NULL)
+    {
+      pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+      pml->req = pr;
+      pml->target = cp;
+      pml->pm = pm;
+      pm->pml = pml;  
+      GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+                                  pr->pending_tail,
+                                  pml);
+    }
   pos = cp->pending_messages_head;
   while ( (pos != NULL) &&
          (pm->priority < pos->priority) )
@@ -2560,6 +2582,11 @@ struct ProcessReplyClosure
    * Did we finish processing the associated request?
    */ 
   int finished;
+
+  /**
+   * Did we find a matching request?
+   */
+  int request_found;
 };
 
 
@@ -2715,6 +2742,7 @@ process_reply (void *cls,
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
   pr->results_found++;
+  prq->request_found = GNUNET_YES;
   if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
@@ -2800,7 +2828,19 @@ put_migration_continuation (void *cls,
                            int success,
                            const char *msg)
 {
-  /* FIXME */
+  struct GNUNET_TIME_Absolute *start = cls;
+  struct GNUNET_TIME_Relative delay;
+  
+  delay = GNUNET_TIME_absolute_get_duration (*start);
+  GNUNET_free (start);
+  GNUNET_LOAD_update (datastore_put_load,
+                     delay.value);
+  if (GNUNET_OK == success)
+    return;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# datastore 'put' failures"),
+                           1,
+                           GNUNET_NO);
 }
 
 
@@ -2830,6 +2870,12 @@ handle_p2p_put (void *cls,
   struct GNUNET_TIME_Absolute expiration;
   GNUNET_HashCode query;
   struct ProcessReplyClosure prq;
+  struct GNUNET_TIME_Absolute *start;
+  struct GNUNET_TIME_Relative block_time;  
+  double putl;
+  struct ConnectedPeer *cp; 
+  struct PendingMessage *pm;
+  struct MigrationStopMessage *msm;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PutMessage))
@@ -2876,6 +2922,7 @@ handle_p2p_put (void *cls,
   prq.expiration = expiration;
   prq.priority = 0;
   prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
@@ -2893,6 +2940,8 @@ handle_p2p_put (void *cls,
                  GNUNET_h2s (&query),
                  prq.priority);
 #endif
+      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+      *start = GNUNET_TIME_absolute_get ();
       GNUNET_DATASTORE_put (dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -2900,7 +2949,36 @@ handle_p2p_put (void *cls,
                            1 + prq.priority, MAX_DATASTORE_QUEUE,
                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                            &put_migration_continuation, 
-                           NULL);
+                           start);
+    }
+  putl = GNUNET_LOAD_get_load (datastore_put_load);
+  if ( (GNUNET_NO == prq.request_found) &&
+       ( (GNUNET_YES != active_migration) ||
+                (putl > 2.0) ) )
+    {
+      cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                             &other->hashPubKey);
+      if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
+       return GNUNET_OK; /* already blocked */
+      /* We're too busy; send MigrationStop message! */
+      if (GNUNET_YES != active_migration) 
+       putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+      block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                                  (unsigned int) (60000 * putl * putl)));
+      
+      cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+      pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
+                         sizeof (struct MigrationStopMessage));
+      pm->msize = sizeof (struct MigrationStopMessage);
+      pm->priority = UINT32_MAX;
+      msm = (struct MigrationStopMessage*) &pm[1];
+      msm->header.size = htons (sizeof (struct MigrationStopMessage));
+      msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+      msm->duration = GNUNET_TIME_relative_hton (block_time);
+      add_to_pending_messages_for_peer (cp,
+                                       pm,
+                                       NULL);
     }
   return GNUNET_OK;
 }
@@ -2925,7 +3003,18 @@ handle_p2p_migration_stop (void *cls,
                           struct GNUNET_TIME_Relative latency,
                           uint32_t distance)
 {
-  // FIXME!
+  struct ConnectedPeer *cp; 
+  const struct MigrationStopMessage *msm;
+
+  msm = (const struct MigrationStopMessage*) message;
+  cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                         &other->hashPubKey);
+  if (cp == NULL)
+    {
+      GNUNET_break (0);
+      return GNUNET_OK;
+    }
+  cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
   return GNUNET_OK;
 }
 
@@ -3110,6 +3199,7 @@ process_local_reply (void *cls,
   prq.type = type;
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
   process_reply (&prq, key, pr);
   if ( (old_rf == 0) &&
        (pr->results_found == 1) )
@@ -3798,7 +3888,8 @@ run (void *cls,
       GNUNET_SCHEDULER_shutdown (sched);
       return;
     }
-  datastore_load = GNUNET_LOAD_value_init ();
+  datastore_get_load = GNUNET_LOAD_value_init ();
+  datastore_put_load = GNUNET_LOAD_value_init ();
   block_cfg = GNUNET_CONFIGURATION_create ();
   GNUNET_CONFIGURATION_set_value_string (block_cfg,
                                         "block",
@@ -3821,8 +3912,10 @@ run (void *cls,
       block_ctx = NULL;
       GNUNET_CONFIGURATION_destroy (block_cfg);
       block_cfg = NULL;
-      GNUNET_LOAD_value_free (datastore_load);
-      datastore_load = NULL;
+      GNUNET_LOAD_value_free (datastore_get_load);
+      datastore_get_load = NULL;
+      GNUNET_LOAD_value_free (datastore_put_load);
+      datastore_put_load = NULL;
       return;   
     }
 }