gcry_mpi_t product;
/**
- * How many elements we were supplied with from the client
+ * How many elements will be supplied in total from the client.
*/
uint32_t total;
/**
- * how many elements actually are used for the scalar product.
- * Size of the arrays in @e r and @e r_prime.
+ * Already transferred elements (received) for multipart
+ * messages from client. Always less than @e total.
+ */
+ uint32_t client_received_element_count;
+
+ /**
+ * How many elements actually are used for the scalar product.
+ * Size of the arrays in @e r and @e r_prime. Also sometimes
+ * used as an index into the arrays during construction.
*/
uint32_t used_element_count;
/**
- * already transferred elements (sent/received) for multipart messages, less or equal than @e used_element_count for
+ * Counts the number of values received from Alice by us.
+ * Always less than @e used_element_count.
+ */
+ uint32_t cadet_received_element_count;
+
+ /**
+ * Counts the number of values transmitted from us to Alice.
+ * Always less than @e used_element_count.
*/
- uint32_t transferred_element_count;
+ uint32_t cadet_transmitted_element_count;
/**
- * Is this session active (#GNUNET_YES), Concluded (#GNUNET_NO), or had an error (#GNUNET_SYSERR)
+ * State of this session. In
+ * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is
+ * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or
+ * #GNUNET_SCALARPRODUCT_STATUS_FAILURE.
*/
- int32_t active;
+ enum GNUNET_SCALARPRODUCT_ResponseStatus status;
/**
* Are we already in #destroy_service_session()?
*/
static struct GNUNET_CADET_Handle *my_cadet;
-/**
- * Certain events (callbacks for server & cadet operations) must not
- * be queued after shutdown.
- */
-static int do_shutdown;
-
/**
}
+/**
+ * Callback used to free the elements in the map.
+ *
+ * @param cls NULL
+ * @param key key of the element
+ * @param value the value to free
+ */
+static int
+free_element_cb (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_SCALARPRODUCT_Element *element = value;
+
+ GNUNET_free (element);
+ return GNUNET_OK;
+}
+
+
/**
* Destroy session state, we are done with it.
*
s));
if (NULL != s->intersected_elements)
{
- /* FIXME: free elements */
+ GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
+ &free_element_cb,
+ NULL);
GNUNET_CONTAINER_multihashmap_destroy (s->intersected_elements);
s->intersected_elements = NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending session-end notification with status %d to client for session %s\n",
- session->active,
+ session->status,
GNUNET_h2s (&session->session_id));
e = GNUNET_MQ_msg (msg,
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
msg->range = 0;
msg->product_length = htonl (0);
- msg->status = htonl (session->active);
+ msg->status = htonl (session->status);
GNUNET_MQ_send (session->client_mq,
e);
}
"Peer disconnected, terminating session %s with peer %s\n",
GNUNET_h2s (&in->session_id),
GNUNET_i2s (&in->peer));
+ if (NULL != in->cadet_mq)
+ {
+ GNUNET_MQ_destroy (in->cadet_mq);
+ in->cadet_mq = NULL;
+ }
in->channel = NULL;
if (NULL != (s = in->s))
{
- if (GNUNET_YES == s->active)
+ if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status)
{
- s->active = GNUNET_SYSERR;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
}
}
{
struct BobServiceSession *session = cls;
- session->active = GNUNET_NO; /* that means, done */
+ session->status = GNUNET_SCALARPRODUCT_STATUS_SUCCESS;
prepare_client_end_notification (session);
}
/**
* Maximum count of elements we can put into a multipart message
*/
-#define ELEMENT_CAPACITY ((GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct MultipartMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
+#define ELEMENT_CAPACITY ((GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE - sizeof (struct BobCryptodataMultipartMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
/**
transmit_bobs_cryptodata_message_multipart (struct BobServiceSession *s)
{
struct GNUNET_CRYPTO_PaillierCiphertext *payload;
- struct MultipartMessage *msg;
+ struct BobCryptodataMultipartMessage *msg;
struct GNUNET_MQ_Envelope *e;
unsigned int i;
unsigned int j;
uint32_t todo_count;
- while (s->transferred_element_count != s->used_element_count)
+ while (s->cadet_transmitted_element_count != s->used_element_count)
{
- todo_count = s->used_element_count - s->transferred_element_count;
+ todo_count = s->used_element_count - s->cadet_transmitted_element_count;
if (todo_count > ELEMENT_CAPACITY / 2)
todo_count = ELEMENT_CAPACITY / 2;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u additional crypto values to Alice\n",
+ (unsigned int) todo_count);
e = GNUNET_MQ_msg_extra (msg,
todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2,
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART);
msg->contained_element_count = htonl (todo_count);
payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
- for (i = s->transferred_element_count, j = 0; i < s->transferred_element_count + todo_count; i++)
+ for (i = s->cadet_transmitted_element_count, j = 0; i < s->cadet_transmitted_element_count + todo_count; i++)
{
//r[i][p] and r[i][q]
memcpy (&payload[j++],
&s->r_prime[i],
sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
}
- s->transferred_element_count += todo_count;
- if (s->transferred_element_count == s->used_element_count)
+ s->cadet_transmitted_element_count += todo_count;
+ if (s->cadet_transmitted_element_count == s->used_element_count)
GNUNET_MQ_notify_sent (e,
&bob_cadet_done_cb,
s);
static void
transmit_bobs_cryptodata_message (struct BobServiceSession *s)
{
- struct ServiceResponseMessage *msg;
+ struct BobCryptodataMessage *msg;
struct GNUNET_MQ_Envelope *e;
struct GNUNET_CRYPTO_PaillierCiphertext *payload;
unsigned int i;
- s->transferred_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ServiceResponseMessage)) /
+ s->cadet_transmitted_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobCryptodataMessage)) /
(sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2) - 2;
- if (s->transferred_element_count > s->used_element_count)
- s->transferred_element_count = s->used_element_count;
+ if (s->cadet_transmitted_element_count > s->used_element_count)
+ s->cadet_transmitted_element_count = s->used_element_count;
e = GNUNET_MQ_msg_extra (msg,
- (2 + s->transferred_element_count * 2)
+ (2 + s->cadet_transmitted_element_count * 2)
* sizeof (struct GNUNET_CRYPTO_PaillierCiphertext),
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA);
- msg->total_element_count = htonl (s->total);
- msg->used_element_count = htonl (s->used_element_count);
- msg->contained_element_count = htonl (s->transferred_element_count);
- msg->key = s->session_id;
+ msg->contained_element_count = htonl (s->cadet_transmitted_element_count);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u/%u crypto values to Alice\n",
+ (unsigned int) s->cadet_transmitted_element_count,
+ (unsigned int) s->used_element_count);
payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
memcpy (&payload[0],
payload = &payload[2];
// convert k[][]
- for (i = 0; i < s->transferred_element_count; i++)
+ for (i = 0; i < s->cadet_transmitted_element_count; i++)
{
//k[i][p] and k[i][q]
memcpy (&payload[i * 2],
&s->r_prime[i],
sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
}
- if (s->transferred_element_count == s->used_element_count)
+ if (s->cadet_transmitted_element_count == s->used_element_count)
GNUNET_MQ_notify_sent (e,
&bob_cadet_done_cb,
s);
&a[q[i]],
&r_prime[i]));
}
+ gcry_mpi_release (tmp);
// Calculate S' = E(SUM( r_i^2 ))
tmp = compute_square_sum (rand, count);
tmp,
1,
&session->s_prime);
+ gcry_mpi_release (tmp);
// Calculate S = E(SUM( (r_i + b_i)^2 ))
for (i = 0; i < count; i++)
tmp,
1,
&session->s);
+ gcry_mpi_release (tmp);
session->r = r;
session->r_prime = r_prime;
// release rand, b and a
for (i = 0; i < count; i++)
gcry_mpi_release (rand[i]);
- gcry_mpi_release (tmp);
GNUNET_free (session->e_a);
session->e_a = NULL;
GNUNET_free (p);
element_cmp (const void *a,
const void *b)
{
- const struct MpiElement *ma = *(const struct MpiElement **) a;
- const struct MpiElement *mb = *(const struct MpiElement **) b;
+ const struct MpiElement *ma = a;
+ const struct MpiElement *mb = b;
return GNUNET_CRYPTO_hash_cmp (ma->key,
mb->key);
transmit_cryptographic_reply (struct BobServiceSession *s)
{
/* TODO: code duplication with Alice! */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received everything, building reply for Alice\n");
s->sorted_elements
= GNUNET_malloc (GNUNET_CONTAINER_multihashmap_size (s->intersected_elements) *
sizeof (struct MpiElement));
GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
©_element_cb,
s);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Finished intersection, %d items remain\n",
- s->used_element_count);
- qsort (s->intersected_elements,
+ qsort (s->sorted_elements,
s->used_element_count,
sizeof (struct MpiElement),
&element_cmp);
if ( (msize != msg_length) ||
(0 == contained_elements) ||
(contained_elements > UINT16_MAX) ||
- (max < contained_elements + s->transferred_element_count) )
+ (max < contained_elements + s->cadet_received_element_count) )
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u crypto values from Alice\n",
+ (unsigned int) contained_elements);
payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
if (NULL == s->e_a)
s->e_a = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) *
max);
- memcpy (&s->e_a[s->transferred_element_count],
+ memcpy (&s->e_a[s->cadet_received_element_count],
payload,
sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * contained_elements);
- s->transferred_element_count += contained_elements;
+ s->cadet_received_element_count += contained_elements;
- if ( (s->transferred_element_count == max) &&
+ if ( (s->cadet_received_element_count == max) &&
(NULL == s->intersection_op) )
{
/* intersection has finished also on our side, and
CADET response(s) */
transmit_cryptographic_reply (s);
}
+ GNUNET_CADET_receive_done (s->cadet->channel);
return GNUNET_OK;
}
/* this element has been removed from the set */
se = GNUNET_CONTAINER_multihashmap_get (s->intersected_elements,
element->data);
+ GNUNET_assert (NULL != se);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Removed element with key %s and value %lld\n",
GNUNET_h2s (&se->key),
return;
case GNUNET_SET_STATUS_DONE:
s->intersection_op = NULL;
- s->intersection_set = NULL;
- if (s->transferred_element_count ==
+ GNUNET_break (NULL == s->intersection_set);
+ GNUNET_CADET_receive_done (s->cadet->channel);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Finished intersection, %d items remain\n",
+ GNUNET_CONTAINER_multihashmap_size (s->intersected_elements));
+ if (s->client_received_element_count ==
GNUNET_CONTAINER_multihashmap_size (s->intersected_elements))
{
/* CADET transmission from Alice is also already done,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Set intersection failed!\n");
s->intersection_op = NULL;
- s->intersection_set = NULL;
- s->active = GNUNET_SYSERR;
+ if (NULL != s->intersection_set)
+ {
+ GNUNET_SET_destroy (s->intersection_set);
+ s->intersection_set = NULL;
+ }
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
prepare_client_end_notification (s);
return;
default:
start_intersection (struct BobServiceSession *s)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got session with key %s and a matching element set, processing.\n",
- GNUNET_h2s (&s->session_id));
+ "Got session with key %s and %u elements, starting intersection.\n",
+ GNUNET_h2s (&s->session_id),
+ (unsigned int) s->total);
s->intersection_op
= GNUNET_SET_prepare (&s->cadet->peer,
GNUNET_SET_RESULT_REMOVED,
&cb_intersection_element_removed,
s);
- GNUNET_SET_commit (s->intersection_op,
- s->intersection_set);
+ if (GNUNET_OK !=
+ GNUNET_SET_commit (s->intersection_op,
+ s->intersection_set))
+ {
+ GNUNET_break (0);
+ s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
+ prepare_client_end_notification (s);
+ return;
+ }
+ GNUNET_SET_destroy (s->intersection_set);
+ s->intersection_set = NULL;
}
/* pair them up */
in->s = s;
s->cadet = in;
- if (s->transferred_element_count == s->total)
+ if (s->client_received_element_count == s->total)
start_intersection (s);
return GNUNET_OK;
}
in = GNUNET_new (struct CadetIncomingSession);
in->peer = *initiator;
in->channel = channel;
+ in->cadet_mq = GNUNET_CADET_mq_create (in->channel);
return in;
}
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
- const struct ComputationMultipartMessage * msg;
+ const struct ComputationBobCryptodataMultipartMessage * msg;
struct BobServiceSession *s;
uint32_t contained_count;
const struct GNUNET_SCALARPRODUCT_Element *elements;
return;
}
msize = ntohs (message->size);
- if (msize < sizeof (struct ComputationMultipartMessage))
+ if (msize < sizeof (struct ComputationBobCryptodataMultipartMessage))
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client,
GNUNET_SYSERR);
return;
}
- msg = (const struct ComputationMultipartMessage *) message;
+ msg = (const struct ComputationBobCryptodataMultipartMessage *) message;
contained_count = ntohl (msg->element_count_contained);
- if ( (msize != (sizeof (struct ComputationMultipartMessage) +
+ if ( (msize != (sizeof (struct ComputationBobCryptodataMultipartMessage) +
contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) ||
(0 == contained_count) ||
(UINT16_MAX < contained_count) ||
- (s->total == s->transferred_element_count) ||
- (s->total < s->transferred_element_count + contained_count) )
+ (s->total == s->client_received_element_count) ||
+ (s->total < s->client_received_element_count + contained_count) )
{
GNUNET_break_op (0);
GNUNET_SERVER_receive_done (client,
&set_elem,
NULL, NULL);
}
- s->transferred_element_count += contained_count;
+ s->client_received_element_count += contained_count;
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
- if (s->total != s->transferred_element_count)
+ if (s->total != s->client_received_element_count)
{
/* more to come */
return;
}
s = GNUNET_new (struct BobServiceSession);
- s->active = GNUNET_YES;
+ s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE;
s->client = client;
s->client_mq = GNUNET_MQ_queue_for_server_client (client);
s->total = total_count;
- s->transferred_element_count = contained_count;
+ s->client_received_element_count = contained_count;
s->session_id = msg->session_key;
GNUNET_break (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_put (client_sessions,
s);
GNUNET_SERVER_receive_done (client,
GNUNET_YES);
- if (s->total != s->transferred_element_count)
+ if (s->total != s->client_received_element_count)
{
/* multipart msg */
return;
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down, initiating cleanup.\n");
- do_shutdown = GNUNET_YES;
- // FIXME: is there a need to shutdown active sessions?
+ // FIXME: do we have to cut our connections to CADET first?
if (NULL != my_cadet)
{
GNUNET_CADET_disconnect (my_cadet);