/*
This file is part of GNUnet.
- (C) 2013, 2014 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2013, 2014 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
gcry_mpi_t product;
/**
- * how many elements we were supplied with from the client
+ * How many elements we were supplied with from the client (total
+ * count before intersection).
*/
uint32_t total;
/**
- * how many elements actually are used for the scalar product.
- * Size of the arrays in @e r and @e r_prime.
+ * How many elements actually are used for the scalar product.
+ * Size of the arrays in @e r and @e r_prime. Sometimes also
+ * reset to 0 and used as a counter!
*/
uint32_t used_element_count;
/**
- * already transferred elements (sent/received) for multipart messages, less or equal than @e used_element_count for
+ * Already transferred elements from client to us.
+ * Less or equal than @e total.
*/
- uint32_t transferred_element_count;
+ uint32_t client_received_element_count;
/**
- * Is this session active (#GNUNET_YES), Concluded (#GNUNET_NO), or
- * had an error (#GNUNET_SYSERR).
- * FIXME: replace with proper enum for status codes!
+ * Already transferred elements from Bob to us.
+ * Less or equal than @e total.
*/
- int32_t active;
+ uint32_t cadet_received_element_count;
+
+ /**
+ * State of this session. In
+ * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is
+ * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or
+ * #GNUNET_SCALARPRODUCT_STATUS_FAILURE.
+ */
+ enum GNUNET_SCALARPRODUCT_ResponseStatus status;
/**
* Flag to prevent recursive calls to #destroy_service_session() from
* @return #GNUNET_OK (continue to iterate)
*/
static int
-free_element (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+free_element_cb (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
{
struct GNUNET_SCALARPRODUCT_Element *e = value;
if (NULL != s->intersected_elements)
{
GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
- &free_element,
+ &free_element_cb,
s);
GNUNET_CONTAINER_multihashmap_destroy (s->intersected_elements);
s->intersected_elements = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending session-end notification with status %d to client for session %s\n",
- session->active,
+ session->status,
GNUNET_h2s (&session->session_id));
e = GNUNET_MQ_msg (msg,
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
msg->product_length = htonl (0);
- msg->status = htonl (session->active);
+ msg->status = htonl (session->status);
GNUNET_MQ_send (session->client_mq,
e);
}
e = GNUNET_MQ_msg_extra (msg,
product_length,
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
+ msg->status = htonl (GNUNET_SCALARPRODUCT_STATUS_SUCCESS);
msg->range = htonl (range);
msg->product_length = htonl (product_length);
if (NULL != product_exported)
"Peer disconnected, terminating session %s with peer %s\n",
GNUNET_h2s (&s->session_id),
GNUNET_i2s (&s->peer));
+ if (NULL != s->cadet_mq)
+ {
+ GNUNET_MQ_destroy (s->cadet_mq);
+ s->cadet_mq = NULL;
+ }
s->channel = NULL;
- if (GNUNET_YES == s->active)
+ if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status)
{
/* We didn't get an answer yet, fail with error */
- s->active = GNUNET_SYSERR;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
}
}
const struct GNUNET_MessageHeader *message)
{
struct AliceServiceSession *s = *channel_ctx;
- const struct MultipartMessage *msg;
+ const struct BobCryptodataMultipartMessage *msg;
const struct GNUNET_CRYPTO_PaillierCiphertext *payload;
size_t i;
uint32_t contained;
return GNUNET_SYSERR;
}
msg_size = ntohs (message->size);
- if (sizeof (struct MultipartMessage) > msg_size)
+ if (sizeof (struct BobCryptodataMultipartMessage) > msg_size)
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- msg = (const struct MultipartMessage *) message;
+ msg = (const struct BobCryptodataMultipartMessage *) message;
contained = ntohl (msg->contained_element_count);
- required_size = sizeof (struct MultipartMessage)
+ required_size = sizeof (struct BobCryptodataMultipartMessage)
+ 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
if ( (required_size != msg_size) ||
- (s->transferred_element_count + contained > s->used_element_count) )
+ (s->cadet_received_element_count + contained > s->used_element_count) )
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u additional crypto values from Bob\n",
+ (unsigned int) contained);
+
payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
/* Convert each k[][perm] to its MPI_value */
for (i = 0; i < contained; i++)
{
- memcpy (&s->r[s->transferred_element_count + i],
+ memcpy (&s->r[s->cadet_received_element_count + i],
&payload[2 * i],
sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
- memcpy (&s->r_prime[s->transferred_element_count + i],
+ memcpy (&s->r_prime[s->cadet_received_element_count + i],
&payload[2 * i],
sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
}
- s->transferred_element_count += contained;
- if (s->transferred_element_count != s->used_element_count)
+ s->cadet_received_element_count += contained;
+ GNUNET_CADET_receive_done (s->channel);
+ if (s->cadet_received_element_count != s->used_element_count)
return GNUNET_OK;
s->product = compute_scalar_product (s);
const struct GNUNET_MessageHeader *message)
{
struct AliceServiceSession *s = *channel_ctx;
- const struct ServiceResponseMessage *msg;
+ const struct BobCryptodataMessage *msg;
const struct GNUNET_CRYPTO_PaillierCiphertext *payload;
uint32_t i;
uint32_t contained;
return GNUNET_SYSERR;
}
msg_size = ntohs (message->size);
- if (sizeof (struct ServiceResponseMessage) > msg_size)
+ if (sizeof (struct BobCryptodataMessage) > msg_size)
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- msg = (const struct ServiceResponseMessage *) message;
+ msg = (const struct BobCryptodataMessage *) message;
contained = ntohl (msg->contained_element_count);
- required_size = sizeof (struct ServiceResponseMessage)
+ required_size = sizeof (struct BobCryptodataMessage)
+ 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)
+ 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
if ( (msg_size != required_size) ||
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- if ( (NULL == s->sorted_elements) ||
- (s->used_element_count != s->transferred_element_count) )
+ if (NULL == s->sorted_elements)
{
/* we're not ready yet, how can Bob be? */
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
+ if (s->total != s->client_received_element_count)
+ {
+ /* we're not ready yet, how can Bob be? */
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u crypto values from Bob\n",
+ (unsigned int) contained);
+
payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
memcpy (&s->s,
&payload[0],
&payload[2 * i + 1],
sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
}
- s->transferred_element_count = contained;
+ s->cadet_received_element_count = contained;
+ GNUNET_CADET_receive_done (s->channel);
- if (s->transferred_element_count != s->used_element_count)
+ if (s->cadet_received_element_count != s->used_element_count)
{
/* More to come */
return GNUNET_OK;
element_cmp (const void *a,
const void *b)
{
- const struct MpiElement *ma = *(const struct MpiElement **) a;
- const struct MpiElement *mb = *(const struct MpiElement **) b;
+ const struct MpiElement *ma = a;
+ const struct MpiElement *mb = b;
return GNUNET_CRYPTO_hash_cmp (ma->key,
mb->key);
* Maximum number of elements we can put into a single cryptodata
* message
*/
-#define ELEMENT_CAPACITY ((GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceCryptodataMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
+#define ELEMENT_CAPACITY ((GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE - 1 - sizeof (struct AliceCryptodataMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
/**
unsigned int i;
uint32_t todo_count;
gcry_mpi_t a;
+ uint32_t off;
s->sorted_elements
= GNUNET_malloc (GNUNET_CONTAINER_multihashmap_size (s->intersected_elements) *
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Finished intersection, %d items remain\n",
s->used_element_count);
- qsort (s->intersected_elements,
+ qsort (s->sorted_elements,
s->used_element_count,
sizeof (struct MpiElement),
&element_cmp);
-
- while (s->transferred_element_count < s->used_element_count)
+ off = 0;
+ while (off < s->used_element_count)
{
- todo_count = s->used_element_count - s->transferred_element_count;
+ todo_count = s->used_element_count - off;
if (todo_count > ELEMENT_CAPACITY)
todo_count = ELEMENT_CAPACITY;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u/%u crypto values to Bob\n",
+ (unsigned int) todo_count,
+ (unsigned int) s->used_element_count);
e = GNUNET_MQ_msg_extra (msg,
todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext),
msg->contained_element_count = htonl (todo_count);
payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
a = gcry_mpi_new (0);
- for (i = s->transferred_element_count; i < todo_count; i++)
+ for (i = off; i < off + todo_count; i++)
{
gcry_mpi_add (a,
s->sorted_elements[i].value,
my_offset);
- GNUNET_CRYPTO_paillier_encrypt (&my_pubkey,
- a,
- 3,
- &payload[i - s->transferred_element_count]);
+ GNUNET_assert (3 ==
+ GNUNET_CRYPTO_paillier_encrypt (&my_pubkey,
+ a,
+ 3,
+ &payload[i - off]));
}
gcry_mpi_release (a);
- s->transferred_element_count += todo_count;
+ off += todo_count;
GNUNET_MQ_send (s->cadet_mq,
e);
}
return;
case GNUNET_SET_STATUS_DONE:
s->intersection_op = NULL;
- s->intersection_set = NULL;
+ if (NULL != s->intersection_set)
+ {
+ GNUNET_SET_destroy (s->intersection_set);
+ s->intersection_set = NULL;
+ }
send_alices_cryptodata_message (s);
return;
case GNUNET_SET_STATUS_HALF_DONE:
s->intersection_listen = NULL;
}
s->intersection_op = NULL;
- s->intersection_set = NULL;
- s->active = GNUNET_SYSERR;
+ if (NULL != s->intersection_set)
+ {
+ GNUNET_SET_destroy (s->intersection_set);
+ s->intersection_set = NULL;
+ }
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
return;
default:
if (NULL == s->intersection_op)
{
GNUNET_break (0);
- s->active = GNUNET_SYSERR;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
return;
}
GNUNET_SET_commit (s->intersection_op,
s->intersection_set))
{
- s->active = GNUNET_SYSERR;
+ GNUNET_break (0);
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
return;
}
+ GNUNET_SET_destroy (s->intersection_set);
s->intersection_set = NULL;
+ GNUNET_SET_listen_cancel (s->intersection_listen);
s->intersection_listen = NULL;
}
GNUNET_CADET_OPTION_RELIABLE);
if (NULL == s->channel)
{
- s->active = GNUNET_SYSERR;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
return;
}
s);
if (NULL == s->intersection_listen)
{
- s->active = GNUNET_SYSERR;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
GNUNET_CADET_channel_destroy (s->channel);
s->channel = NULL;
prepare_client_end_notification (s);
e = GNUNET_MQ_msg (msg,
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SESSION_INITIALIZATION);
msg->session_id = s->session_id;
+ msg->public_key = my_pubkey;
GNUNET_MQ_send (s->cadet_mq,
e);
}
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
- const struct ComputationMultipartMessage * msg;
+ const struct ComputationBobCryptodataMultipartMessage * msg;
struct AliceServiceSession *s;
uint32_t contained_count;
const struct GNUNET_SCALARPRODUCT_Element *elements;
return;
}
msize = ntohs (message->size);
- if (msize < sizeof (struct ComputationMultipartMessage))
+ if (msize < sizeof (struct ComputationBobCryptodataMultipartMessage))
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client,
GNUNET_SYSERR);
return;
}
- msg = (const struct ComputationMultipartMessage *) message;
+ msg = (const struct ComputationBobCryptodataMultipartMessage *) message;
contained_count = ntohl (msg->element_count_contained);
- if ( (msize != (sizeof (struct ComputationMultipartMessage) +
+ if ( (msize != (sizeof (struct ComputationBobCryptodataMultipartMessage) +
contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) ||
(0 == contained_count) ||
- (s->total == s->transferred_element_count) ||
- (s->total < s->transferred_element_count + contained_count) )
+ (s->total == s->client_received_element_count) ||
+ (s->total < s->client_received_element_count + contained_count) )
{
GNUNET_break_op (0);
GNUNET_SERVER_receive_done (client,
GNUNET_SYSERR);
return;
}
- s->transferred_element_count += contained_count;
+ s->client_received_element_count += contained_count;
elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
for (i = 0; i < contained_count; i++)
{
- if (0 == GNUNET_ntohll (elements[i].value))
- continue;
elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
memcpy (elem,
&elements[i],
}
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
- if (s->total != s->transferred_element_count)
+ if (s->total != s->client_received_element_count)
{
/* more to come */
return;
s = GNUNET_new (struct AliceServiceSession);
s->peer = msg->peer;
- s->active = GNUNET_YES;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE;
s->client = client;
s->client_mq = GNUNET_MQ_queue_for_server_client (client);
s->total = total_count;
- s->transferred_element_count = contained_count;
+ s->client_received_element_count = contained_count;
s->session_id = msg->session_key;
elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
s->intersected_elements = GNUNET_CONTAINER_multihashmap_create (s->total,
s);
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
- if (s->total != s->transferred_element_count)
+ if (s->total != s->client_received_element_count)
{
/* wait for multipart msg */
return;
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down, initiating cleanup.\n");
+ // FIXME: we have to cut our connections to CADET first!
if (NULL != my_cadet)
{
GNUNET_CADET_disconnect (my_cadet);
{
struct AliceServiceSession *s;
+ if (NULL == client)
+ return;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client %p disconnected from us.\n",
client);