clean up internal set API, avoid copying context message needlessly
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
index f9abab253cd2da9771f9281d8829de126abece0a..5926b25f597f6c1515d43f79676f13a7c4780405 100644 (file)
@@ -73,38 +73,43 @@ enum UnionOperationPhase
    * We sent the request message, and expect a strata estimator
    */
   PHASE_EXPECT_SE,
+
   /**
-   * We sent the strata estimator, and expect an IBF. This phase is entered once 
+   * We sent the strata estimator, and expect an IBF. This phase is entered once
    * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
-   * 
+   *
    * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
    */
   PHASE_EXPECT_IBF,
+
   /**
    * Continuation for multi part IBFs.
    */
   PHASE_EXPECT_IBF_CONT,
+
   /**
    * We are sending request and elements,
    * and thus only expect elements from the other peer.
-   * 
+   *
    * We are currently decoding an IBF until it can no longer be decoded,
    * we currently send requests and expect elements
    * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
    */
   PHASE_EXPECT_ELEMENTS,
+
   /**
    * We are expecting elements and requests, and send
    * requested elements back to the other peer.
-   * 
+   *
    * We are in this phase if we have SENT an IBF for the remote peer to decode.
    * We expect requests, send elements or could receive an new IBF, which takes
    * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
-   * 
+   *
    * The remote peer is thus in:
-   * PHASE_EXPECT_ELEMENTS 
+   * PHASE_EXPECT_ELEMENTS
    */
   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
+
   /**
    * The protocol is over.
    * Results may still have to be sent to the client.
@@ -114,15 +119,10 @@ enum UnionOperationPhase
 
 
 /**
- * State of an evaluate operation
- * with another peer.
+ * State of an evaluate operation with another peer.
  */
 struct OperationState
 {
-  /**
-   * Number of ibf buckets received
-   */
-  unsigned int ibf_buckets_received;
 
   /**
    * Copy of the set's strata estimator at the time of
@@ -161,6 +161,12 @@ struct OperationState
    * Did we send the client that we are done?
    */
   int client_done_sent;
+
+  /**
+   * Number of ibf buckets received
+   */
+  unsigned int ibf_buckets_received;
+
 };
 
 
@@ -257,7 +263,7 @@ destroy_key_to_element_iter (void *cls,
 
 /**
  * Destroy the union operation.  Only things specific to the union operation are destroyed.
- * 
+ *
  * @param op union operation to destroy
  */
 static void
@@ -312,7 +318,7 @@ fail_union_operation (struct Operation *op)
   msg->request_id = htonl (op->spec->client_request_id);
   msg->element_type = htons (0);
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
-  _GSS_operation_destroy (op);
+  _GSS_operation_destroy (op, GNUNET_YES);
 }
 
 
@@ -338,45 +344,6 @@ get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
 }
 
 
-/**
- * Send a request for the evaluate operation to a remote peer
- *
- * @param op operation with the other peer
- */
-static void
-send_operation_request (struct Operation *op)
-{
-  struct GNUNET_MQ_Envelope *ev;
-  struct OperationRequestMessage *msg;
-
-  ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                                op->spec->context_msg);
-
-  if (NULL == ev)
-  {
-    /* the context message is too large */
-    GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (op->spec->set->client);
-    return;
-  }
-  msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
-  msg->app_id = op->spec->app_id;
-  msg->salt = htonl (op->spec->salt);
-  GNUNET_MQ_send (op->mq, ev);
-
-  if (NULL != op->spec->context_msg)
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
-
-  if (NULL != op->spec->context_msg)
-  {
-    GNUNET_free (op->spec->context_msg);
-    op->spec->context_msg = NULL;
-  }
-}
-
-
 /**
  * Iterator to create the mapping between ibf keys
  * and element entries.
@@ -478,7 +445,8 @@ op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash
  * @param ee the element entry
  */
 static void
-op_register_element (struct Operation *op, struct ElementEntry *ee)
+op_register_element (struct Operation *op,
+                     struct ElementEntry *ee)
 {
   int ret;
   struct IBF_Key ibf_key;
@@ -760,8 +728,9 @@ send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
 
   send_cls.ibf_key = ibf_key;
   send_cls.op = op;
-  GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val,
-                                                &send_element_iterator, &send_cls);
+  (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+                                                       (uint32_t) ibf_key.key_val,
+                                                       &send_element_iterator, &send_cls);
 }
 
 
@@ -823,7 +792,7 @@ decode_and_send (struct Operation *op)
       next_order++;
       if (next_order <= MAX_IBF_ORDER)
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                    "decoding failed, sending larger ibf (size %u)\n",
                     1<<next_order);
         send_ibf (op, next_order);
@@ -853,7 +822,7 @@ decode_and_send (struct Operation *op)
       struct GNUNET_MQ_Envelope *ev;
       struct GNUNET_MessageHeader *msg;
 
-      /* It may be nice to merge multiple requests, but with mesh's corking it is not worth
+      /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
        * the effort additional complexity. */
       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
@@ -963,7 +932,7 @@ send_client_element (struct Operation *op,
   }
   rm->result_status = htons (GNUNET_SET_STATUS_OK);
   rm->request_id = htonl (op->spec->client_request_id);
