stuff
authorChristian Grothoff <christian@grothoff.org>
Tue, 15 Feb 2011 15:30:06 +0000 (15:30 +0000)
committerChristian Grothoff <christian@grothoff.org>
Tue, 15 Feb 2011 15:30:06 +0000 (15:30 +0000)
src/fs/fs.h
src/fs/gnunet-service-fs_cp.c
src/fs/gnunet-service-fs_cp.h
src/fs/gnunet-service-fs_pr.c
src/fs/gnunet-service-fs_pr.h

index a9aa7ef74c1ff5a11ce231e3c308d086ae10562d..a5b15dcef91ef804dabe5d15711b94c2271906db 100644 (file)
@@ -2320,7 +2320,7 @@ struct GetMessage
    * The number should be in big-endian format when used
    * for mingling.
    */
-  int32_t filter_mutator GNUNET_PACKED;
+  uint32_t filter_mutator GNUNET_PACKED;
 
   /**
    * Which of the optional hash codes are present at the end of the
index d88598be7d683ab499695d094704bf2972a4a3cf..3ce03be2ed9761c46501a98178392a89da4d796c 100644 (file)
@@ -981,34 +981,49 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
  * @param cp responding peer (will be updated)
  * @param request_time time at which the original query was transmitted
  * @param request_priority priority of the original request
- * @param initiator_client local client on responsible for query (or NULL)
- * @param initiator_peer other peer responsible for query (or NULL)
  */
 void
 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
                              struct GNUNET_TIME_Absolute request_time,
-                             uint32_t request_priority,
-                             const struct GSF_LocalClient *initiator_client,
-                             const struct GSF_ConnectedPeer *initiator_peer)
+                             uint32_t request_priority)
 {
   struct GNUNET_TIME_Relative delay;
-  unsigned int i;
 
   delay = GNUNET_TIME_absolute_get_duration (request_time);  
   cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
   cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
-  if (NULL != initiator_client)
-    {
-      cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
-    }
-  else if (NULL != initiator_peer)
-    {
-      GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
-      cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
-      GNUNET_PEER_change_rc (initiator_peer->pid, 1);
-    }
-  else
-    GNUNET_break (0);
+}
+
+
+/**
+ * Report on receiving a reply in response to an initiating client.
+ * Remember that this peer is good for this client.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_client local client on responsible for query
+ */
+void
+GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
+                                  const struct GSF_LocalClient *initiator_client)
+{
+  cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
+}
+
+
+/**
+ * Report on receiving a reply in response to an initiating peer.
+ * Remember that this peer is good for this initiating peer.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_peer other peer responsible for query
+ */
+void
+GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
+                                const struct GSF_ConnectedPeer *initiator_peer)
+{
+  GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+  cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
+  GNUNET_PEER_change_rc (initiator_peer->pid, 1);
 }
 
 
index bc561f792ecd46ac85fbf2c2cb50ba4ecd4a4b6c..081a1d5ba07b35aead603fc6ebf4e10839d2e3a5 100644 (file)
@@ -204,15 +204,35 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth);
  * @param cp responding peer (will be updated)
  * @param request_time time at which the original query was transmitted
  * @param request_priority priority of the original request
- * @param initiator_client local client on responsible for query (or NULL)
- * @param initiator_peer other peer responsible for query (or NULL)
  */
 void
 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
                              struct GNUNET_TIME_Absolute request_time,
-                             uint32_t request_priority,
-                             const struct GSF_LocalClient *initiator_client,
-                             const struct GSF_ConnectedPeer *initiator_peer);
+                             uint32_t request_priority);
+
+
+/**
+ * Report on receiving a reply in response to an initiating client.
+ * Remember that this peer is good for this client.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_client local client on responsible for query
+ */
+void
+GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
+                                  const struct GSF_LocalClient *initiator_client);
+
+
+/**
+ * Report on receiving a reply in response to an initiating peer.
+ * Remember that this peer is good for this initiating peer.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_peer other peer responsible for query
+ */
+void
+GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
+                                const struct GSF_ConnectedPeer *initiator_peer);
 
 
 /**
index 4dc00f54c07764ab7c98af64fb82bbdec717ced6..aca63ac940d4cbc69b819f5d485e138b3758587c 100644 (file)
@@ -57,6 +57,11 @@ struct GSF_PendingRequest
    */
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
+  /**
+   * Entry for this pending request in the expiration heap, or NULL.
+   */
+  struct GNUNET_CONTAINER_HeapNode *hnode;
+
   /**
    * Number of valid entries in the 'replies_seen' array.
    */
