Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
index 16ca6a57f12350cb164567e1fd95f5384bc35e78..b934f468fbddad7cb6ea1a635e9f6175c5b147dd 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "platform.h"
 #include "gnunet_util_lib.h"
+#include "gnunet_block_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
 #include "gnunet_set_service.h"
@@ -34,8 +35,6 @@
 #include "consensus_protocol.h"
 #include "consensus.h"
 
-#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
-
 
 enum ReferendumVote
 {
@@ -65,11 +64,6 @@ enum EarlyStoppingPhase
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
-
-struct ContestedPayload
-{
-};
-
 /**
  * Tuple of integers that together
  * identify a task uniquely.
@@ -147,6 +141,7 @@ GNUNET_NETWORK_STRUCT_END
 enum PhaseKind
 {
   PHASE_KIND_ALL_TO_ALL,
+  PHASE_KIND_ALL_TO_ALL_2,
   PHASE_KIND_GRADECAST_LEADER,
   PHASE_KIND_GRADECAST_ECHO,
   PHASE_KIND_GRADECAST_ECHO_GRADE,
@@ -492,6 +487,18 @@ struct ConsensusSession
    * State of our early stopping scheme.
    */
   int early_stopping;
+
+  /**
+   * Our set size from the first round.
+   */
+  uint64_t first_size;
+
+  uint64_t *first_sizes_received;
+
+  /**
+   * Bounded Eppstein lower bound.
+   */
+  uint64_t lower_bound;
 };
 
 /**
@@ -534,6 +541,7 @@ phasename (uint16_t phase)
   switch (phase)
   {
     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
+    case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
     case PHASE_KIND_FINISH: return "FINISH";
     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
@@ -669,16 +677,25 @@ send_to_client_iter (void *cls,
   if (NULL != element)
   {
     struct GNUNET_CONSENSUS_ElementMessage *m;
+    const struct ConsensusElement *ce;
+
+    GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
+    ce = element->data;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
+
+    if (0 != ce->marker)
+      return GNUNET_YES;
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "P%d: sending element %s to client\n",
                 session->local_peer_idx,
                 debug_str_element (element));
 
-    ev = GNUNET_MQ_msg_extra (m, element->size,
+    ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
-    m->element_type = htons (element->element_type);
-    GNUNET_memcpy (&m[1], element->data, element->size);
+    m->element_type = ce->payload_type;
+    GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
     GNUNET_MQ_send (session->client_mq, ev);
   }
   else
@@ -848,7 +865,7 @@ rfn_vote (struct ReferendumEntry *rfn,
 }
 
 
-uint16_t
+static uint16_t
 task_other_peer (struct TaskEntry *task)
 {
   uint16_t me = task->step->session->local_peer_idx;
@@ -858,17 +875,33 @@ task_other_peer (struct TaskEntry *task)
 }
 
 
+static int
+cmp_uint64_t (const void *pa, const void *pb)
+{
+  uint64_t a = *(uint64_t *) pa;
+  uint64_t b = *(uint64_t *) pb;
+
+  if (a == b)
+    return 0;
+  if (a < b)
+    return -1;
+  return 1;
+}
+
+
 /**
  * Callback for set operation results. Called for each element
  * in the result set.
  *
  * @param cls closure
  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
+ * @param current_size current set size
  * @param status see enum GNUNET_SET_Status
  */
 static void
 set_result_cb (void *cls,
                const struct GNUNET_SET_Element *element,
+               uint64_t current_size,
                enum GNUNET_SET_Status status)
 {
   struct TaskEntry *task = cls;
@@ -878,6 +911,18 @@ set_result_cb (void *cls,
   struct ReferendumEntry *output_rfn = NULL;
   unsigned int other_idx;
   struct SetOpCls *setop;
+  const struct ConsensusElement *consensus_element = NULL;
+
+  if (NULL != element)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "P%u: got element of type %u, status %u\n",
+                session->local_peer_idx,
+                (unsigned) element->element_type,
+                (unsigned) status);
+    GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
+    consensus_element = element->data;
+  }
 
   setop = &task->cls.setop;
 
@@ -930,19 +975,53 @@ set_result_cb (void *cls,
       return;
   }
 
