stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
index 903549cb7d537d87ad5f07876abacfba250bb624..f9a6421996162379310b26f68eaed8ad019ce664 100644 (file)
@@ -33,7 +33,6 @@
 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
 
-
 /**
  * Handle to cancel a transmission request.
  */
@@ -123,16 +122,26 @@ struct GSF_ConnectedPeer
    */
   struct GSF_PeerTransmitHandle *pth_tail;
 
+  /**
+   * Migration stop message in our queue, or NULL if we have none pending.
+   */
+  struct GSF_PeerTransmitHandle *migration_pth;
+
   /**
    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
    * NULL if we have successfully reserved 32k, otherwise non-NULL.
    */
   struct GNUNET_CORE_InformationRequestContext *irc;
 
+  /**
+   * Active requests from this neighbour.
+   */
+  struct GNUNET_CONTAINER_MulitHashMap *request_map;
+
   /**
    * ID of delay task for scheduling transmission.
    */
-  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
+  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
 
   /**
    * Increase in traffic preference still to be submitted
@@ -282,12 +291,12 @@ peer_transmit_ready_cb (void *cls,
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                               cp->pth_tail,
                               pth);
-  if (pth->is_query)
+  if (GNUNET_YES == pth->is_query)
     {
       cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
       GNUNET_assert (0 < cp->ppd.pending_queries--);    
     }
-  else
+  else if (GNUNET_NO == pth->is_query)
     {
       GNUNET_assert (0 < cp->ppd.pending_replies--);
     }
@@ -389,6 +398,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
     cp->disk_trust = cp->trust = ntohl (trust);
   GNUNET_free (fn);
+  cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
   GNUNET_break (GNUNET_OK ==
                GNUNET_CONTAINER_multihashmap_put (cp_map,
                                                   &peer->hashPubKey,
@@ -442,7 +452,8 @@ GSF_handle_p2p_migration_stop_ (void *cls,
  * and will also not be called anymore after a call signalling
  * expiration.
  *
- * @param cls user-specified closure
+ * @param cls 'struct GSF_ConnectedPeer' of the peer that would
+ *            have liked an answer to the request
  * @param pr handle to the original pending request
  * @param data response data, NULL on request expiration
  * @param data_len number of bytes in data
@@ -453,12 +464,22 @@ handle_p2p_reply (void *cls,
                  const void *data,
                  size_t data_len)
 {
+  struct GSF_ConnectedPeer *cp = cls;
+
 #if SUPPORT_DELAYS  
   struct GNUNET_TIME_Relative art_delay;
 #endif
 
   /* FIXME: adapt code fragments below to new API! */
-
+  if (NULL == data)
+    {
+      /* FIXME: request expired! clean up! */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches active"),
+                               -1,
+                               GNUNET_NO);
+      return;
+    }
 
   /* reply will go over the network, check for cover traffic */
   if ( (prq->anonymity_level >  1) &&
@@ -515,9 +536,11 @@ handle_p2p_reply (void *cls,
 }
 
 
-
 /**
- * Handle P2P "QUERY" message.
+ * Handle P2P "QUERY" message.  Creates the pending request entry
+ * and sets up all of the data structures to that we will
+ * process replies properly.  Does not initiate forwarding or
+ * local database lookups.
  *
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
@@ -528,11 +551,13 @@ struct GSF_PendingRequest *
 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
                       const struct GNUNET_MessageHeader *message)
 {
-  /* FIXME: adapt old code to new API! */
-  struct PendingRequest *pr;
-  struct ConnectedPeer *cp;
-  struct ConnectedPeer *cps;
-  struct CheckDuplicateRequestClosure cdc;
+  struct GSF_PendingRequest *pr;
+  struct GSF_PendingRequestData *prd;
+  struct GSF_ConnectedPeer *cp;
+  struct GSF_ConnectedPeer *cps;
+  GNUNET_HashCode *namespace;
+  struct GNUNET_PeerIdentity *target;
+  enum GSF_PendingRequestOptions options;                           
   struct GNUNET_TIME_Relative timeout;
   uint16_t msize;
   const struct GetMessage *gm;
@@ -542,8 +567,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
   size_t bfsize;
   uint32_t ttl_decrement;
   int32_t priority;
+  int32_t ttl;
   enum GNUNET_BLOCK_Type type;
-  int have_ns;
+
 
   msize = ntohs(message->size);
   if (msize < sizeof (struct GetMessage))
@@ -615,7 +641,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
                                gettext_noop ("# requests dropped due to missing reverse route"),
                                1,
                                GNUNET_NO);