@@ -70,7 +75,7 @@ struct GSF_PendingRequest
   /**
    * Mingle value we currently use for the bf.
    */
-  int32_t mingle;
+  uint32_t mingle;
                            
 };
 
@@ -158,8 +163,8 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
     return GNUNET_NO; /* size not changed */
   if (pr->bf != NULL)
     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
-  pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
-                                                  UINT32_MAX);
+  pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
+                                        UINT32_MAX);
   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
                                              nsize,
                                              BLOOMFILTER_K);
@@ -202,7 +207,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
                             const struct GNUNET_PeerIdentity *target,
                             const char *bf_data,
                             size_t bf_size,
-                            int32_t mingle,
+                            uint32_t mingle,
                             uint32_t anonymity_level,
                             uint32_t priority,
                             int32_t ttl,
@@ -212,7 +217,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
                             void *rh_cls)
 {
   struct GSF_PendingRequest *pr;
-
+  struct GSF_PendingRequest *dpr;
   
   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
   pr->public_data.query = *query;
@@ -228,6 +233,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
     }
   pr->public_data.anonymity_level = anonymity_data;
   pr->public_data.priority = priority;
+  pr->public_data.original_priority = priority;
   pr->public_data.options = options;
   pr->public_data.type = type;  
   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
@@ -265,25 +271,26 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
                                     query,
                                     pr,
                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  // FIXME: if not a local query, we also need to track the
-  // total number of external queries we currently have and
-  // bound it => need an additional heap!
-
-  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
-                                           pr,
-                                           pr->start_time.abs_value + pr->ttl);
-
-
-
-  /* make sure we don't track too many requests */
-  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
+  if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
     {
-      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
-      GNUNET_assert (pr != NULL);
-      destroy_pending_request (pr);
+      pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+                                               pr,
+                                               pr->ttl.abs_value);
+      /* make sure we don't track too many requests */
+      while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
+       {
+         dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+         GNUNET_assert (dpr != NULL);
+         if (pr == dpr)
+           break; /* let the request live briefly... */
+         dpr->rh (dpr->rh_cls,
+                  dpr,
+                  GNUNET_TIME_UNIT_FOREVER_ABS,
+                  NULL, 0,
+                  GNUNET_SYSERR);
+         GSF_pending_request_cancel_ (dpr);
+       }
     }
-
-
   return pr;
 }
 
@@ -348,8 +355,8 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
        {
          /* we're not the initiator, but the initiator did not give us
             any bloom-filter, so we need to create one on-the-fly */
-         pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
-                                                          UINT32_MAX);
+         pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
+                                                UINT32_MAX);
          pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
                                                      pr->mingle,
                                                      BLOOMFILTER_K);
@@ -464,6 +471,9 @@ clean_request (void *cls,
   GNUNET_free_non_null (pr->replies_seen);
   if (NULL != pr->bf)
     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  if (NULL != pr->hnode)
+    GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
+                                      pr->hnode);
   GNUNET_free (pr);
   return GNUNET_YES;
 }
@@ -517,7 +527,7 @@ struct ProcessReplyClosure
   /**
    * Who gave us this reply? NULL for local host (or DHT)
    */
