- break;
- case IBF_STATE_RECEIVING:
- break;
- default:
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state);
- mark_peer_bad (cpi);
- return GNUNET_NO;
- }
-
- if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
- mark_peer_bad (cpi);
- return GNUNET_NO;
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets,
- cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
-
- if (NULL == cpi->ibf)
- cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
-
- buf = (void *) &digest[1];
- ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
-
- cpi->ibf_bucket_counter += num_buckets;
-
- if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
- cpi->ibf_state = IBF_STATE_DECODING;
- prepare_ibf (cpi);
- ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
- write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0);
- }
- return GNUNET_YES;
-}
-
-
-static int
-handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
-{
- struct PendingElement *pending_element;
- struct GNUNET_CONSENSUS_Element *element;
- struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
- size_t size;
-
- size = ntohs (element_msg->size) - sizeof *element_msg;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
-
- element = GNUNET_malloc (size + sizeof *element);
- element->size = size;
- memcpy (&element[1], &element_msg[1], size);
- element->data = &element[1];
-
- pending_element = GNUNET_malloc (sizeof *pending_element);
- pending_element->element = element;
- GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element);
-
- client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
- client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
- client_element_msg->header.size = htons (size + sizeof *client_element_msg);
- memcpy (&client_element_msg[1], &element[1], size);
-
- queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
-
- send_next (cpi->session);
-
- return GNUNET_YES;
-}
-
-
-/**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
- *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- * GNUNET_STREAM_OK if writing to stream was completed successfully;
- * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- * (this doesn't mean that the data is never sent, the receiver may
- * have read the data but its ACKs may have been lost);
- * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- * be processed.
- * @param size the number of bytes written
- */
-static void
-write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
- struct ConsensusPeerInformation *cpi;
- cpi = cls;
- GNUNET_assert (NULL == cpi->wh);
- cpi->wh = NULL;
- if (NULL != cpi->requests_and_elements_head)
- {
- struct QueuedMessage *qm;
- qm = cpi->requests_and_elements_head;
- GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
-
- cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- write_requested_elements, cpi);
- GNUNET_assert (NULL != cpi->wh);
- }
-}
-
-
-/**
- * Handle a request for elements.
- * Only allowed in exchange-rounds.
- */
-static int
-handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
-{
- struct GNUNET_HashCode *hashcode;
- unsigned int num;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n");
- num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode);
- hashcode = (struct GNUNET_HashCode *) &msg[1];
- while (num--)
- {
- GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state);
- GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi);
- if (NULL == cpi->wh)
- write_requested_elements (cpi, GNUNET_STREAM_OK, 0);
- hashcode++;
- }
- return GNUNET_YES;
-}
-
-
-/**
- * Handle a HELLO-message, send when another peer wants to join a session where
- * our peer is a member. The session may or may not be inhabited yet.
- */
-static int
-handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
-{
- /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
- struct ConsensusSession *session;
- session = sessions_head;
- while (NULL != session)
- {
- if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
- {
- int idx;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
- idx = get_peer_idx (inc->peer, session);
- GNUNET_assert (-1 != idx);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
- inc->cpi = &session->info[idx];
- GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
- inc->cpi->mst = inc->mst;
- inc->cpi->hello = GNUNET_YES;
- inc->cpi->socket = inc->socket;
-
- if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) &&
- (GNUNET_YES == inc->cpi->is_outgoing))