use long long
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
index 8790d8fbbbd81b25a1c07e30528c5bc523b1d895..1a4e71314a8f21a8593a895ac01487443a06787a 100644 (file)
  */
 
 #include "platform.h"
-#include "gnunet_block_lib.h"
-#include "gnunet_util_lib.h"
+#include "gnunet_constants.h"
 #include "gnunet_protocols.h"
-#include "gnunet_nse_service.h"
-#include "gnunet_core_service.h"
-#include "gnunet_datacache_lib.h"
-#include "gnunet_transport_service.h"
-#include "gnunet_hello_lib.h"
-#include "gnunet_dht_service.h"
 #include "gnunet_statistics_service.h"
-#include "dht_new.h"
-#include <fenv.h>
+#include "gnunet-service-dht.h"
 #include "gnunet-service-dht_clients.h"
 #include "gnunet-service-dht_datacache.h"
 #include "gnunet-service-dht_neighbours.h"
+#include "dht.h"
 
 
 /**
@@ -179,7 +172,7 @@ struct ClientQueryRecord
   /**
    * The type for the data for the GET request.
    */
-  enum GNUNET_BLOCK_Type msg_type;
+  enum GNUNET_BLOCK_Type type;
 
 };
 
@@ -220,7 +213,7 @@ static GNUNET_SCHEDULER_TaskIdentifier retry_task;
 static struct ClientList *
 find_active_client (struct GNUNET_SERVER_Client *client)
 {
-  struct ClientList *pos = client_list;
+  struct ClientList *pos = client_head;
   struct ClientList *ret;
 
   while (pos != NULL)
@@ -258,10 +251,11 @@ remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
   GNUNET_assert (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_remove (forward_map,
                                                       key, record));
-  GNUNET_CONTAINER_heap_remove_node (record->hnode);
-  GNUNET_ARRAY_append (record->seen_replies,
-                      record->seen_replies_count,
-                      0);
+  if (NULL != record->hnode)
+    GNUNET_CONTAINER_heap_remove_node (record->hnode);
+  GNUNET_array_grow (record->seen_replies,
+                    record->seen_replies_count,
+                    0);
   GNUNET_free (record);
   return GNUNET_YES;
 }
@@ -279,36 +273,24 @@ static void
 handle_client_disconnect (void *cls, 
                          struct GNUNET_SERVER_Client *client)
 {
-  struct ClientList *pos = client_list;
-  struct ClientList *found;
+  struct ClientList *pos;
   struct PendingMessage *reply;
 
-  found = NULL;
-  while (pos != NULL)
-  {
-    if (pos->client_handle == client)
-    {
-      GNUNET_CONTAINER_DLL_remove (client_head,
-                                  client_tail,
-                                  pos);
-      found = pos;
-      break;
-    }
-    pos = pos->next;
-  }
-  if (found == NULL)
-    return;
-  if (found->transmit_handle != NULL)
-    GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
-  while (NULL != (reply = found->pending_head))
+  pos = find_active_client (client);
+  GNUNET_CONTAINER_DLL_remove (client_head,
+                              client_tail,
+                              pos);
+  if (pos->transmit_handle != NULL)
+    GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
+  while (NULL != (reply = pos->pending_head))
     {
-      GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
+      GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail,
                                    reply);
       GNUNET_free (reply);
     }
-  GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
-                                        &remove_client_records, found);
-  GNUNET_free (found);
+  GNUNET_CONTAINER_multihashmap_iterate (forward_map,
+                                        &remove_client_records, pos);
+  GNUNET_free (pos);
 }
 
 
@@ -321,26 +303,36 @@ static void
 transmit_request (struct ClientQueryRecord *cqr)
 {
   int32_t reply_bf_mutator;
+  struct GNUNET_CONTAINER_BloomFilter *reply_bf;
+  struct GNUNET_CONTAINER_BloomFilter *peer_bf;
 
+  GNUNET_STATISTICS_update (GDS_stats,
+                            gettext_noop ("# GET requests from clients injected"), 1,
+                            GNUNET_NO);
   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                         UINT32_MAX);
   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
                                                 cqr->seen_replies,
                                                 cqr->seen_replies_count);