-  struct ConnectedPeer *sender;
+  struct GSF_ConnectedPeer *sender;
 
   /**
    * When the reply expires.
@@ -577,41 +587,9 @@ update_request_performance_data (struct ProcessReplyClosure *prq,
 
   if (prq->sender == NULL)
     return;      
-  /* FIXME: adapt code to new API... */
-  for (i=0;i<pr->used_targets_off;i++)
-    if (pr->used_targets[i].pid == prq->sender->pid)
-      break;
-  if (i < pr->used_targets_off)
-    {
-      cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
-      prq->sender->avg_delay.rel_value
-       = (prq->sender->avg_delay.rel_value * 
-          (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
-      prq->sender->avg_priority
-       = (prq->sender->avg_priority * 
-          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
-    }
-  if (pr->cp != NULL)
-    {
-      GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
-                            [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
-                            -1);
-      GNUNET_PEER_change_rc (pr->cp->pid, 1);
-      prq->sender->last_p2p_replies
-       [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
-       = pr->cp->pid;
-    }
-  else
-    {
-      if (NULL != prq->sender->last_client_replies
-         [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
-       GNUNET_SERVER_client_drop (prq->sender->last_client_replies
-                                  [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
-      prq->sender->last_client_replies
-       [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
-       = pr->client_request_list->client_list->client;
-      GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
-    }
+  GSF_peer_update_performance_ (prq->sender,
+                               pr->start_time,
+                               prq->priority);
 }
                                
 
@@ -636,6 +614,7 @@ process_reply (void *cls,
   struct PutMessage *pm;
   struct ConnectedPeer *cp;
   size_t msize;
+  GNUNET_HashCode chash;
 
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -661,6 +640,7 @@ process_reply (void *cls,
       update_request_performance_data (prq, pr);
       break;
     case GNUNET_BLOCK_EVALUATION_OK_LAST:
+      /* short cut: stop processing early, no BF-update, etc. */
       update_request_performance_data (prq, pr);
       GNUNET_LOAD_update (rt_entry_lifetime,
                          GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
@@ -694,18 +674,11 @@ process_reply (void *cls,
                  prq->type);
       return GNUNET_NO;
     }
-  /* FIXME: adapt code to new API! */
-  if (pr->client_request_list != NULL)
-    {
-      if (pr->replies_seen_size == pr->replies_seen_off)
-       GNUNET_array_grow (pr->replies_seen,
-                          pr->replies_seen_size,
-                          pr->replies_seen_size * 2 + 4);      
-      GNUNET_CRYPTO_hash (prq->data,
-                         prq->size,
-                         &pr->replies_seen[pr->replies_seen_off++]);         
-      refresh_bloomfilter (pr);
-    }
+  /* update bloomfilter */
+  GNUNET_CRYPTO_hash (prq->data,
+                     prq->size,
+                     &chash);
+  GSF_pending_request_update_ (pr, &chash, 1);
   if (NULL == prq->sender)
     {
 #if DEBUG_FS
@@ -718,11 +691,12 @@ process_reply (void *cls,
                                1,
                                GNUNET_NO);      
     }
-  prq->priority += pr->remaining_priority;
-  pr->remaining_priority = 0;
-  pr->results_found++;
+  prq->priority += pr->public_data.original_priority;
+  pr->public_data.remaining_priority = 0;
+  pr->public_data.original_priority = 0;
+  pr->public_data.results_found++;
   prq->request_found = GNUNET_YES;
-  /* finally, pass on to other peers / local clients */
+  /* finally, pass on to other peer / local client */
   pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
   return GNUNET_YES;
 }
index 2cb7cb8436f306c4d3f3b7c4a10e399b47cdd66d..4357565c75705246b869846f9d49559828739814 100644 (file)
@@ -111,6 +111,11 @@ struct GSF_PendingRequestData
    */
   uint32_t priority;
 
+  /**
+   * Priority that this request (originally) had for us.
+   */
+  uint32_t original_priority;
+
   /**
    * Options for the request.
    */
@@ -121,6 +126,11 @@ struct GSF_PendingRequestData
    */
   enum GNUNET_BLOCK_Type type;
 
+  /**
+   * Number of results we have found for this request so far.
+   */
+  unsigned int results_found;
+
   /**
    * Is the 'target' value set to a valid peer identity?
    */
@@ -182,7 +192,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
                             const struct GNUNET_PeerIdentity *target,
                             const char *bf_data,
                             size_t bf_size,
-                            int32_t mingle,
+                            uint32_t mingle,
                             uint32_t anonymity_level,
                             uint32_t priority,
                             int32_t ttl,