- }
-}
-
-
-static int
-handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
-{
- int num_buckets;
-
- /* FIXME: find out if we're still expecting the same ibf! */
-
- cpi->apparent_round = cpi->session->current_round;
- // FIXME: check header.size >= sizeof (DD)
- num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
- switch (cpi->ibf_state)
- {
- case IBF_STATE_NONE:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
- cpi->ibf_state = IBF_STATE_RECEIVING;
- cpi->ibf_order = digest->order;
- cpi->ibf_bucket_counter = 0;
- if (NULL != cpi->ibf)
- {
- ibf_destroy (cpi->ibf);
- cpi->ibf = NULL;
- }
- break;
- case IBF_STATE_ANTICIPATE_DIFF:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
- cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
- cpi->ibf_state = IBF_STATE_RECEIVING;
- cpi->ibf_order = digest->order;
- cpi->ibf_bucket_counter = 0;
- if (NULL != cpi->ibf)
- {
- ibf_destroy (cpi->ibf);
- cpi->ibf = NULL;
- }
- break;
- case IBF_STATE_RECEIVING:
- break;
- default:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
- return GNUNET_YES;
- }
-
- if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
- return GNUNET_YES;
- }
-
- if (NULL == cpi->ibf)
- cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
-
- ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
- cpi->ibf_bucket_counter += num_buckets;
-
- if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
- {
- cpi->ibf_state = IBF_STATE_DECODING;
- cpi->ibf_bucket_counter = 0;
- prepare_ibf (cpi);
- ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
- decode (cpi);
- }
- return GNUNET_YES;
-}
-
-
-/**
- * Insert an element into the consensus set of the specified session.
- * The element will not be copied, and freed when destroying the session.
- *
- * @param session session for new element
- * @param element element to insert
- */
-static void
-insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
-{
- struct GNUNET_HashCode hash;
- struct ElementInfo *e;
- struct IBF_Key ibf_key;
- int i;
-
- e = GNUNET_new (struct ElementInfo);
- e->element = element;
- e->element_hash = GNUNET_new (struct GNUNET_HashCode);
- GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash);
- ibf_key = ibf_key_from_hashcode (e->element_hash);
- ibf_hashcode_from_key (ibf_key, &hash);
- strata_estimator_insert (session->se, &hash);
- GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
- GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- for (i = 0; i <= MAX_IBF_ORDER; i++)
- {
- if (NULL == session->ibfs[i])
- continue;
- ibf_insert (session->ibfs[i], ibf_key);
- }
-}
-
-
-/**
- * Handle an element that another peer sent us
- */
-static int
-handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
-{
- struct GNUNET_CONSENSUS_Element *element;
- size_t size;
-
- switch (cpi->session->current_round)
- {
- case CONSENSUS_ROUND_COMPLETION:
- /* FIXME: check if we really expect the element */
- case CONSENSUS_ROUND_EXCHANGE:
- break;
- default:
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
- return GNUNET_YES;
- }
-
- size = ntohs (element_msg->size) - sizeof *element_msg;
-
- element = GNUNET_malloc (size + sizeof *element);
- element->size = size;
- memcpy (&element[1], &element_msg[1], size);
- element->data = &element[1];
-
- LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n");
-
- insert_element (cpi->session, element);
-
- return GNUNET_YES;
-}
-
-
-/**
- * Handle a request for elements.
- *
- * @param cpi peer that is requesting the element
- * @param msg the element request message
- */
-static int
-handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
-{
- struct GNUNET_HashCode hashcode;
- struct IBF_Key *ibf_key;
- unsigned int num;
-
- /* element requests are allowed in every round */
-
- num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
-
- ibf_key = (struct IBF_Key *) &msg[1];
- while (num--)
- {
- ibf_hashcode_from_key (*ibf_key, &hashcode);
- GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
- ibf_key++;
- }
- return GNUNET_YES;
-}
-
-static int
-is_peer_connected (struct ConsensusPeerInformation *cpi)
-{
- if (NULL == cpi->mss.socket)
- return GNUNET_NO;
- return GNUNET_YES;
-}
-
-
-static void
-ensure_peer_connected (struct ConsensusPeerInformation *cpi)
-{
- if (NULL != cpi->mss.socket)
- return;
- cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
- open_cb, cpi, GNUNET_STREAM_OPTION_END);
-}
-
-
-/**
- * If necessary, send a message to the peer, depending on the current
- * round.
- */
-static void
-embrace_peer (struct ConsensusPeerInformation *cpi)
-{
- if (GNUNET_NO == is_peer_connected (cpi))
- {
- ensure_peer_connected (cpi);
- return;
- }
- if (GNUNET_NO == cpi->hello)