-     /* FIXME: try connect? */
       return GNUNET_OK;
     }
   /* note that we can really only check load here since otherwise
@@ -639,14 +664,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
              GNUNET_i2s (other),
              (unsigned int) bm);
 #endif
-  have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
-  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     (have_ns ? sizeof(GNUNET_HashCode) : 0));
-  if (have_ns)
-    {
-      pr->namespace = (GNUNET_HashCode*) &pr[1];
-      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
-    }
+  namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
+  target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
+  options = 0;
   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
        GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
@@ -654,28 +674,21 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
       /* don't have BW to send to peer, or would likely take longer than we have for it,
         so at best indirect the query */
       priority = 0;
-      pr->forward_only = GNUNET_YES;
+      options |= GSF_PRO_FORWARD_ONLY;
     }
-  pr->type = type;
-  pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
-    pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
-  pr->anonymity_level = 1;
-  pr->priority = (uint32_t) priority;
-  pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
-  pr->query = gm->query;
+  ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
   /* decrement ttl (always) */
   ttl_decrement = 2 * TTL_DECREMENT +
     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                              TTL_DECREMENT);
-  if ( (pr->ttl < 0) &&
-       (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+  if ( (ttl < 0) &&
+       (((int32_t)(ttl - ttl_decrement)) > 0) )
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
                  GNUNET_i2s (other),
-                 pr->ttl,
+                 ttl,
                  ttl_decrement);
 #endif
       GNUNET_STATISTICS_update (stats,
@@ -683,74 +696,66 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
                                1,
                                GNUNET_NO);
       /* integer underflow => drop (should be very rare)! */      
-      GNUNET_free (pr);
       return GNUNET_OK;
     } 
-  pr->ttl -= ttl_decrement;
-  pr->start_time = GNUNET_TIME_absolute_get ();
-
-  /* get bloom filter */
-  if (bfsize > 0)
-    {
-      pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
-                                                 bfsize,
-                                                 BLOOMFILTER_K);
-      pr->bf_size = bfsize;
-    }
-  cdc.have = NULL;
-  cdc.pr = pr;
-  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
-                                             &gm->query,
-                                             &check_duplicate_request_peer,
-                                             &cdc);
-  if (cdc.have != NULL)
-    {
-      if (cdc.have->start_time.abs_value + cdc.have->ttl >=
-         pr->start_time.abs_value + pr->ttl)
+  ttl -= ttl_decrement;
+
+  /* test if the request already exists */
+  pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
+                                         &gm->query);
+  if (pr != NULL) 
+    {      
+      prd = GSF_pending_request_get_data_ (pr);
+      if ( (prd->type == type) &&
+          ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
+            (0 == memcmp (prd->namespace,
+                          namespace,
+                          sizeof (GNUNET_HashCode))) ) )
        {
-         /* existing request has higher TTL, drop new one! */
-         cdc.have->priority += pr->priority;
-         destroy_pending_request (pr);
+         if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
+           {
+             /* existing request has higher TTL, drop new one! */
+             prd->priority += priority;
 #if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Have existing request with higher TTL, dropping new request.\n",
-                     GNUNET_i2s (other));
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Have existing request with higher TTL, dropping new request.\n",
+                         GNUNET_i2s (other));
 #endif
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests dropped due to higher-TTL request"),
-                                   1,
-                                   GNUNET_NO);
-         return GNUNET_OK;
-       }
-      else
-       {
+             GNUNET_STATISTICS_update (stats,
+                                       gettext_noop ("# requests dropped due to higher-TTL request"),
+                                       1,
+                                       GNUNET_NO);
+             return GNUNET_OK;
+           }
          /* existing request has lower TTL, drop old one! */