-  GST_NEIGHBOURS_handle_get (cqr->msg_type,
+  peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+                                              DHT_BLOOM_SIZE,
+                                              GNUNET_CONSTANTS_BLOOMFILTER_K);
+  GDS_NEIGHBOURS_handle_get (cqr->type,
                             cqr->msg_options,
                             cqr->replication,
+                            0 /* hop count */,
                             &cqr->key,
                             cqr->xquery,
                             cqr->xquery_size,
                             reply_bf,
                             reply_bf_mutator,
-                            NULL /* no peers blocked initially */);
-  GNUNET_CONTAINER_bloomfilter_destroy (reply_bf);
+                            peer_bf);
+  GNUNET_CONTAINER_bloomfilter_free (reply_bf);
+  GNUNET_CONTAINER_bloomfilter_free (peer_bf);
 
   /* exponential back-off for retries, max 1h */
   cqr->retry_frequency = 
-    GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
+    GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS,
                              GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
 }
@@ -368,7 +360,7 @@ transmit_next_request_task (void *cls,
     {
       cqr->hnode = NULL;
       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
-      if (delay.value > 0)
+      if (delay.rel_value > 0)
        {
          cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
                                                     cqr->retry_time.abs_value);
@@ -378,6 +370,8 @@ transmit_next_request_task (void *cls,
          return;
        }
       transmit_request (cqr);
+      cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
+                                                cqr->retry_time.abs_value);
     }
 }
 
@@ -394,6 +388,7 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
                      const struct GNUNET_MessageHeader *message)
 {
   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
+  struct GNUNET_CONTAINER_BloomFilter *peer_bf;
   uint16_t size;
   
   size = ntohs (message->size);
@@ -401,10 +396,14 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
     {
       GNUNET_break (0);
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
     }
+  GNUNET_STATISTICS_update (GDS_stats,
+                            gettext_noop ("# PUT requests received from clients"), 1,
+                            GNUNET_NO);
   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
   /* give to local clients */
-  GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+  GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
                           &dht_msg->key,
                           0, NULL,
                           0, NULL,
@@ -412,20 +411,27 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
                           size - sizeof (struct GNUNET_DHT_ClientPutMessage),
                           &dht_msg[1]);
   /* store locally */
-  GST_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+  GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
                            &dht_msg->key,
                            0, NULL,
                            ntohl (dht_msg->type),
                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
                            &dht_msg[1]);
   /* route to other peers */
-  GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
+  peer_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+                                              DHT_BLOOM_SIZE,
+                                              GNUNET_CONSTANTS_BLOOMFILTER_K);
+  GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
                             ntohl (dht_msg->options),
                             ntohl (dht_msg->desired_replication_level),
                             GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+                            0 /* hop count */,
+                            peer_bf,
                             &dht_msg->key,
+                            0, NULL,
                             &dht_msg[1],
                             size - sizeof (struct GNUNET_DHT_ClientPutMessage));
