Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
index d2dfe049b16e960e98e84bb3518c6ec315edde25..f46713c3102d25b403b3aea2ae369d6915a0e9d0 100644 (file)
@@ -85,6 +85,7 @@ enum UnionOperationPhase
    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
    *
    * XXX: could use better wording.
+   * XXX: repurposed to also expect a "request full set" message, should be renamed
    *
    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
    */
@@ -202,6 +203,12 @@ struct OperationState
    * Total number of elements received from the other peer.
    */
   uint32_t received_total;
+
+  /**
+   * Initial size of our set, just before
+   * the operation started.
+   */
+  uint64_t initial_size;
 };
 
 
@@ -441,7 +448,7 @@ op_get_element (struct Operation *op,
 {
   int ret;
   struct IBF_Key ibf_key;
-  struct GetElementContext ctx = { 0 };
+  struct GetElementContext ctx = {{{ 0 }} , 0};
 
   ctx.hash = *element_hash;
 
@@ -708,7 +715,7 @@ send_strata_estimator (struct Operation *op)
 {
   const struct StrataEstimator *se = op->state->se;
   struct GNUNET_MQ_Envelope *ev;
-  struct GNUNET_MessageHeader *strata_msg;
+  struct StrataEstimatorMessage *strata_msg;
   char *buf;
   size_t len;
   uint16_t type;
@@ -720,13 +727,14 @@ send_strata_estimator (struct Operation *op)
     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
   else
     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
-  ev = GNUNET_MQ_msg_header_extra (strata_msg,
-                                   len,
-                                   type);
+  ev = GNUNET_MQ_msg_extra (strata_msg,
+                            len,
+                            type);
   GNUNET_memcpy (&strata_msg[1],
           buf,
           len);
   GNUNET_free (buf);
+  strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
   GNUNET_MQ_send (op->mq,
                   ev);
   op->state->phase = PHASE_EXPECT_IBF;
@@ -753,7 +761,8 @@ get_order_from_difference (unsigned int diff)
     ibf_order++;
   if (ibf_order > MAX_IBF_ORDER)
     ibf_order = MAX_IBF_ORDER;
-  return ibf_order;
+  // add one for correction
+  return ibf_order + 1;
 }
 
 
@@ -773,11 +782,13 @@ send_element_iterator (void *cls,
 {
   struct Operation *op = cls;
   struct GNUNET_SET_ElementMessage *emsg;
-  struct GNUNET_SET_Element *el = value;
+  struct ElementEntry *ee = value;
+  struct GNUNET_SET_Element *el = &ee->element;
   struct GNUNET_MQ_Envelope *ev;
 
+
   ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
-  emsg->element_type = htonl (el->element_type);
+  emsg->element_type = htons (el->element_type);
   GNUNET_memcpy (&emsg[1], el->data, el->size);
   GNUNET_MQ_send (op->mq, ev);
   return GNUNET_YES;
@@ -814,7 +825,9 @@ handle_p2p_strata_estimator (void *cls,
 {
   struct Operation *op = cls;
   struct StrataEstimator *remote_se;
-  int diff;
+  struct StrataEstimatorMessage *msg = (void *) mh;
+  unsigned int diff;
+  uint64_t other_size;
   size_t len;
 
   GNUNET_STATISTICS_update (_GSS_statistics,
@@ -824,11 +837,11 @@ handle_p2p_strata_estimator (void *cls,
 
   if (op->state->phase != PHASE_EXPECT_SE)
   {
-    fail_union_operation (op);
     GNUNET_break (0);
+    fail_union_operation (op);
     return GNUNET_SYSERR;
   }
-  len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
+  len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
   if ( (GNUNET_NO == is_compressed) &&
        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
   {
@@ -836,6 +849,7 @@ handle_p2p_strata_estimator (void *cls,
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
+  other_size = GNUNET_ntohll (msg->set_size);
   remote_se = strata_estimator_create (SE_STRATA_COUNT,
                                        SE_IBF_SIZE,
                                        SE_IBF_HASH_NUM);
@@ -846,7 +860,7 @@ handle_p2p_strata_estimator (void *cls,
     return GNUNET_SYSERR;
   }
   if (GNUNET_OK !=
-      strata_estimator_read (&mh[1],
+      strata_estimator_read (&msg[1],
                              len,
                              is_compressed,
                              remote_se))
@@ -859,6 +873,10 @@ handle_p2p_strata_estimator (void *cls,
   GNUNET_assert (NULL != op->state->se);
   diff = strata_estimator_difference (remote_se,
                                       op->state->se);
+
+  if (diff > 200)
+    diff = diff * 3 / 2; 
+
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (op->state->se);
   op->state->se = NULL;
@@ -867,16 +885,42 @@ handle_p2p_strata_estimator (void *cls,
        diff,
        1<<get_order_from_difference (diff));
 
-  if (diff > GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2)
+  if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+  {
+    GNUNET_break (0);
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
+  }
+
+
+  if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
   {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "Sending full set (diff=%d, own set=%u)\n",
          diff,
-         GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
-    send_full_set (op);
+         op->state->initial_size);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of full sends",
+                              1,
+                              GNUNET_NO);
+    if (op->state->initial_size <= other_size)
+    {
+      send_full_set (op);
+    }
+    else
+    {
+      struct GNUNET_MQ_Envelope *ev;
+      op->state->phase = PHASE_EXPECT_IBF;
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+      GNUNET_MQ_send (op->mq, ev);
+    }
   }
   else
   {
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of ibf sends",
+                              1,
+                              GNUNET_NO);
     if (GNUNET_OK !=
         send_ibf (op,
                   get_order_from_difference (diff)))
@@ -1255,7 +1299,8 @@ send_client_element (struct Operation *op,
   }
   rm->result_status = htons (status);
   rm->request_id = htonl (op->spec->client_request_id);
-  rm->element_type = element->element_type;
+  rm->element_type = htons (element->element_type);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
   GNUNET_memcpy (&rm[1], element->data, element->size);
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
 }
@@ -1278,6 +1323,7 @@ send_done_and_destroy (void *cls)
   rm->request_id = htonl (op->spec->client_request_id);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   rm->element_type = htons (0);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
   /* Will also call the union-specific cancel function. */
   _GSS_operation_destroy (op, GNUNET_YES);
@@ -1522,9 +1568,15 @@ handle_p2p_full_element (void *cls,
     }
   }
 
-  if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+  if ( (GNUNET_YES == op->spec->byzantine) && 
+       (op->state->received_total > 384 + op->state->received_fresh * 4) && 
+       (op->state->received_fresh < op->state->received_total / 6) )
   {
     /* The other peer gave us lots of old elements, there's something wrong. */
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+         (unsigned long long) op->state->received_fresh,
+         (unsigned long long) op->state->received_total);
     GNUNET_break_op (0);
     fail_union_operation (op);
     return;
@@ -1610,6 +1662,32 @@ send_missing_elements_iter (void *cls,
   return GNUNET_YES;
 }
 
+
+/**
+ * Handle a 
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_request_full (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  if (PHASE_EXPECT_IBF != op->state->phase)
+  {
+    fail_union_operation (op);
+    GNUNET_break_op (0);
+    return;
+  }
+
+  // FIXME: we need to check that our set is larger than the
+  // byzantine_lower_bound by some threshold
+  send_full_set (op);
+}
+
+
 /**
  * Handle a "full done" message.
  *
@@ -1915,6 +1993,7 @@ union_evaluate (struct Operation *op,
          "sent op request without context message\n");
 
   initialize_key_to_element (op);
+  op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
 }
 
 
@@ -1945,6 +2024,7 @@ union_accept (struct Operation *op)
   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
   op->state->salt_receive = op->state->salt_send = 42;
   initialize_key_to_element (op);
+  op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
   /* kick off the operation */
   send_strata_estimator (op);
 }
@@ -2070,6 +2150,9 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
       handle_p2p_full_done (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
+      handle_p2p_request_full (op, mh);
+      break;
     default:
       /* Something wrong with cadet's message handlers? */
       GNUNET_assert (0);