small API change: do no longer pass rarely needed GNUNET_SCHEDULER_TaskContext to...
[oweals/gnunet.git] / src / set / gnunet-service-set.c
index 75b33536daa94e546877b126dab9be7b9679f85c..e9555928a2ece493d0025b1a64c3bc3751227038 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013, 2014 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013, 2014 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -25,6 +25,7 @@
  */
 #include "gnunet-service-set.h"
 #include "gnunet-service-set_protocol.h"
+#include "gnunet_statistics_service.h"
 
 /**
  * How long do we hold on to an incoming channel if there is
@@ -137,6 +138,11 @@ static uint32_t lazy_copy_cookie = 1;
  */
 static uint32_t suggest_id = 1;
 
+/**
+ * Statistics handle.
+ */
+struct GNUNET_STATISTICS_Handle *_GSS_statistics;
+
 
 /**
  * Get set that is owned by the given client, if any.
@@ -339,9 +345,9 @@ is_element_of_generation (struct ElementEntry *ee,
 {
   struct MutationEvent *mut;
   int is_present;
+  unsigned int i;
 
-  if (NULL == ee->mutations)
-    return GNUNET_YES;
+  GNUNET_assert (NULL != ee->mutations);
 
   if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
   {
@@ -349,26 +355,42 @@ is_element_of_generation (struct ElementEntry *ee,
     return GNUNET_NO;
   }
 
-  is_present = GNUNET_YES;
+  is_present = GNUNET_NO;
 
-  // Could be made faster with binary search, but lists
-  // are small, so why bother.
-  for (mut = ee->mutations; 0 != mut->generation; mut++)
+  /* Could be made faster with binary search, but lists
+     are small, so why bother. */
+  for (i = 0; i < ee->mutations_size; i++)
   {
-    if ( (mut->generation > query_generation) ||
-         (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) )
+    mut = &ee->mutations[i];
+
+    if (mut->generation > query_generation)
+    {
+      /* The mutation doesn't apply to our generation
+         anymore.  We can'b break here, since mutations aren't
+         sorted by generation. */
+      continue;
+    }
+
+    if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size))
     {
+      /* The generation is excluded (because it belongs to another
+         fork via a lazy copy) and thus mutations aren't considered
+         for membership testing. */
       continue;
     }
 
-    // This would be an inconsistency in how we manage mutations.
+    /* This would be an inconsistency in how we manage mutations. */
     if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
       GNUNET_assert (0);
 
+    /* Likewise. */
+    if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) )
+      GNUNET_assert (0);
+
     is_present = mut->added;
   }
 
-  return GNUNET_YES;
+  return is_present;
 }
 
 
@@ -383,6 +405,17 @@ _GSS_is_element_of_set (struct ElementEntry *ee,
 }
 
 
+static int
+is_element_of_iteration (struct ElementEntry *ee,
+                         struct Set *set)
+{
+  return is_element_of_generation (ee,
+                                   set->iter_generation,
+                                   set->excluded_generations,
+                                   set->excluded_generations_size);
+}
+
+
 int
 _GSS_is_element_of_operation (struct ElementEntry *ee,
                               struct Operation *op)
@@ -539,6 +572,7 @@ set_destroy (struct Set *set)
                                              NULL);
       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
       content->elements = NULL;
+      GNUNET_free (content);
     }
   }
   GNUNET_free_non_null (set->excluded_generations);
@@ -793,13 +827,13 @@ execute_add (struct Set *set,
 
   msg = (const struct GNUNET_SET_ElementMessage *) m;
   el.size = ntohs (m->size) - sizeof *msg;
+  el.data = &msg[1];
+  el.element_type = ntohs (msg->element_type);
+  GNUNET_SET_element_hash (&el, &hash);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Client inserts element of size %u\n",
+             "Client inserts element %s of size %u\n",
+              GNUNET_h2s (&hash),
               el.size);
-  el.data = &msg[1];
-  GNUNET_CRYPTO_hash (el.data,
-                      el.size,
-                      &hash);
 
   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
                                           &hash);
@@ -812,31 +846,31 @@ execute_add (struct Set *set,
             el.data,
             el.size);
     ee->element.data = &ee[1];
+    ee->element.element_type = el.element_type;
     ee->remote = GNUNET_NO;
     ee->mutations = NULL;
     ee->mutations_size = 0;
     ee->element_hash = hash;
-  } else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) {
+    GNUNET_break (GNUNET_YES ==
+                  GNUNET_CONTAINER_multihashmap_put (set->content->elements,
+                                                     &ee->element_hash,
+                                                     ee,
+                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  }
+  else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
+  {
     /* same element inserted twice */
-    GNUNET_break (0);
     return;
   }
 
-  if (0 != set->current_generation)
   {
     struct MutationEvent mut = {
       .generation = set->current_generation,
       .added = GNUNET_YES
     };
     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
-    ee->mutations_size += 1;
   }
 
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap_put (set->content->elements,
-                                                   &ee->element_hash,
-                                                   ee,
-                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   set->vt->add (set->state, ee);
 }
 
@@ -858,29 +892,20 @@ execute_remove (struct Set *set,
              "Client removes element of size %u\n",
               el.size);
   el.data = &msg[1];
-  GNUNET_CRYPTO_hash (el.data,
-                      el.size,
-                      &hash);
+  el.element_type = ntohs (msg->element_type);
+  GNUNET_SET_element_hash (&el, &hash);
   ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
                                           &hash);
   if (NULL == ee)
   {
-    /* Client tried to remove non-existing element */
-    GNUNET_break (0);
+    /* Client tried to remove non-existing element. */
     return;
   }
   if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
   {
     /* Client tried to remove element twice */
-    GNUNET_break (0);
     return;
   }
-  else if (0 == set->current_generation)
-  {
-    // If current_generation is 0, then there are no running set operations
-    // or lazy copies, thus we can safely remove the element.
-    (void) GNUNET_CONTAINER_multihashmap_remove_all (set->content->elements, &hash);
-  }
   else
   {
     struct MutationEvent mut = {
@@ -888,7 +913,6 @@ execute_remove (struct Set *set,
       .added = GNUNET_NO
     };
     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
-    ee->mutations_size += 1;
   }
   set->vt->remove (set->state, ee);
 }
@@ -936,6 +960,9 @@ send_client_element (struct Set *set)
   struct GNUNET_SET_IterResponseMessage *msg;
 
   GNUNET_assert (NULL != set->iter);
+
+again:
+
   ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
                                                      NULL,
                                                      (const void **) &ee);
