cleaning up set handlers, eliminating 2nd level demultiplexing and improving use...
authorChristian Grothoff <christian@grothoff.org>
Sat, 11 Mar 2017 17:15:38 +0000 (18:15 +0100)
committerChristian Grothoff <christian@grothoff.org>
Sat, 11 Mar 2017 17:15:38 +0000 (18:15 +0100)
src/set/Makefile.am
src/set/gnunet-service-set.c
src/set/gnunet-service-set.h
src/set/gnunet-service-set_intersection.c
src/set/gnunet-service-set_union.c
src/set/set_api.c
src/set/test_set_union_copy.c

index cfe95bc1a62274b04e2d1fae06144038bd1c2a18..03c258352c219859d88249c40a11e1498d6598a1 100644 (file)
@@ -51,7 +51,7 @@ gnunet_set_ibf_profiler_LDADD = \
 
 gnunet_service_set_SOURCES = \
  gnunet-service-set.c gnunet-service-set.h \
- gnunet-service-set_union.c \
+ gnunet-service-set_union.c gnunet-service-set_union.h \
  gnunet-service-set_intersection.c \
  ibf.c ibf.h \
  gnunet-service-set_union_strata_estimator.c gnunet-service-set_union_strata_estimator.h \
index 454ad9784340715fddc7f369c573724f4464ca57..8f1506c6abc0e2ccc1934414294d337cc150b172 100644 (file)
@@ -24,6 +24,8 @@
  * @author Christian Grothoff
  */
 #include "gnunet-service-set.h"
+#include "gnunet-service-set_union.h"
+#include "gnunet-service-set_intersection.h"
 #include "gnunet-service-set_protocol.h"
 #include "gnunet_statistics_service.h"
 
@@ -476,6 +478,7 @@ _GSS_operation_destroy (struct Operation *op,
     op->channel = NULL;
     GNUNET_CADET_channel_destroy (channel);
   }
+
   if (GNUNET_YES == gc)
     collect_generation_garbage (set);
   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