-         pr->priority += cdc.have->priority;
-         /* Possible optimization: if we have applicable pending
-            replies in 'cdc.have', we might want to move those over
-            (this is a really rare special-case, so it is not clear
-            that this would be worth it) */
-         destroy_pending_request (cdc.have);
-         /* keep processing 'pr'! */
+         pr->priority += prd->priority;
+         GSF_pending_request_cancel_ (pr);
+         GNUNET_assert (GNUNET_YES ==
+                        GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+                                                              &gm->query,
+                                                              pr));
        }
     }
-
-  pr->cp = cp;
+  
+  pr = GSF_pending_request_create (options,
+                                  type,
+                                  &gm->query,
+                                  namespace,
+                                  target,
+                                  (bf_size > 0) ? (const char*)&opt[bits] : NULL,
+                                  bf_size,
+                                  ntohl (gm->filter_mutator),
+                                  1 /* anonymity */
+                                  (uint32_t) priority,
+                                  ttl,
+                                  NULL, 0, /* replies_seen */
+                                  &handle_p2p_reply,
+                                  cp);
   GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+               GNUNET_CONTAINER_multihashmap_put (cp->request_map,
                                                   &gm->query,
                                                   pr,
                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (peer_request_map,
-                                                  &other->hashPubKey,
-                                                  pr,
-                                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  
-  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
-                                           pr,
-                                           pr->start_time.abs_value + pr->ttl);
-
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# P2P searches received"),
                            1,
@@ -759,83 +764,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
                            gettext_noop ("# P2P searches active"),
                            1,
                            GNUNET_NO);