@@ -948,7 +975,7 @@ send_client_element (struct Set *set)
     GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
     set->iter = NULL;
     set->iteration_id++;
-    
+
     GNUNET_assert (set->content->iterator_count > 0);
     set->content->iterator_count -= 1;
 
@@ -975,6 +1002,10 @@ send_client_element (struct Set *set)
   else
   {
     GNUNET_assert (NULL != ee);
+
+    if (GNUNET_NO == is_element_of_iteration (ee, set))
+      goto again;
+
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Sending iteration element on %p.\n",
                 (void *) set);
@@ -984,7 +1015,7 @@ send_client_element (struct Set *set)
     memcpy (&msg[1],
             ee->element.data,
             ee->element.size);
-    msg->element_type = ee->element.element_type;
+    msg->element_type = htons (ee->element.element_type);
     msg->iteration_id = htons (set->iteration_id);
   }
   GNUNET_MQ_send (set->client_mq, ev);
@@ -1024,13 +1055,15 @@ handle_client_iterate (void *cls,
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Iterating set %p with %u elements\n",
+              "Iterating set %p in gen %u with %u content elements\n",
               (void *) set,
+              set->current_generation,
               GNUNET_CONTAINER_multihashmap_size (set->content->elements));
   GNUNET_SERVER_receive_done (client,
                               GNUNET_OK);
   set->content->iterator_count += 1;
   set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
+  set->iter_generation = set->current_generation;
   send_client_element (set);
 }
 
@@ -1080,6 +1113,13 @@ handle_client_create_set (void *cls,
   }
   set->operation = ntohl (msg->operation);
   set->state = set->vt->create ();
+  if (NULL == set->state)
+  {
+    /* initialization failed (i.e. out of memory) */
+    GNUNET_free (set);
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
   set->content = GNUNET_new (struct SetContent);
   set->content->refcount = 1;
   set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -1263,8 +1303,6 @@ advance_generation (struct Set *set)
   GNUNET_array_append (set->excluded_generations,
                        set->excluded_generations_size,
                        r);
-
-  set->excluded_generations_size += 1;
 }
 
 /**
@@ -1466,7 +1504,7 @@ handle_client_copy_lazy_connect (void *cls,
     {
       found = GNUNET_YES;
       break;
-    } 
+    }
   }
 
   if (GNUNET_NO == found)
@@ -1509,6 +1547,17 @@ handle_client_copy_lazy_connect (void *cls,
   set->state = set->vt->copy_state (cr->source_set);
   set->content = cr->source_set->content;
   set->content->refcount += 1;
+
+  set->current_generation = cr->source_set->current_generation;
+  set->excluded_generations_size = cr->source_set->excluded_generations_size;
+  set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations,
+                                             set->excluded_generations_size * sizeof (struct GenerationRange));
+
+  /* Advance the generation of the new set, so that mutations to the
+     of the cloned set and the source set are independent. */
+  advance_generation (set);
+
+
   set->client = client;
   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
   GNUNET_CONTAINER_DLL_insert (sets_head,
@@ -1664,8 +1713,7 @@ handle_client_accept (void *cls,
  * @param tc context information (why was this task triggered now)
  */
 static void
-shutdown_task (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls)
 {
   while (NULL != incoming_head)
     incoming_destroy (incoming_head);
@@ -1681,6 +1729,7 @@ shutdown_task (void *cls,
     GNUNET_CADET_disconnect (cadet);
     cadet = NULL;
   }
+  GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "handled shutdown request\n");
 }
@@ -1697,13 +1746,14 @@ shutdown_task (void *cls,
  * @param tc context information (why was this task triggered now)
  */
 static void
-incoming_timeout_cb (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc)
+incoming_timeout_cb (void *cls)
 {
   struct Operation *incoming = cls;
+  const struct GNUNET_SCHEDULER_TaskContext *tc;
 
   incoming->timeout_task = NULL;
   GNUNET_assert (GNUNET_YES == incoming->is_incoming);
+  tc = GNUNET_SCHEDULER_get_task_context ();
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1932,9 +1982,13 @@ run (void *cls,
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
-    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0},
@@ -1949,6 +2003,7 @@ run (void *cls,
                                    &handle_client_disconnect, NULL);
   GNUNET_SERVER_add_handlers (server,
                               server_handlers);
+  _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
   cadet = GNUNET_CADET_connect (cfg, NULL,
                                 &channel_new_cb,
                                 &channel_end_cb,