-  rm->element_type = element->type;
+  rm->element_type = element->element_type;
   memcpy (&rm[1], element->data, element->size);
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
 }
@@ -981,12 +950,16 @@ send_done_and_destroy (void *cls)
   struct Operation *op = cls;
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
+  int keep = op->keep;
+
   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
   rm->request_id = htonl (op->spec->client_request_id);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   rm->element_type = htons (0);
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
-  _GSS_operation_destroy (op);
+  _GSS_operation_destroy (op, GNUNET_YES);
+  if (GNUNET_YES == keep)
+    GNUNET_free (op);
 }
 
 
@@ -1030,7 +1003,7 @@ send_remaining_elements (void *cls)
     }
     rm->result_status = htons (GNUNET_SET_STATUS_OK);
     rm->request_id = htonl (op->spec->client_request_id);
-    rm->element_type = element->type;
+    rm->element_type = element->element_type;
     memcpy (&rm[1], element->data, element->size);
     if (ke->next_colliding == NULL)
     {
@@ -1059,8 +1032,10 @@ finish_and_destroy (struct Operation *op)
 
   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
   {
+    /* prevent that the op is free'd by the tunnel end handler */
+    op->keep = GNUNET_YES;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
-    GNUNET_assert (NULL == op->state->full_result_iter); 
+    GNUNET_assert (NULL == op->state->full_result_iter);
     op->state->full_result_iter =
         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
     send_remaining_elements (op);
@@ -1083,7 +1058,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
   struct ElementEntry *ee;
   uint16_t element_size;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "got element from peer\n");
 
   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
@@ -1093,16 +1069,19 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
     return;
   }
   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
-  ee = GNUNET_malloc (sizeof *ee + element_size);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
   memcpy (&ee[1], &mh[1], element_size);
   ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->remote = GNUNET_YES;
-  GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
+  GNUNET_CRYPTO_hash (ee->element.data,
+                      ee->element.size,
+                      &ee->element_hash);
 
   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "got existing element from peer\n");
     GNUNET_free (ee);
     return;
   }
@@ -1121,7 +1100,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
  * @param mh the message
  */
 static void
-handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_element_requests (void *cls,
+                             const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
   struct IBF_Key *ibf_key;
@@ -1188,21 +1168,47 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
 
 
 /**
- * Evaluate a union operation with
- * a remote peer.
+ * Initiate operation to evaluate a set union with a remote peer.
  *
- * @param op operation to evaluate
+ * @param op operation to perform (to be initialized)
+ * @param opaque_context message to be transmitted to the listener
+ *        to convince him to accept, may be NULL
  */
 static void
-union_evaluate (struct Operation *op)
+union_evaluate (struct Operation *op,
+                const struct GNUNET_MessageHeader *opaque_context)
 {
+  struct GNUNET_MQ_Envelope *ev;
+  struct OperationRequestMessage *msg;
+
   op->state = GNUNET_new (struct OperationState);
-  // copy the current generation's strata estimator for this operation
+  /* copy the current generation's strata estimator for this operation */
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
   /* we started the operation, thus we have to send the operation request */
   op->state->phase = PHASE_EXPECT_SE;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation\n");
-  send_operation_request (op);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initiating union operation evaluation\n");
+  ev = GNUNET_MQ_msg_nested_mh (msg,
+                                GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+                                opaque_context);
+  if (NULL == ev)
+  {
+    /* the context message is too large */
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (op->spec->set->client);
+    return;
+  }
+  msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
+  msg->app_id = op->spec->app_id;
+  msg->salt = htonl (op->spec->salt);
+  GNUNET_MQ_send (op->mq, ev);
+
+  if (NULL != opaque_context)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "sent op request with context message\n");
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "sent op request without context message\n");
 }
 
 
@@ -1225,10 +1231,10 @@ union_accept (struct Operation *op)
 
 /**
  * Create a new set supporting the union operation
- * 
+ *
  * We maintain one strata estimator per set and then manipulate it over the
  * lifetime of the set, as recreating a strata estimator would be expensive.
- * 
+ *
  * @return the newly created set
  */
 static struct SetState *
@@ -1321,16 +1327,16 @@ union_handle_p2p_message (struct Operation *op,
       handle_p2p_done (op, mh);
       break;
     default:
-      /* something wrong with mesh's message handlers? */
+      /* something wrong with cadet's message handlers? */
       GNUNET_assert (0);
   }
   return GNUNET_OK;
 }
 
 /**
- * handler for peer-disconnects, notifies the client 
+ * handler for peer-disconnects, notifies the client
  * about the aborted operation in case the op was not concluded
- * 
+ *
  * @param op the destroyed operation
  */
 static void
@@ -1346,8 +1352,9 @@ union_peer_disconnect (struct Operation *op)
     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
     msg->element_type = htons (0);
     GNUNET_MQ_send (op->spec->set->client_mq, ev);
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
-    _GSS_operation_destroy (op);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "other peer disconnected prematurely\n");
+    _GSS_operation_destroy (op, GNUNET_YES);
     return;
   }
   // else: the session has already been concluded
@@ -1360,7 +1367,7 @@ union_peer_disconnect (struct Operation *op)
 /**
  * Get the table with implementing functions for
  * set union.
- * 
+ *
  * @return the operation specific VTable
  */
 const struct SetVT *