+  GNUNET_CONTAINER_bloomfilter_free (peer_bf);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -444,8 +450,6 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
                      const struct GNUNET_MessageHeader *message)
 {
   const struct GNUNET_DHT_ClientGetMessage *get;
-  const struct GNUNET_MessageHeader *enc_msg;
-
   struct ClientQueryRecord *cqr;
   size_t xquery_size;
   const char* xquery;
@@ -461,7 +465,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
   xquery = (const char*) &get[1];
-
+  GNUNET_STATISTICS_update (GDS_stats,
+                            gettext_noop ("# GET requests received from clients"), 1,
+                            GNUNET_NO);
   
   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
   cqr->key = get->key;
@@ -475,8 +481,8 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
   cqr->xquery_size = xquery_size;
   cqr->replication = ntohl (get->desired_replication_level);
   cqr->msg_options = ntohl (get->options);
-  cqr->msg_type = ntohl (get->type);  
-  GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
+  cqr->type = ntohl (get->type);  
+  GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   /* start remote requests */
   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
@@ -484,7 +490,7 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
   /* perform local lookup */
   GDS_DATACACHE_handle_get (&get->key,
-                           cqr->msg_type,
+                           cqr->type,
                            cqr->xquery,
                            xquery_size,
                            NULL, 0);
@@ -493,9 +499,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
 
 
 /**
- * Closure for 'remove_by_uid'.
+ * Closure for 'remove_by_unique_id'.
  */
-struct RemoveByUidContext
+struct RemoveByUniqueIdContext
 {
   /**
    * Client that issued the removal request.
@@ -505,26 +511,26 @@ struct RemoveByUidContext
   /**
    * Unique ID of the request.
    */
-  uint64_t uid;
+  uint64_t unique_id;
 };
 
 
 /**
  * Iterator over hash map entries that frees all entries 
- * that match the given client and UID.
+ * that match the given client and unique ID.
  *
- * @param cls UID and client to search for in source routes
+ * @param cls unique ID and client to search for in source routes
  * @param key current key code
  * @param value value in the hash map, a ClientQueryRecord
  * @return GNUNET_YES (we should continue to iterate)
  */
 static int
-remove_by_uid (void *cls, const GNUNET_HashCode * key, void *value)
+remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
 {
-  const struct RemoveByUidContext *ctx = cls;
+  const struct RemoveByUniqueIdContext *ctx = cls;
   struct ClientQueryRecord *record = value;
 
-  if (record->uid != ctx->uid)
+  if (record->unique_id != ctx->unique_id)
     return GNUNET_YES;
   return remove_client_records (ctx->client, key, record);
 }
@@ -545,12 +551,16 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
 {
   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
+  struct RemoveByUniqueIdContext ctx;
   
+  GNUNET_STATISTICS_update (GDS_stats,
+                            gettext_noop ("# GET STOP requests received from clients"), 1,
+                            GNUNET_NO);
   ctx.client = find_active_client (client);
-  ctx.uid = &dht_stop_msg.unique_id);
+  ctx.unique_id = dht_stop_msg->unique_id;
   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
                                              &dht_stop_msg->key,
-                                             &remove_by_uid,
+                                             &remove_by_unique_id,
                                              &ctx);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -693,7 +703,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
   struct ForwardReplyContext *frc = cls;
   struct ClientQueryRecord *record = value;
   struct PendingMessage *pm;
-  struct ReplyMessage *reply;
+  struct GNUNET_DHT_ClientResultMessage *reply;
   enum GNUNET_BLOCK_EvaluationResult eval;
   int do_free;
   GNUNET_HashCode ch;
@@ -701,7 +711,12 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
   
   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
        (record->type != frc->type) )
-    return GNUNET_YES; /* type mismatch */
+    {
+      GNUNET_STATISTICS_update (GDS_stats,
+                               gettext_noop ("# Key match, type mismatches in REPLY to CLIENT"), 1,
+                               GNUNET_NO);
+      return GNUNET_YES; /* type mismatch */
+    }
   GNUNET_CRYPTO_hash (frc->data,
                      frc->data_size,
                      &ch);
@@ -709,7 +724,12 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
     if (0 == memcmp (&record->seen_replies[i],
                     &ch,
                     sizeof (GNUNET_HashCode)))
-      return GNUNET_YES; /* duplicate */             
+      {
+       GNUNET_STATISTICS_update (GDS_stats,
+                                 gettext_noop ("# Duplicate REPLIES to CLIENT request dropped"), 1,
+                                 GNUNET_NO);
+       return GNUNET_YES; /* duplicate */             
+      }
   eval =
     GNUNET_BLOCK_evaluate (GDS_block_context, 
                           record->type, key, 
@@ -724,7 +744,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
     do_free = GNUNET_YES;
     break;
   case GNUNET_BLOCK_EVALUATION_OK_MORE:
-    GNUNET_ARRAY_append (record->seen_replies,
+    GNUNET_array_append (record->seen_replies,
                         record->seen_replies_count,
                         ch);
     do_free = GNUNET_NO;
@@ -744,9 +764,12 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
     return GNUNET_NO;
   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Unsupported block type (%u) in request!\n",
+                _("Unsupported block type (%u) in request!\n"),
                 record->type);
     return GNUNET_NO;
+  default:
+    GNUNET_break (0);
+    return GNUNET_NO;
   }
   if (GNUNET_NO == frc->do_copy)
     {
@@ -763,7 +786,10 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
              sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
       pm->next = pm->prev = NULL;
     }