-
-  /* calculate change in traffic preference */
-  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
-  /* process locally */
-  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
-    type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
-  timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
-                                          (pr->priority + 1)); 
-  if (GNUNET_YES != pr->forward_only)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Handing request for `%s' to datastore\n",
-                 GNUNET_h2s (&gm->query));
-#endif
-      pr->qe = GNUNET_DATASTORE_get (dsh,
-                                    &gm->query,
-                                    type,                             
-                                    pr->priority + 1,
-                                    MAX_DATASTORE_QUEUE,                                
-                                    timeout,
-                                    &process_local_reply,
-                                    pr);
-      if (NULL == pr->qe)
-       {
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests dropped by datastore (queue length limit)"),
-                                   1,
-                                   GNUNET_NO);
-       }
-    }
-  else
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests forwarded due to high load"),
-                               1,
-                               GNUNET_NO);
-    }
-
-  /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
-  switch (pr->type)
-    {
-    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
-    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
-      /* only one result, wait for datastore */
-      if (GNUNET_YES != pr->forward_only)
-       {
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
-                                   1,
-                                   GNUNET_NO);
-         break;
-       }
-    default:
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
-                                            pr);
-    }
-
-  /* make sure we don't track too many requests */
-  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
-    {
-      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
-      GNUNET_assert (pr != NULL);
-      destroy_pending_request (pr);
-    }
-  return GNUNET_OK;
-
-
-
-  // FIXME!
-  // parse request
-  // setup pending request (use 'handle_p2p_reply')
-  // track pending request to cancel it on peer disconnect (!)
-  // return it!
-  // (actual planning & execution up to caller!)
-  return NULL;
+  return pr;
 }
 
 
@@ -858,9 +787,9 @@ peer_transmit_timeout (void *cls,
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                               cp->pth_tail,
                               pth);
-  if (pth->is_query)
+  if (GNUNET_YES == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_queries--);    
-  else
+  else if (GNUNET_NO == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_replies--);
   GNUNET_LOAD_update (cp->ppd.transmission_delay,
                      UINT64_MAX);
@@ -876,7 +805,7 @@ peer_transmit_timeout (void *cls,
  * the callback is invoked with a 'NULL' buffer.
  *
  * @param peer target peer
- * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
+ * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
  * @param priority how important is this request?
  * @param timeout when does this request timeout (call gmc with error)
  * @param size number of bytes we would like to send to the peer
@@ -933,9 +862,10 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
                                       pth);
   GNUNET_PEER_resolve (cp->pid,
                       &target);
-  if (is_query)
+  if (GNUNET_YES == is_query)
     {
       /* query, need reservation */
+      cp->ppd.pending_queries++;
       if (NULL == cp->irc)
        {
          /* reservation already done! */
@@ -957,9 +887,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
          is_ready = GNUNET_NO;
        }
     }
-  else
+  else if (GNUNET_NO == is_query)
     {
       /* no reservation needed for content */
+      cp->ppd.pending_replies++;
+      is_ready = GNUNET_YES;
+    }
+  else
+    {
+      /* not a query or content, no reservation needed */
       is_ready = GNUNET_YES;
     }
   if (is_ready)
@@ -1011,9 +947,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                               cp->pth_tail,
                               pth);
-  if (pth->is_query)
+  if (GNUNET_YES == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_queries--);    
-  else
+  else if (GNUNET_NO == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_replies--);
   GNUNET_free (pth);
 }
@@ -1084,6 +1020,26 @@ GSF_peer_status_handler_ (void *cls,
 }
 
 
+/**
+ * Cancel all requests associated with the peer.
+ *
+ * @param cls unused
+ * @param query hash code of the request
+ * @param value the 'struct GSF_PendingRequest'
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+cancel_pending_request (void *cls,
+                       const GNUNET_HashCode *query,
+                       void *value)
+{
+  struct GSF_PendingRequest *pr = value;
+
+  GSF_pending_request_cancel_ (pr);
+  return GNUNET_OK;
+}
+
+
 /**
  * A peer disconnected from us.  Tear down the connected peer
  * record.
@@ -1104,11 +1060,21 @@ GSF_peer_disconnect_handler_ (void *cls,
   GNUNET_CONTAINER_multihashmap_remove (cp_map,
                                        &peer->hashPubKey,
                                        cp);
+  if (NULL != cp->migration_pth)
+    {
+      GSF_peer_transmit_cancel_ (cp->migration_pth);
+      cp->migration_pth = NULL;
+    }
   if (NULL != cp->irc)
     {
       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
       cp->irc = NULL;
     }
+  GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
+                                        &cancel_pending_request,
+                                        cp);
+  GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
+  cp->request_map = NULL;
   GSF_plan_notify_peer_disconnect_ (cp);
   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
@@ -1205,6 +1171,34 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
 }
 
 
+/**
+ * Assemble a migration stop message for transmission.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' to use
+ * @param size number of bytes we're allowed to write to buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+create_migration_stop_message (void *cls,
+                              size_t size,
+                              void *buf)
+{
+  struct GSF_ConnectedPeer *cp = cls;
+  struct MigrationStopMessage msm;
+
+  cp->migration_pth = NULL;
+  if (NULL == buf)
+    return 0;
+  GNUNET_assert (size > sizeof (struct MigrationStopMessage));
+  msm.header.size = htons (sizeof (struct MigrationStopMessage));
+  msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+  msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
+  memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
+  return sizeof (struct MigrationStopMessage);
+}
+
+
 /**
  * Ask a peer to stop migrating data to us until the given point
  * in time.
@@ -1216,30 +1210,22 @@ void
 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
                           struct GNUNET_TIME_Relative block_time)
 {
-  struct PendingMessage *pm;
-  struct MigrationStopMessage *msm;
-
   if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
     return; /* already blocked */
   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
-
-  /* FIXME: adapt old code below to new API! */
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
-                     sizeof (struct MigrationStopMessage));
-  pm->msize = sizeof (struct MigrationStopMessage);
-  pm->priority = UINT32_MAX;
-  msm = (struct MigrationStopMessage*) &pm[1];
-  msm->header.size = htons (sizeof (struct MigrationStopMessage));
-  msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
-  msm->duration = GNUNET_TIME_relative_hton (block_time);
-  add_to_pending_messages_for_peer (cp,
-                                   pm,
-                                   NULL);
+  if (cp->migration_pth != NULL)
+    GSF_peer_transmit_cancel_ (cp->migration_pth);
+  cp->migration_pth 
+    = GSF_peer_transmit_ (cp,
+                         GNUNET_SYSERR,
+                         UINT32_MAX,
+                         GNUNET_TIME_UNIT_FOREVER_REL,
+                         sizeof (struct MigrationStopMessage),
+                         &create_migration_stop_message,
+                         cp);
 }
 
 
-
-
 /**
  * Write host-trust information to a file - flush the buffer entry!
  *