-  if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
+  if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
   {
-    if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "P%u: got some marker\n",
+                  session->local_peer_idx);
+    if ( (GNUNET_YES == setop->transceive_contested) &&
+         (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
     {
       GNUNET_assert (NULL != output_rfn);
       rfn_contest (output_rfn, task_other_peer (task));
       return;
     }
+
+    if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
+    {
+
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "P%u: got size marker\n",
+                  session->local_peer_idx);
+
+
+      struct ConsensusSizeElement *cse = (void *) consensus_element;
+
+      if (cse->sender_index == other_idx)
+      {
+        if (NULL == session->first_sizes_received)
+          session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
+        session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
+
+        uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
+        qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
+        session->lower_bound = copy[session->num_peers / 3 + 1];
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: lower bound %llu\n",
+                    session->local_peer_idx,
+                    (long long) session->lower_bound);
+      }
+      return;
+    }
+
+    return;
   }
 
   switch (status)
   {
     case GNUNET_SET_STATUS_ADD_LOCAL:
+      GNUNET_assert (NULL != consensus_element);
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Adding element in Task {%s}\n",
                   debug_str_task_key (&task->key));
@@ -989,9 +1068,10 @@ set_result_cb (void *cls,
       // XXX: add result to structures in task
       break;
     case GNUNET_SET_STATUS_ADD_REMOTE:
+      GNUNET_assert (NULL != consensus_element);
       if (GNUNET_YES == setop->do_not_remove)
         break;
-      if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
+      if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
         break;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Removing element in Task {%s}\n",
@@ -1047,6 +1127,10 @@ set_result_cb (void *cls,
       {
         rfn_commit (output_rfn, task_other_peer (task));
       }
+      if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
+      {
+        session->first_size = current_size;
+      }
       finish_task (task);
       break;
     case GNUNET_SET_STATUS_FAILURE:
@@ -1069,6 +1153,7 @@ enum EvilnessType
   EVILNESS_CRAM_LEAD,
   EVILNESS_CRAM_ECHO,
   EVILNESS_SLACK,
+  EVILNESS_SLACK_A2A,
 };
 
 enum EvilnessSubType
@@ -1161,6 +1246,10 @@ get_evilness (struct ConsensusSession *session, struct Evilness *evil)
       {
         evil->type = EVILNESS_SLACK;
       }
+      if (0 == strcmp ("slack-a2a", evil_type_str))
+      {
+        evil->type = EVILNESS_SLACK_A2A;
+      }
       else if (0 == strcmp ("cram-all", evil_type_str))
       {
         evil->type = EVILNESS_CRAM_ALL;
@@ -1226,6 +1315,31 @@ commit_set (struct ConsensusSession *session,
   set = lookup_set (session, &setop->input_set);
   GNUNET_assert (NULL != set);
 
+  if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
+  {
+    struct GNUNET_SET_Element element;
+    struct ConsensusElement ce = { 0 };
+    ce.marker = CONSENSUS_MARKER_CONTESTED;
+    element.data = &ce;
+    element.size = sizeof (struct ConsensusElement);
+    element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
+    GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+  }
+
+  if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
+  {
+    struct GNUNET_SET_Element element;
+    struct ConsensusSizeElement cse = { 0 };
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
+    cse.ce.marker = CONSENSUS_MARKER_SIZE;
+    cse.size = GNUNET_htonll (session->first_size);
+    cse.sender_index = session->local_peer_idx;
+    element.data = &cse;
+    element.size = sizeof (struct ConsensusSizeElement);
+    element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
+    GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+  }
+
 #ifdef EVIL
   {
     unsigned int i;
@@ -1267,21 +1381,21 @@ commit_set (struct ConsensusSession *session,
         }
         for (i = 0; i < evil.num; i++)
         {
-          struct GNUNET_HashCode hash;
           struct GNUNET_SET_Element element;
-          element.data = &hash;
-          element.size = sizeof (struct GNUNET_HashCode);
-          element.element_type = 0;
+          struct ConsensusStuffedElement se = { 0 };
+          element.data = &se;
+          element.size = sizeof (struct ConsensusStuffedElement);
+          element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
 
           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
           {
             /* Always generate a new element. */
-            GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
+            GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
           }
           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
           {
             /* Always cram the same elements, derived from counter. */
-            GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
+            GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
           }
           else
           {
@@ -1308,6 +1422,19 @@ commit_set (struct ConsensusSession *session,
                     "P%u: evil peer: slacking\n",
                     (unsigned int) session->local_peer_idx);
         /* Do nothing. */
+      case EVILNESS_SLACK_A2A:
+        if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
+             (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
+        {
+          struct GNUNET_SET_Handle *empty_set;
+          empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+          GNUNET_SET_commit (setop->op, empty_set);
+          GNUNET_SET_destroy (empty_set);
+        }
+        else
+        {
+          GNUNET_SET_commit (setop->op, set->h);
+        }
         break;
       case EVILNESS_NONE:
         GNUNET_SET_commit (setop->op, set->h);
@@ -1315,15 +1442,6 @@ commit_set (struct ConsensusSession *session,
     }
   }
 #else
-  if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
-  {
-    struct GNUNET_SET_Element element;
-    struct ContestedPayload payload;
-    element.data = &payload;
-    element.size = sizeof (struct ContestedPayload);
-    element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
-    GNUNET_SET_add_element (set->h, &element, NULL, NULL);
-  }
   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
   {
     GNUNET_SET_commit (setop->op, set->h);
@@ -2007,12 +2125,18 @@ task_start_reconcile (struct TaskEntry *task)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
 
+    struct GNUNET_SET_Option opts[] = {
+      { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
+      { GNUNET_SET_OPTION_END },
+    };
+
     // XXX: maybe this should be done while
     // setting up tasks alreays?
     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
                                     &session->global_id,
                                     &rcm.header,
                                     GNUNET_SET_RESULT_SYMMETRIC,
+                                    opts,
                                     set_result_cb,
                                     task);
 
@@ -2437,8 +2561,14 @@ set_listen_cb (void *cls,
   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
                     (task->key.peer2 == session->local_peer_idx)));
 
+  struct GNUNET_SET_Option opts[] = {
+    { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
+    { GNUNET_SET_OPTION_END },
+  };
+
   task->cls.setop.op = GNUNET_SET_accept (request,
                                           GNUNET_SET_RESULT_SYMMETRIC,
+                                          opts,
                                           set_result_cb,
                                           task);
 
@@ -2834,11 +2964,42 @@ construct_task_graph (struct ConsensusSession *session)
     put_task (session->taskmap, &task);
   }
 
+  round += 1;
   prev_step = step;
-  step = NULL;
+  step = create_step (session, round, GNUNET_NO);;
+#ifdef GNUNET_EXTRA_LOGGING
+  step->debug_name = GNUNET_strdup ("all to all 2");
+#endif
+  step_depend_on (step, prev_step);
+
+
+  for (i = 0; i < n; i++)
+  {
+    uint16_t p1;
+    uint16_t p2;
+
+    p1 = me;
+    p2 = i;
+    arrange_peers (&p1, &p2, n);
+    task = ((struct TaskEntry) {
+      .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
+      .step = step,
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
+    });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
+    task.cls.setop.output_set = task.cls.setop.input_set;
+    task.cls.setop.do_not_remove = GNUNET_YES;
+    put_task (session->taskmap, &task);
+  }
 
   round += 1;
 
+  prev_step = step;
+  step = NULL;
+
+
+
   /* Byzantine union */
 
   /* sequential repetitions of the gradecasts */
@@ -3039,9 +3200,9 @@ handle_client_insert (void *cls,
                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
 {
   struct ConsensusSession *session = cls;
-  struct GNUNET_SET_Element *element;
   ssize_t element_size;
   struct GNUNET_SET_Handle *initial_set;
+  struct ConsensusElement *ce;
 
   if (GNUNET_YES == session->conclude_started)
   {
@@ -3049,12 +3210,18 @@ handle_client_insert (void *cls,
     GNUNET_SERVICE_client_drop (session->client);
     return;
   }
+
   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
-  element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
-  element->element_type = msg->element_type;
-  element->size = element_size;
-  GNUNET_memcpy (&element[1], &msg[1], element_size);
-  element->data = &element[1];
+  ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
+  GNUNET_memcpy (&ce[1], &msg[1], element_size);
+  ce->payload_type = msg->element_type;
+
+  struct GNUNET_SET_Element element = {
+    .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
+    .size = sizeof (struct ConsensusElement) + element_size,
+    .data = ce,
+  };
+
   {
     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
     struct SetEntry *entry;
@@ -3064,26 +3231,22 @@ handle_client_insert (void *cls,
     GNUNET_assert (NULL != entry);
     initial_set = entry->h;
   }
+
   session->num_client_insert_pending++;
   GNUNET_SET_add_element (initial_set,
-                          element,
+                          &element,
                           &client_insert_done,
                           session);
 
 #ifdef GNUNET_EXTRA_LOGGING
   {
-    struct GNUNET_HashCode hash;
-
-    GNUNET_SET_element_hash (element,
-                             &hash);
-
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "P%u: element %s added\n",
                 session->local_peer_idx,
-                GNUNET_h2s (&hash));
+                debug_str_element (&element));
   }
 #endif
-  GNUNET_free (element);
+  GNUNET_free (ce);
   GNUNET_SERVICE_client_continue (session->client);
 }