-  reply = (struct ReplyMessage*) &pm[1];  
+  GNUNET_STATISTICS_update (GDS_stats,
+                            gettext_noop ("# RESULTS queued for clients"), 1,
+                            GNUNET_NO);
+  reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];  
   reply->unique_id = record->unique_id;
   add_pending_message (record->client, pm);
   if (GNUNET_YES == do_free)
@@ -788,7 +814,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
  * @param data application payload data
  */
 void
-GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
+GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
                         const GNUNET_HashCode *key,
                         unsigned int get_path_length,
                         const struct GNUNET_PeerIdentity *get_path,
@@ -800,14 +826,19 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
 {
   struct ForwardReplyContext frc;
   struct PendingMessage *pm;
-  struct ReplyMessage *reply;
+  struct GNUNET_DHT_ClientResultMessage *reply;
   struct GNUNET_PeerIdentity *paths;
   size_t msize;
 
   if (NULL ==
-      GNUNET_CONTAINER_multihashmap_get (foward_map, key))
+      GNUNET_CONTAINER_multihashmap_get (forward_map, key))
+  {
+    GNUNET_STATISTICS_update (GDS_stats,
+                             gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), 1,
+                             GNUNET_NO);
     return; /* no matching request, fast exit! */
-  msize = sizeof(struct ReplyMessage) + data_size + 
+  }
+  msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size + 
     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
@@ -816,7 +847,7 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
       return;
     }
   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
-  reply = (struct ReplyMessage*) &pm[1];
+  reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
   pm->msg = &reply->header;
   reply->header.size = htons ((uint16_t) msize);
   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
@@ -827,10 +858,10 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
   reply->key = *key;
   paths = (struct GNUNET_PeerIdentity*) &reply[1];
-  mempcy (paths, get_path, 
-         sizeof (struct GNUNET_PeerIdentity) * get_path_length);
-  mempcy (&paths[get_path_length], 
-         put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
+  memcpy (paths, put_path, 
+         sizeof (struct GNUNET_PeerIdentity) * put_path_length);
+  memcpy (&paths[put_path_length], 
+         get_path, sizeof (struct GNUNET_PeerIdentity) * get_path_length);
   memcpy (&paths[get_path_length + put_path_length],
          data, 
          data_size);
@@ -839,13 +870,16 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
   frc.data = data;
   frc.data_size = data_size;
   frc.type = type;
-  GNUNET_CONTAINER_multihashmap_get_multiple (foward_map, key,
+  GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key,
                                              &forward_reply,
                                              &frc);
   if (GNUNET_NO == frc.do_copy)
     {
       /* did not match any of the requests, free! */
-      GNUNET_free (buf);
+      GNUNET_STATISTICS_update (GDS_stats,
+                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), 1,
+                               GNUNET_NO);
+      GNUNET_free (pm);
     }
 }
 
@@ -856,7 +890,7 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
  * @param server the initialized server
  */
 void 
-GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
+GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
 {
   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
     {&handle_dht_local_put, NULL, 
@@ -865,7 +899,7 @@ GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
     {&handle_dht_local_get_stop, NULL,
      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
-     sizeof (struct GNUNET_DHT_StopMessage) },
+     sizeof (struct GNUNET_DHT_ClientGetStopMessage) },
     {NULL, NULL, 0, 0}
   };
   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
@@ -879,7 +913,7 @@ GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
  * Shutdown client subsystem.
  */
 void
-GDS_CLIENT_done ()
+GDS_CLIENTS_done ()
 {
   GNUNET_assert (client_head == NULL);
   GNUNET_assert (client_tail == NULL);