@@ -682,7 +685,7 @@ client_disconnect_cb (void *cls,
     {
       struct Operation *curr = op;
       op = op->next;
-      if ( (GNUNET_YES == curr->is_incoming) && 
+      if ( (GNUNET_YES == curr->is_incoming) &&
            (curr->listener == listener) )
         incoming_destroy (curr);
     }
@@ -732,6 +735,38 @@ incoming_suggest (struct Operation *incoming,
 }
 
 
+/**
+ * Check a request for a set operation from another peer.
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ *         #GNUNET_SYSERR to destroy the channel
+ */
+static int
+check_incoming_msg (void *cls,
+                    const struct OperationRequestMessage *msg)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_MessageHeader *nested_context;
+
+  /* double operation request */
+  if (NULL != op->spec)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  nested_context = GNUNET_MQ_extract_nested_mh (msg);
+  if ( (NULL != nested_context) &&
+       (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
 /**
  * Handle a request for a set operation from another peer.  Checks if we
  * have a listener waiting for such a request (and in that case initiates
@@ -744,42 +779,23 @@ incoming_suggest (struct Operation *incoming,
  * our virtual table and subsequent msgs would be routed differently (as
  * we then know what type of operation this is).
  *
- * @param op the operation state
- * @param mh the received message
+ * @param cls the operation state
+ * @param msg the received message
  * @return #GNUNET_OK if the channel should be kept alive,
  *         #GNUNET_SYSERR to destroy the channel
  */
-static int
-handle_incoming_msg (struct Operation *op,
-                     const struct GNUNET_MessageHeader *mh)
+static void
+handle_incoming_msg (void *cls,
+                     const struct OperationRequestMessage *msg)
 {
-  const struct OperationRequestMessage *msg;
+  struct Operation *op = cls;
   struct Listener *listener = op->listener;
   struct OperationSpecification *spec;
   const struct GNUNET_MessageHeader *nested_context;
 
-  msg = (const struct OperationRequestMessage *) mh;
   GNUNET_assert (GNUNET_YES == op->is_incoming);
-  if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  /* double operation request */
-  if (NULL != op->spec)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
   spec = GNUNET_new (struct OperationSpecification);
   nested_context = GNUNET_MQ_extract_nested_mh (msg);
-  if ( (NULL != nested_context) &&
-       (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
-  {
-    GNUNET_break_op (0);
-    GNUNET_free (spec);
-    return GNUNET_SYSERR;
-  }
   /* Make a copy of the nested_context (application-specific context
      information that is opaque to set) so we can pass it to the
      listener later on */
@@ -792,7 +808,6 @@ handle_incoming_msg (struct Operation *op,
   spec->peer = op->peer;
   spec->remote_element_count = ntohl (msg->element_count);
   op->spec = spec;
-
   listener = op->listener;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received P2P operation request (op %u, port %s) for active listener\n",
@@ -800,7 +815,6 @@ handle_incoming_msg (struct Operation *op,
               GNUNET_h2s (&listener->app_id));
   incoming_suggest (op,
                     listener);
-  return GNUNET_OK;
 }
 
 
@@ -1103,9 +1117,11 @@ handle_client_create_set (void *cls,
   {
   case GNUNET_SET_OPERATION_INTERSECTION:
     set->vt = _GSS_intersection_vt ();
+    set->type = OT_INTERSECTION;
     break;
   case GNUNET_SET_OPERATION_UNION:
     set->vt = _GSS_union_vt ();
+    set->type = OT_UNION;
     break;
   default:
     GNUNET_free (set);
@@ -1196,7 +1212,6 @@ channel_new_cb (void *cls,
                 const struct GNUNET_PeerIdentity *source)
 {
   static const struct SetVT incoming_vt = {
-    .msg_handler = &handle_incoming_msg,
     .peer_disconnect = &handle_incoming_disconnect
   };
   struct Listener *listener = cls;
@@ -1290,60 +1305,6 @@ channel_window_cb (void *cls,
   /* FIXME: not implemented, we could do flow control here... */
 }
 
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static int
-check_p2p_message (void *cls,
-                   const struct GNUNET_MessageHeader *message)
-{
-  return GNUNET_OK;
-}
-
-
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * Functions with this signature are called whenever a message is
- * received via a cadet channel.
- *
- * The msg_handler is a virtual table set in initially either when a peer
- * creates a new channel with us, or once we create a new channel
- * ourselves (evaluate).
- *
- * Once we know the exact type of operation (union/intersection), the vt is
- * replaced with an operation specific instance (_GSS_[op]_vt).
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static void
-handle_p2p_message (void *cls,
-                    const struct GNUNET_MessageHeader *message)
-{
-  struct Operation *op = cls;
-  int ret;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Dispatching cadet message (type: %u)\n",
-              ntohs (message->type));
-  /* do this before the handler, as the handler might kill the channel */
-  GNUNET_CADET_receive_done (op->channel);
-  if (NULL != op->vt)
-    ret = op->vt->msg_handler (op,
-                               message);
-  else
-    ret = GNUNET_SYSERR;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Handled cadet message (type: %u)\n",
-              ntohs (message->type));
-  if (GNUNET_OK != ret)
-    GNUNET_CADET_channel_destroy (op->channel);
-}
-
 
 /**
  * Called when a client wants to create a new listener.
@@ -1357,66 +1318,66 @@ handle_client_listen (void *cls,
 {
   struct GNUNET_SERVICE_Client *client = cls;
   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (incoming_msg,
                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                           struct GNUNET_MessageHeader,
+                           struct OperationRequestMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_ibf,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
-                           struct GNUNET_MessageHeader,
+                           struct IBFMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_elements,
                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_offer,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
                            struct GNUNET_MessageHeader,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_inquiry,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
-                           struct GNUNET_MessageHeader,
+                           struct InquiryMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_demand,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
                            struct GNUNET_MessageHeader,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (union_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_full_element,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+                             struct IntersectionElementInfoMessage,
+                             NULL),
+    GNUNET_MQ_hd_var_size (intersection_p2p_bf,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
+                           struct BFMessage,
                            NULL),
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+                             struct IntersectionDoneMessage,
+                             NULL),
     GNUNET_MQ_handler_end ()
   };
   struct Listener *listener;
@@ -1623,66 +1584,66 @@ handle_client_evaluate (void *cls,
   struct GNUNET_SERVICE_Client *client = cls;
   struct Operation *op = GNUNET_new (struct Operation);
   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (incoming_msg,
                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                           struct GNUNET_MessageHeader,
+                           struct OperationRequestMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_ibf,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
-                           struct GNUNET_MessageHeader,
+                           struct IBFMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_elements,
                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_offer,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
                            struct GNUNET_MessageHeader,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_inquiry,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
-                           struct GNUNET_MessageHeader,
+                           struct InquiryMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_demand,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
                            struct GNUNET_MessageHeader,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (union_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_full_element,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+                             struct IntersectionElementInfoMessage,
+                             op),
+    GNUNET_MQ_hd_var_size (intersection_p2p_bf,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
+                           struct BFMessage,
                            op),
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+                             struct IntersectionDoneMessage,
+                             op),
     GNUNET_MQ_handler_end ()
   };
   struct Set *set;
@@ -1717,7 +1678,7 @@ handle_client_evaluate (void *cls,
   // mutations won't interfer with the running operation.
   op->generation_created = set->current_generation;
   advance_generation (set);
-
+  op->type = set->type;
   op->vt = set->vt;
   GNUNET_CONTAINER_DLL_insert (set->ops_head,
                                set->ops_tail,
@@ -1886,9 +1847,11 @@ handle_client_copy_lazy_connect (void *cls,
   {
   case GNUNET_SET_OPERATION_INTERSECTION:
     set->vt = _GSS_intersection_vt ();
+    set->type = OT_INTERSECTION;
     break;
   case GNUNET_SET_OPERATION_UNION:
     set->vt = _GSS_union_vt ();
+    set->type = OT_UNION;
     break;
   default:
     GNUNET_assert (0);
@@ -2057,6 +2020,7 @@ handle_client_accept (void *cls,
   advance_generation (set);
 
   op->vt = set->vt;
+  op->type = set->type;
   op->vt->accept (op);
   GNUNET_SERVICE_client_continue (client);
 }
index 68d8fe81f60a6a75fcffdaa7247765fca3a1ffc8..c981430ef03cbc740a0074c3779e17f3775c96d6 100644 (file)
@@ -212,20 +212,6 @@ typedef void
                    const struct GNUNET_MessageHeader *opaque_context);
 
 
-/**
- * Signature of functions that implement the message handling for
- * the different set operations.
- *
- * @param op operation state
- * @param msg received message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to
- *         destroy the operation and the tunnel
- */
-typedef int
-(*MsgHandlerImpl) (struct Operation *op,
-                   const struct GNUNET_MessageHeader *msg);
-
-
 /**
  * Signature of functions that implement operation cancellation
  *
@@ -275,11 +261,6 @@ struct SetVT
    */
   DestroySetImpl destroy_set;
 
-  /**
-   * Callback for handling operation-specific messages.
-   */
-  MsgHandlerImpl msg_handler;
-
   /**
    * Callback for handling the remote peer's disconnect.
    */
@@ -363,6 +344,27 @@ struct ElementEntry
 struct Listener;
 
 
+/**
+ * Possible set operations.
+ */
+enum OperationType {
+  /**
+   * Operation type unknown.
+   */
+  OT_UNKNOWN = 0,
+
+  /**
+   * We are performing a union.
+   */
+  OT_UNION,
+
+  /**
+   * We are performing an intersection.
+   */
+  OT_INTERSECTION
+};
+
+
 /**
  * Operation context used to execute a set operation.
  */
@@ -426,6 +428,11 @@ struct Operation
    */
   struct GNUNET_SCHEDULER_Task *timeout_task;
 
+  /**
+   * What type of operation is this?
+   */
+  enum OperationType type;
+
   /**
    * Unique request id for the request from a remote peer, sent to the
    * client, which will accept or reject the request.  Set to '0' iff
@@ -581,6 +588,11 @@ struct Set
    */
   struct Operation *ops_tail;
 
+  /**
+   * What type of operation is this set for?
+   */
+  enum OperationType type;
+
   /**
    * Current generation, that is, number of previously executed
    * operations and lazy copies on the underlying set content.
index 9fe1eabe64e98753f2db24796c01d7fe8431b1d1..b298f7b41acfb1d2dadc8c4217a7846ffa208587 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013, 2014 GNUnet e.V.
+      Copyright (C) 2013-2017 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -28,6 +28,7 @@
 #include "gnunet-service-set.h"
 #include "gnunet_block_lib.h"
 #include "gnunet-service-set_protocol.h"
+#include "gnunet-service-set_intersection.h"
 #include <gcrypt.h>
 
 
@@ -550,6 +551,8 @@ send_remaining_elements (void *cls)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Sending done and destroy because iterator ran out\n");
     op->keep--;
+    GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+    op->state->full_result_iter = NULL;
     send_client_done_and_destroy (op);
     return;
   }
@@ -627,9 +630,6 @@ process_bf (struct Operation *op)
   case PHASE_COUNT_SENT:
     /* This is the first BF being sent, build our initial map with
        filtering in place */
-    op->state->my_elements
-      = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
-                                              GNUNET_YES);
     op->state->my_element_count = 0;
     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
                                            &filtered_map_initialization,
@@ -664,42 +664,54 @@ process_bf (struct Operation *op)
 }
 
 
+/**
+ * Check an BF message from a remote peer.
+ *
+ * @param cls the intersection operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+int
+check_intersection_p2p_bf (void *cls,
+                           const struct BFMessage *msg)
+{
+  struct Operation *op = cls;
+
+  if (OT_INTERSECTION != op->type)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
 /**
  * Handle an BF message from a remote peer.
  *
  * @param cls the intersection operation
- * @param mh the header of the message
+ * @param msg the header of the message
  */
-static void
-handle_p2p_bf (void *cls,
-               const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_bf (void *cls,
+                            const struct BFMessage *msg)
 {
   struct Operation *op = cls;
-  const struct BFMessage *msg;
   uint32_t bf_size;
   uint32_t chunk_size;
   uint32_t bf_bits_per_element;
-  uint16_t msize;
 
-  msize = htons (mh->size);
-  if (msize < sizeof (struct BFMessage))
-  {
-    GNUNET_break_op (0);
-    fail_intersection_operation (op);
-    return;
-  }
-  msg = (const struct BFMessage *) mh;
   switch (op->state->phase)
   {
   case PHASE_INITIAL:
     GNUNET_break_op (0);
     fail_intersection_operation (op);
-    break;
+    return;
   case PHASE_COUNT_SENT:
   case PHASE_BF_EXCHANGE:
     bf_size = ntohl (msg->bloomfilter_total_length);
     bf_bits_per_element = ntohl (msg->bits_per_element);
-    chunk_size = msize - sizeof (struct BFMessage);
+    chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
     op->state->other_xor = msg->element_xor_hash;
     if (bf_size == chunk_size)
     {
@@ -717,7 +729,7 @@ handle_p2p_bf (void *cls,
       op->state->salt = ntohl (msg->sender_mutator);
       op->spec->remote_element_count = ntohl (msg->sender_element_count);
       process_bf (op);
-      return;
+      break;
     }
     /* multipart chunk */
     if (NULL == op->state->bf_data)
@@ -764,8 +776,9 @@ handle_p2p_bf (void *cls,
   default:
     GNUNET_break_op (0);
     fail_intersection_operation (op);
-    break;
+    return;
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -836,6 +849,7 @@ static void
 begin_bf_exchange (struct Operation *op)
 {
   op->state->phase = PHASE_BF_EXCHANGE;
+  GNUNET_assert (NULL == op->state->my_elements);
   op->state->my_elements
     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
                                             GNUNET_YES);
@@ -853,20 +867,18 @@ begin_bf_exchange (struct Operation *op)
  * @param cls the intersection operation
  * @param mh the header of the message
  */
-static void
-handle_p2p_element_info (void *cls,
-                         const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_element_info (void *cls,
+                                      const struct IntersectionElementInfoMessage *msg)
 {
   struct Operation *op = cls;
-  const struct IntersectionElementInfoMessage *msg;
 
-  if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
+  if (OT_INTERSECTION != op->type)
   {
     GNUNET_break_op (0);
     fail_intersection_operation(op);
     return;
   }
-  msg = (const struct IntersectionElementInfoMessage *) mh;
   op->spec->remote_element_count = ntohl (msg->sender_element_count);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received remote element count (%u), I have %u\n",
@@ -884,6 +896,7 @@ handle_p2p_element_info (void *cls,
   }
   GNUNET_break (NULL == op->state->remote_bf);
   begin_bf_exchange (op);
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -955,28 +968,26 @@ filter_all (void *cls,
  * @param cls the intersection operation
  * @param mh the message
  */
-static void
-handle_p2p_done (void *cls,
-                 const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_done (void *cls,
+                              const struct IntersectionDoneMessage *idm)
 {
   struct Operation *op = cls;
-  const struct IntersectionDoneMessage *idm;
 
-  if (PHASE_BF_EXCHANGE != op->state->phase)
+  if (OT_INTERSECTION != op->type)
   {
-    /* wrong phase to conclude? FIXME: Or should we allow this
-       if the other peer has _initially_ already an empty set? */
     GNUNET_break_op (0);
-    fail_intersection_operation (op);
+    fail_intersection_operation(op);
     return;
   }
-  if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
+  if (PHASE_BF_EXCHANGE != op->state->phase)
   {
+    /* wrong phase to conclude? FIXME: Or should we allow this
+       if the other peer has _initially_ already an empty set? */
     GNUNET_break_op (0);
     fail_intersection_operation (op);
     return;
   }
-  idm = (const struct IntersectionDoneMessage *) mh;
   if (0 == ntohl (idm->final_element_count))
   {
     /* other peer determined empty set is the intersection,
@@ -1000,6 +1011,7 @@ handle_p2p_done (void *cls,
               op->state->my_element_count);
   op->state->phase = PHASE_FINISHED;
   finish_and_destroy (op);
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1064,11 +1076,11 @@ intersection_accept (struct Operation *op)
   op->state->phase = PHASE_INITIAL;
   op->state->my_element_count
     = op->spec->set->state->current_set_element_count;
+  GNUNET_assert (NULL == op->state->my_elements);
   op->state->my_elements
-    = GNUNET_CONTAINER_multihashmap_create
-    (GNUNET_MIN (op->state->my_element_count,
-                 op->spec->remote_element_count),
-     GNUNET_YES);
+    = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count,
+                                                        op->spec->remote_element_count),
+                                            GNUNET_YES);
   if (op->spec->remote_element_count < op->state->my_element_count)
   {
     /* If the other peer (Alice) has fewer elements than us (Bob),
@@ -1082,43 +1094,6 @@ intersection_accept (struct Operation *op)
 }
 
 
-/**
- * Dispatch messages for a intersection operation.
- *
- * @param op the state of the intersection evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
- */
-static int
-intersection_handle_p2p_message (struct Operation *op,
-                                 const struct GNUNET_MessageHeader *mh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received p2p message (t: %u, s: %u)\n",
-              ntohs (mh->type), ntohs (mh->size));
-  switch (ntohs (mh->type))
-  {
-    /* this message handler is not active until after we received an
-     * operation request message, thus the ops request is not handled here
-     */
-  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
-    handle_p2p_element_info (op, mh);
-    break;
-  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
-    handle_p2p_bf (op, mh);
-    break;
-  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
-    handle_p2p_done (op, mh);
-    break;
-  default:
-    /* something wrong with cadet's message handlers? */
-    GNUNET_assert (0);
-  }
-  return GNUNET_OK;
-}
-
-
 /**
  * Handler for peer-disconnects, notifies the client about the aborted
  * operation.  If we did not expect anything from the other peer, we
@@ -1168,6 +1143,11 @@ intersection_op_cancel (struct Operation *op)
     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
     op->state->my_elements = NULL;
   }
+  if (NULL != op->state->full_result_iter)
+  {
+    GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+    op->state->full_result_iter = NULL;
+  }
   GNUNET_free (op->state);
   op->state = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1245,7 +1225,6 @@ _GSS_intersection_vt ()
 {
   static const struct SetVT intersection_vt = {
     .create = &intersection_set_create,
-    .msg_handler = &intersection_handle_p2p_message,
     .add = &intersection_add,
     .remove = &intersection_remove,
     .destroy_set = &intersection_set_destroy,
index b5b60207411c1ed3c6a417e8139b454d35f6d12c..200bd4b8ec5db43f4e6d67f1dce49007cf64d55c 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013-2016 GNUnet e.V.
+      Copyright (C) 2013-2017 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 */
 /**
  * @file set/gnunet-service-set_union.c
-
  * @brief two-peer set operations
  * @author Florian Dold
+ * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet-service-set.h"
 #include "ibf.h"
+#include "gnunet-service-set_union.h"
 #include "gnunet-service-set_union_strata_estimator.h"
 #include "gnunet-service-set_protocol.h"
 #include <gcrypt.h>
@@ -813,42 +814,56 @@ send_full_set (struct Operation *op)
  * Handle a strata estimator from a remote peer
  *
  * @param cls the union operation
- * @param mh the message
- * @param is_compressed #GNUNET_YES if the estimator is compressed
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
+ * @param msg the message
  */
-static int
-handle_p2p_strata_estimator (void *cls,
-                             const struct GNUNET_MessageHeader *mh,
-                             int is_compressed)
+int
+check_union_p2p_strata_estimator (void *cls,
+                                  const struct StrataEstimatorMessage *msg)
 {
   struct Operation *op = cls;
-  struct StrataEstimator *remote_se;
-  struct StrataEstimatorMessage *msg = (void *) mh;
-  unsigned int diff;
-  uint64_t other_size;
+  int is_compressed;
   size_t len;
 
-  GNUNET_STATISTICS_update (_GSS_statistics,
-                            "# bytes of SE received",
-                            ntohs (mh->size),
-                            GNUNET_NO);
-
   if (op->state->phase != PHASE_EXPECT_SE)
   {
     GNUNET_break (0);
-    fail_union_operation (op);
     return GNUNET_SYSERR;
   }
-  len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
+  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
+  len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
   if ( (GNUNET_NO == is_compressed) &&
        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
   {
-    fail_union_operation (op);
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_strata_estimator (void *cls,
+                                   const struct StrataEstimatorMessage *msg)
+{
+  struct Operation *op = cls;
+  struct StrataEstimator *remote_se;
+  unsigned int diff;
+  uint64_t other_size;
+  size_t len;
+  int is_compressed;
+
+  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# bytes of SE received",
+                            ntohs (msg->header.size),
+                            GNUNET_NO);
+  len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
   other_size = GNUNET_ntohll (msg->set_size);
   remote_se = strata_estimator_create (SE_STRATA_COUNT,
                                        SE_IBF_SIZE,
@@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls,
   {
     /* insufficient resources, fail */
     fail_union_operation (op);
-    return GNUNET_SYSERR;
+    return;
   }
   if (GNUNET_OK !=
       strata_estimator_read (&msg[1],
@@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls,
                              remote_se))
   {
     /* decompression failed */
-    fail_union_operation (op);
     strata_estimator_destroy (remote_se);
-    return GNUNET_SYSERR;
+    fail_union_operation (op);
+    return;
   }
   GNUNET_assert (NULL != op->state->se);
   diff = strata_estimator_difference (remote_se,
                                       op->state->se);
 
   if (diff > 200)
-    diff = diff * 3 / 2; 
-
-
+    diff = diff * 3 / 2;
 
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (op->state->se);
@@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "got se diff=%d, using ibf size %d\n",
        diff,
-       1<<get_order_from_difference (diff));
+       1U << get_order_from_difference (diff));
 
   {
     char *set_debug;
+
     set_debug = getenv ("GNUNET_SET_BENCHMARK");
-    if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) )
+    if ( (NULL != set_debug) &&
+         (0 == strcmp (set_debug, "1")) )
     {
       FILE *f = fopen ("set.log", "a");
       fprintf (f, "%llu\n", (unsigned long long) diff);
@@ -898,15 +913,16 @@ handle_p2p_strata_estimator (void *cls,
     }
   }
 
-  if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+  if ( (GNUNET_YES == op->spec->byzantine) &&
+       (other_size < op->spec->byzantine_lower_bound) )
   {
     GNUNET_break (0);
     fail_union_operation (op);
-    return GNUNET_SYSERR;
+    return;
   }
 
-
-  if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
+  if ( (GNUNET_YES == op->spec->force_full) ||
+       (diff > op->state->initial_size / 4))
   {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "Sending full set (diff=%d, own set=%u)\n",
@@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls,
     else
     {
       struct GNUNET_MQ_Envelope *ev;
+
       op->state->phase = PHASE_EXPECT_IBF;
       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
       GNUNET_MQ_send (op->mq, ev);
@@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to send IBF, closing connection\n");
       fail_union_operation (op);
-      return GNUNET_SYSERR;
+      return;
     }
   }
-
-  return GNUNET_OK;
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op)
 
 
 /**
- * Handle an IBF message from a remote peer.
+ * Check an IBF message from a remote peer.
  *
  * Reassemble the IBF from multiple pieces, and
  * process the whole IBF once possible.
  *
  * @param cls the union operation
- * @param mh the header of the message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
  */
-static int
-handle_p2p_ibf (void *cls,
-                const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_ibf (void *cls,
+                     const struct IBFMessage *msg)
 {
   struct Operation *op = cls;
-  const struct IBFMessage *msg;
   unsigned int buckets_in_message;
 
-  if (ntohs (mh->size) < sizeof (struct IBFMessage))
+  if (OT_UNION != op->type)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
     return GNUNET_SYSERR;
   }
-  msg = (const struct IBFMessage *) mh;
-  if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
-       (op->state->phase == PHASE_EXPECT_IBF) )
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+  if (0 == buckets_in_message)
   {
-    op->state->phase = PHASE_EXPECT_IBF_CONT;
-    GNUNET_assert (NULL == op->state->remote_ibf);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Creating new ibf of size %u\n",
-         1 << msg->order);
-    op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
-    op->state->salt_receive = ntohl (msg->salt);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
-    if (NULL == op->state->remote_ibf)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Failed to parse remote IBF, closing connection\n");
-      fail_union_operation (op);
-      return GNUNET_SYSERR;
-    }
-    op->state->ibf_buckets_received = 0;
-    if (0 != ntohl (msg->offset))
-    {
-      GNUNET_break_op (0);
-      fail_union_operation (op);
-      return GNUNET_SYSERR;
-    }
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
+  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (op->state->phase == PHASE_EXPECT_IBF_CONT)
   {
     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
     {
       GNUNET_break_op (0);
-      fail_union_operation (op);
       return GNUNET_SYSERR;
     }
     if (1<<msg->order != op->state->remote_ibf->size)
     {
       GNUNET_break_op (0);
-      fail_union_operation (op);
       return GNUNET_SYSERR;
     }
     if (ntohl (msg->salt) != op->state->salt_receive)
     {
       GNUNET_break_op (0);
-      fail_union_operation (op);
       return GNUNET_SYSERR;
     }
   }
-  else
+  else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+            (op->state->phase != PHASE_EXPECT_IBF) )
   {
-    GNUNET_assert (0);
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
 
-  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+  return GNUNET_OK;
+}
 
-  if (0 == buckets_in_message)
+
+/**
+ * Handle an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ */
+void
+handle_union_p2p_ibf (void *cls,
+                      const struct IBFMessage *msg)
+{
+  struct Operation *op = cls;
+  unsigned int buckets_in_message;
+
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+  if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+       (op->state->phase == PHASE_EXPECT_IBF) )
   {
-    GNUNET_break_op (0);
-    fail_union_operation (op);
-    return GNUNET_SYSERR;
+    op->state->phase = PHASE_EXPECT_IBF_CONT;
+    GNUNET_assert (NULL == op->state->remote_ibf);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Creating new ibf of size %u\n",
+         1 << msg->order);
+    op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    op->state->salt_receive = ntohl (msg->salt);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Receiving new IBF with salt %u\n",
+         op->state->salt_receive);
+    if (NULL == op->state->remote_ibf)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to parse remote IBF, closing connection\n");
+      fail_union_operation (op);
+      return;
+    }
+    op->state->ibf_buckets_received = 0;
+    if (0 != ntohl (msg->offset))
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
   }
-
-  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+  else
   {
-    GNUNET_break_op (0);
-    fail_union_operation (op);
-    return GNUNET_SYSERR;
+    GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
   }
-
   GNUNET_assert (NULL != op->state->remote_ibf);
 
   ibf_read_slice (&msg[1],
@@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls,
       /* Internal error, best we can do is shut down */
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to decode IBF, closing connection\n");
-      return GNUNET_SYSERR;
+      fail_union_operation (op);
+      return;
     }
   }
-  return GNUNET_OK;
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls)
 }
 
 
+/**
+ * Tests if the operation is finished, and if so notify.
+ *
+ * @param op operation to check
+ */
 static void
 maybe_finish (struct Operation *op)
 {
@@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op)
 
 
 /**
- * Handle an element message from a remote peer.
- * Sent by the other peer either because we decoded an IBF and placed a demand,
- * or because the other peer switched to full set transmission.
+ * Check an element message from a remote peer.
  *
  * @param cls the union operation
- * @param mh the message
+ * @param emsg the message
  */
-static void
-handle_p2p_elements (void *cls,
-                     const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_elements (void *cls,
+                          const struct GNUNET_SET_ElementMessage *emsg)
 {
   struct Operation *op = cls;
-  struct ElementEntry *ee;
-  const struct GNUNET_SET_ElementMessage *emsg;
-  uint16_t element_size;
 
-  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
+  if (OT_UNION != op->type)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
-  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
+
 
-  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+/**
+ * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_elements (void *cls,
+                           const struct GNUNET_SET_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct KeyEntry *ke;
+  uint16_t element_size;
 
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
-  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  GNUNET_memcpy (&ee[1],
+                 &emsg[1],
+                 element_size);
   ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->element.element_type = ntohs (emsg->element_type);
   ee->remote = GNUNET_YES;
-  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
-
+  GNUNET_SET_element_hash (&ee->element,
+                           &ee->element_hash);
   if (GNUNET_NO ==
       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
                                             &ee->element_hash,
@@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls,
   {
     /* We got something we didn't demand, since it's not in our map. */
     GNUNET_break_op (0);
-    GNUNET_free (ee);
     fail_union_operation (op);
     return;
   }
@@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls,
                             1,
                             GNUNET_NO);
 
-  op->state->received_total += 1;
-
-  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+  op->state->received_total++;
 
+  ke = op_get_element (op, &ee->element_hash);
   if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
@@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls,
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
-    op->state->received_fresh += 1;
+    op->state->received_fresh++;
     op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
@@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls,
     }
   }
 
-  if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+  if ( (op->state->received_total > 8) &&
+       (op->state->received_fresh < op->state->received_total / 3) )
   {
     /* The other peer gave us lots of old elements, there's something wrong. */
     GNUNET_break_op (0);
     fail_union_operation (op);
     return;
   }
-
+  GNUNET_CADET_receive_done (op->channel);
   maybe_finish (op);
 }
 
 
 /**
- * Handle an element message from a remote peer.
+ * Check a full element message from a remote peer.
  *
  * @param cls the union operation
- * @param mh the message
+ * @param emsg the message
  */
-static void
-handle_p2p_full_element (void *cls,
-                         const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_full_element (void *cls,
+                              const struct GNUNET_SET_ElementMessage *emsg)
 {
   struct Operation *op = cls;
-  struct ElementEntry *ee;
-  const struct GNUNET_SET_ElementMessage *emsg;
-  uint16_t element_size;
 
-  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  if (OT_UNION != op->type)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  // FIXME: check that we expect full elements here?
+  return GNUNET_OK;
+}
 
-  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
 
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_full_element (void *cls,
+                               const struct GNUNET_SET_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct KeyEntry *ke;
+  uint16_t element_size;
+
+  element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
   ee->element.size = element_size;
@@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls,
                             1,
                             GNUNET_NO);
 
-  op->state->received_total += 1;
-
-  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+  op->state->received_total++;
 
+  ke = op_get_element (op, &ee->element_hash);
   if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
@@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls,
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
-    op->state->received_fresh += 1;
+    op->state->received_fresh++;
     op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
@@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls,
     }
   }
 
-  if ( (GNUNET_YES == op->spec->byzantine) && 
-       (op->state->received_total > 384 + op->state->received_fresh * 4) && 
+  if ( (GNUNET_YES == op->spec->byzantine) &&
+       (op->state->received_total > 384 + op->state->received_fresh * 4) &&
        (op->state->received_fresh < op->state->received_total / 6) )
   {
     /* The other peer gave us lots of old elements, there's something wrong. */
@@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls,
     fail_union_operation (op);
     return;
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
+
 /**
  * Send offers (for GNUNET_Hash-es) in response
  * to inquiries (for IBF_Key-s).
  *
  * @param cls the union operation
- * @param mh the message
+ * @param msg the message
  */
-static void
-handle_p2p_inquiry (void *cls,
-                    const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_inquiry (void *cls,
+                         const struct InquiryMessage *msg)
 {
   struct Operation *op = cls;
-  const struct IBF_Key *ibf_key;
   unsigned int num_keys;
-  struct InquiryMessage *msg;
 
-  /* look up elements and send them */
+  if (OT_UNION != op->type)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
-  num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
-      / sizeof (struct IBF_Key);
-  if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
+  num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+    / sizeof (struct IBF_Key);
+  if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
       != num_keys * sizeof (struct IBF_Key))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
 
-  msg = (struct InquiryMessage *) mh;
 
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_inquiry (void *cls,
+                          const struct InquiryMessage *msg)
+{
+  struct Operation *op = cls;
+  const struct IBF_Key *ibf_key;
+  unsigned int num_keys;
+
+  num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+    / sizeof (struct IBF_Key);
   ibf_key = (const struct IBF_Key *) &msg[1];
   while (0 != num_keys--)
   {
     struct IBF_Key unsalted_key;
+
     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
     send_offers_for_key (op, unsalted_key);
     ibf_key++;
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls,
 
 
 /**
- * Handle a 
+ * Handle a request for full set transmission.
  *
  * @parem cls closure, a set union operation
  * @param mh the demand message
  */
-static void
-handle_p2p_request_full (void *cls,
-                         const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_request_full (void *cls,
+                               const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
 
-  if (PHASE_EXPECT_IBF != op->state->phase)
+  if (OT_UNION != op->type)
   {
+    GNUNET_break_op (0);
     fail_union_operation (op);
+    return;
+  }
+  if (PHASE_EXPECT_IBF != op->state->phase)
+  {
     GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
 
   // FIXME: we need to check that our set is larger than the
   // byzantine_lower_bound by some threshold
   send_full_set (op);
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls,
  * @parem cls closure, a set union operation
  * @param mh the demand message
  */
-static void
-handle_p2p_full_done (void *cls,
-                      const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_full_done (void *cls,
+                            const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
 
-  if (PHASE_EXPECT_IBF == op->state->phase)
+  switch (op->state->phase)
   {
-    struct GNUNET_MQ_Envelope *ev;
-
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+  case PHASE_EXPECT_IBF:
+    {
+      struct GNUNET_MQ_Envelope *ev;
 
-    /* send all the elements that did not come from the remote peer */
-    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
-                                             &send_missing_elements_iter,
-                                             op);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "got FULL DONE, sending elements that other peer is missing\n");
 
-    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
-    GNUNET_MQ_send (op->mq, ev);
-    op->state->phase = PHASE_DONE;
+      /* send all the elements that did not come from the remote peer */
+      GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                               &send_missing_elements_iter,
+                                               op);
 
-    /* we now wait until the other peer shuts the tunnel down*/
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+      GNUNET_MQ_send (op->mq, ev);
+      op->state->phase = PHASE_DONE;
+      /* we now wait until the other peer shuts the tunnel down*/
+    }
+    break;
+  case PHASE_FULL_SENDING:
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "got FULL DONE, finishing\n");
+      /* We sent the full set, and got the response for that.  We're done. */
+      op->state->phase = PHASE_DONE;
+      GNUNET_CADET_receive_done (op->channel);
+      send_done_and_destroy (op);
+      return;
+    }
+    break;
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Handle full done phase is %u\n",
+                (unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
   }
-  else if (PHASE_FULL_SENDING == op->state->phase)
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+int
+check_union_p2p_demand (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  unsigned int num_hashes;
+
+  if (OT_UNION != op->type)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
-    /* We sent the full set, and got the response for that.  We're done. */
-    op->state->phase = PHASE_DONE;
-    send_done_and_destroy (op);
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  else
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+      != num_hashes * sizeof (struct GNUNET_HashCode))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
 }
 
 
 /**
  * Handle a demand by the other peer for elements based on a list
- * of GNUNET_HashCode-s.
+ * of `struct GNUNET_HashCode`s.
  *
  * @parem cls closure, a set union operation
  * @param mh the demand message
  */
-static void
-handle_p2p_demand (void *cls,
-                   const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_demand (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
   struct ElementEntry *ee;
@@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls,
 
   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
     / sizeof (struct GNUNET_HashCode);
-  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
-      != num_hashes * sizeof (struct GNUNET_HashCode))
-  {
-    GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
-  }
-
   for (hash = (const struct GNUNET_HashCode *) &mh[1];
        num_hashes > 0;
        hash++, num_hashes--)
   {
-    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
+    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
+                                            hash);
     if (NULL == ee)
     {
       /* Demand for non-existing element. */
@@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls,
         break;
     }
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
 /**
- * Handle offers (of GNUNET_HashCode-s) and
- * respond with demands (of GNUNET_HashCode-s).
+ * Check offer (of `struct GNUNET_HashCode`s).
  *
  * @param cls the union operation
  * @param mh the message
+ * @return #GNUNET_OK if @a mh is well-formed
  */
-static void
-handle_p2p_offer (void *cls,
-                    const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_offer (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
-  const struct GNUNET_HashCode *hash;
   unsigned int num_hashes;
 
+  if (OT_UNION != op->type)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   /* look up elements and send them */
   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
        (op->state->phase != PHASE_INVENTORY_ACTIVE))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
     / sizeof (struct GNUNET_HashCode);
@@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls,
       != num_hashes * sizeof (struct GNUNET_HashCode))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
 
+
+/**
+ * Handle offers (of `struct GNUNET_HashCode`s) and
+ * respond with demands (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+void
+handle_union_p2p_offer (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
   for (hash = (const struct GNUNET_HashCode *) &mh[1];
        num_hashes > 0;
        hash++, num_hashes--)
@@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls,
     *(struct GNUNET_HashCode *) &demands[1] = *hash;
     GNUNET_MQ_send (op->mq, ev);
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls,
  * @param cls the union operation
  * @param mh the message
  */
-static void
-handle_p2p_done (void *cls,
-                 const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_done (void *cls,
+                       const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
 
-  if (op->state->phase == PHASE_INVENTORY_PASSIVE)
+  if (OT_UNION != op->type)
   {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  switch (op->state->phase)
+  {
+  case PHASE_INVENTORY_PASSIVE:
     /* We got all requests, but still have to send our elements in response. */
-
     op->state->phase = PHASE_FINISH_WAITING;
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls,
      * all our demands are satisfied, so that the active
      * peer can quit if we gave him everything.
      */
+    GNUNET_CADET_receive_done (op->channel);
     maybe_finish (op);
     return;
-  }
-  if (op->state->phase == PHASE_INVENTORY_ACTIVE)
-  {
+  case PHASE_INVENTORY_ACTIVE:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "got DONE (as active partner), waiting to finish\n");
     /* All demands of the other peer are satisfied,
@@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls,
      * to the other peer once our demands are met.
      */
     op->state->phase = PHASE_FINISH_CLOSING;
+    GNUNET_CADET_receive_done (op->channel);
     maybe_finish (op);
     return;
+  default:
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
   }
-  GNUNET_break_op (0);
-  fail_union_operation (op);
 }
 
 
@@ -2118,62 +2276,6 @@ union_set_destroy (struct SetState *set_state)
 }
 
 
-/**
- * Dispatch messages for a union operation.
- *
- * @param op the state of the union evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
- */
-int
-union_handle_p2p_message (struct Operation *op,
-                          const struct GNUNET_MessageHeader *mh)
-{
-  //LOG (GNUNET_ERROR_TYPE_DEBUG,
-  //            "received p2p message (t: %u, s: %u)\n",
-  //            ntohs (mh->type),
-  //            ntohs (mh->size));
-  switch (ntohs (mh->type))
-  {
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
-      return handle_p2p_ibf (op, mh);
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
-      return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
-      return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
-      handle_p2p_elements (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
-      handle_p2p_full_element (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
-      handle_p2p_inquiry (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
-      handle_p2p_done (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
-      handle_p2p_offer (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
-      handle_p2p_demand (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
-      handle_p2p_full_done (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
-      handle_p2p_request_full (op, mh);
-      break;
-    default:
-      /* Something wrong with cadet's message handlers? */
-      GNUNET_assert (0);
-  }
-  return GNUNET_OK;
-}
-
-
 /**
  * Handler for peer-disconnects, notifies the client
  * about the aborted operation in case the op was not concluded.
@@ -2240,7 +2342,6 @@ _GSS_union_vt ()
 {
   static const struct SetVT union_vt = {
     .create = &union_set_create,
-    .msg_handler = &union_handle_p2p_message,
     .add = &union_add,
     .remove = &union_remove,
     .destroy_set = &union_set_destroy,
index 04a4e49108f6a7d2661f369d989fb78314f5839c..bc428f9f6df873a3b221fd6df1d0d08fd7d78085 100644 (file)
@@ -76,6 +76,8 @@ struct GNUNET_SET_Handle
 
   /**
    * Should the set be destroyed once all operations are gone?
+   * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
+   * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
    */
   int destroy_requested;
 
@@ -345,11 +347,13 @@ handle_iter_done (void *cls,
 
   if (NULL == iter)
     return;
+  set->destroy_requested = GNUNET_SYSERR;
   set->iterator = NULL;
   set->iteration_id++;
   iter (set->iterator_cls,
         NULL);
-
+  if (GNUNET_SYSERR == set->destroy_requested)
+    set->destroy_requested = GNUNET_NO;
   if (GNUNET_YES == set->destroy_requested)
     GNUNET_SET_destroy (set);
 }
@@ -736,7 +740,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
   /* destroying set while iterator is active is currently
      not supported; we should expand the API to allow
      clients to explicitly cancel the iteration! */
-  if ( (NULL != set->ops_head) || (NULL != set->iterator) )
+  if ( (NULL != set->ops_head) ||
+       (NULL != set->iterator) ||
+       (GNUNET_SYSERR == set->destroy_requested) )
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Set operations are pending, delaying set destruction\n");
@@ -809,7 +815,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
         msg->force_delta = GNUNET_YES;
         break;
       default:
-        LOG (GNUNET_ERROR_TYPE_ERROR, 
+        LOG (GNUNET_ERROR_TYPE_ERROR,
              "Option with type %d not recognized\n", (int) opt->type);
     }
   }
index c887a89588f309be408c2b47dc3adc641190a72f..a1eba63115f416651852b540aed3cc9b1c260af7 100644 (file)
@@ -122,6 +122,7 @@ check_count_iter (void *cls,
       return GNUNET_NO;
     }
     ci_cls->cont (ci_cls->cont_cls);
+    GNUNET_free (ci_cls);
     return GNUNET_NO;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,