respect new CADET limit
[oweals/gnunet.git] / src / scalarproduct / gnunet-service-scalarproduct_bob.c
index 879e6abbd327e6d4bf24eeab7bd8327e23f0caef..2c6d607e58d45164a1f4fd20f547c4c2237bbefe 100644 (file)
@@ -145,28 +145,42 @@ struct BobServiceSession
   gcry_mpi_t product;
 
   /**
-   * How many elements we were supplied with from the client
+   * How many elements will be supplied in total from the client.
    */
   uint32_t total;
 
   /**
-   * how many elements actually are used for the scalar product.
-   * Size of the arrays in @e r and @e r_prime.
+   * Already transferred elements (received) for multipart
+   * messages from client. Always less than @e total.
+   */
+  uint32_t client_received_element_count;
+
+  /**
+   * How many elements actually are used for the scalar product.
+   * Size of the arrays in @e r and @e r_prime.  Also sometimes
+   * used as an index into the arrays during construction.
    */
   uint32_t used_element_count;
 
   /**
-   * Already transferred elements (sent/received) for multipart
-   * messages.  First used to count values received from client (less
-   * than @e total), then used to count values transmitted from Alice
-   * (less than @e used_element_count)!  FIXME: maybe separate this.
+   * Counts the number of values received from Alice by us.
+   * Always less than @e used_element_count.
+   */
+  uint32_t cadet_received_element_count;
+
+  /**
+   * Counts the number of values transmitted from us to Alice.
+   * Always less than @e used_element_count.
    */
-  uint32_t transferred_element_count;
+  uint32_t cadet_transmitted_element_count;
 
   /**
-   * Is this session active (#GNUNET_YES), Concluded (#GNUNET_NO), or had an error (#GNUNET_SYSERR)
+   * State of this session.   In
+   * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is
+   * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or
+   * #GNUNET_SCALARPRODUCT_STATUS_FAILURE.
    */
-  int32_t active;
+  enum GNUNET_SCALARPRODUCT_ResponseStatus status;
 
   /**
    * Are we already in #destroy_service_session()?
@@ -262,12 +276,6 @@ static struct GNUNET_CONTAINER_MultiHashMap *cadet_sessions;
  */
 static struct GNUNET_CADET_Handle *my_cadet;
 
-/**
- * Certain events (callbacks for server & cadet operations) must not
- * be queued after shutdown.
- */
-static int do_shutdown;
-
 
 
 /**
@@ -299,6 +307,25 @@ find_matching_cadet_session (const struct GNUNET_HashCode *key)
 }
 
 
+/**
+ * Callback used to free the elements in the map.
+ *
+ * @param cls NULL
+ * @param key key of the element
+ * @param value the value to free
+ */
+static int
+free_element_cb (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 void *value)
+{
+  struct GNUNET_SCALARPRODUCT_Element *element = value;
+
+  GNUNET_free (element);
+  return GNUNET_OK;
+}
+
+
 /**
  * Destroy session state, we are done with it.
  *
@@ -343,7 +370,9 @@ destroy_service_session (struct BobServiceSession *s)
                                                        s));
   if (NULL != s->intersected_elements)
   {
-    /* FIXME: free elements */
+    GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
+                                           &free_element_cb,
+                                           NULL);
     GNUNET_CONTAINER_multihashmap_destroy (s->intersected_elements);
     s->intersected_elements = NULL;
   }
@@ -443,13 +472,13 @@ prepare_client_end_notification (struct BobServiceSession *session)
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending session-end notification with status %d to client for session %s\n",
-              session->active,
+              session->status,
               GNUNET_h2s (&session->session_id));
   e = GNUNET_MQ_msg (msg,
                      GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
   msg->range = 0;
   msg->product_length = htonl (0);
-  msg->status = htonl (session->active);
+  msg->status = htonl (session->status);
   GNUNET_MQ_send (session->client_mq,
                   e);
 }
@@ -478,12 +507,17 @@ cb_channel_destruction (void *cls,
               "Peer disconnected, terminating session %s with peer %s\n",
               GNUNET_h2s (&in->session_id),
               GNUNET_i2s (&in->peer));
+  if (NULL != in->cadet_mq)
+  {
+    GNUNET_MQ_destroy (in->cadet_mq);
+    in->cadet_mq = NULL;
+  }
   in->channel = NULL;
   if (NULL != (s = in->s))
   {
-    if (GNUNET_YES == s->active)
+    if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status)
     {
-      s->active = GNUNET_SYSERR;
+      s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
       prepare_client_end_notification (s);
     }
   }
@@ -500,7 +534,7 @@ bob_cadet_done_cb (void *cls)
 {
   struct BobServiceSession *session = cls;
 
-  session->active = GNUNET_NO; /* that means, done */
+  session->status = GNUNET_SCALARPRODUCT_STATUS_SUCCESS;
   prepare_client_end_notification (session);
 }
 
@@ -508,7 +542,7 @@ bob_cadet_done_cb (void *cls)
 /**
  * Maximum count of elements we can put into a multipart message
  */
-#define ELEMENT_CAPACITY ((GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct MultipartMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
+#define ELEMENT_CAPACITY ((GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE - sizeof (struct BobCryptodataMultipartMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
 
 
 /**
@@ -521,15 +555,15 @@ static void
 transmit_bobs_cryptodata_message_multipart (struct BobServiceSession *s)
 {
   struct GNUNET_CRYPTO_PaillierCiphertext *payload;
-  struct MultipartMessage *msg;
+  struct BobCryptodataMultipartMessage *msg;
   struct GNUNET_MQ_Envelope *e;
   unsigned int i;
   unsigned int j;
   uint32_t todo_count;
 
-  while (s->transferred_element_count != s->used_element_count)
+  while (s->cadet_transmitted_element_count != s->used_element_count)
   {
-    todo_count = s->used_element_count - s->transferred_element_count;
+    todo_count = s->used_element_count - s->cadet_transmitted_element_count;
     if (todo_count > ELEMENT_CAPACITY / 2)
       todo_count = ELEMENT_CAPACITY / 2;
 
@@ -541,7 +575,7 @@ transmit_bobs_cryptodata_message_multipart (struct BobServiceSession *s)
                              GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART);
     msg->contained_element_count = htonl (todo_count);
     payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
-    for (i = s->transferred_element_count, j = 0; i < s->transferred_element_count + todo_count; i++)
+    for (i = s->cadet_transmitted_element_count, j = 0; i < s->cadet_transmitted_element_count + todo_count; i++)
     {
       //r[i][p] and r[i][q]
       memcpy (&payload[j++],
@@ -551,8 +585,8 @@ transmit_bobs_cryptodata_message_multipart (struct BobServiceSession *s)
               &s->r_prime[i],
               sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
     }
-    s->transferred_element_count += todo_count;
-    if (s->transferred_element_count == s->used_element_count)
+    s->cadet_transmitted_element_count += todo_count;
+    if (s->cadet_transmitted_element_count == s->used_element_count)
       GNUNET_MQ_notify_sent (e,
                              &bob_cadet_done_cb,
                              s);
@@ -576,30 +610,26 @@ transmit_bobs_cryptodata_message_multipart (struct BobServiceSession *s)
 static void
 transmit_bobs_cryptodata_message (struct BobServiceSession *s)
 {
-  struct ServiceResponseMessage *msg;
+  struct BobCryptodataMessage *msg;
   struct GNUNET_MQ_Envelope *e;
   struct GNUNET_CRYPTO_PaillierCiphertext *payload;
   unsigned int i;
 
-  s->transferred_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ServiceResponseMessage)) /
+  s->cadet_transmitted_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobCryptodataMessage)) /
     (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2) - 2;
-  if (s->transferred_element_count > s->used_element_count)
-    s->transferred_element_count = s->used_element_count;
+  if (s->cadet_transmitted_element_count > s->used_element_count)
+    s->cadet_transmitted_element_count = s->used_element_count;
 
   e = GNUNET_MQ_msg_extra (msg,
-                           (2 + s->transferred_element_count * 2)
+                           (2 + s->cadet_transmitted_element_count * 2)
                            * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext),
                            GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA);
-  // FIXME: 'total' maybe confusing here, and should already be known to Alice
-  msg->total_element_count = htonl (s->used_element_count);
-  // FIXME: redundant!
-  msg->used_element_count = htonl (s->transferred_element_count);
-  msg->contained_element_count = htonl (s->transferred_element_count);
-  msg->key = s->session_id;
+  msg->contained_element_count = htonl (s->cadet_transmitted_element_count);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending %u crypto values to Alice\n",
-              (unsigned int) s->transferred_element_count);
+              "Sending %u/%u crypto values to Alice\n",
+              (unsigned int) s->cadet_transmitted_element_count,
+              (unsigned int) s->used_element_count);
 
   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
   memcpy (&payload[0],
@@ -611,7 +641,7 @@ transmit_bobs_cryptodata_message (struct BobServiceSession *s)
 
   payload = &payload[2];
   // convert k[][]
-  for (i = 0; i < s->transferred_element_count; i++)
+  for (i = 0; i < s->cadet_transmitted_element_count; i++)
   {
     //k[i][p] and k[i][q]
     memcpy (&payload[i * 2],
@@ -621,7 +651,7 @@ transmit_bobs_cryptodata_message (struct BobServiceSession *s)
             &s->r_prime[i],
             sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
   }
-  if (s->transferred_element_count == s->used_element_count)
+  if (s->cadet_transmitted_element_count == s->used_element_count)
     GNUNET_MQ_notify_sent (e,
                            &bob_cadet_done_cb,
                            s);
@@ -749,6 +779,7 @@ compute_service_response (struct BobServiceSession *session)
                                                         &a[q[i]],
                                                         &r_prime[i]));
   }
+  gcry_mpi_release (tmp);
 
   // Calculate S' =  E(SUM( r_i^2 ))
   tmp = compute_square_sum (rand, count);
@@ -756,6 +787,7 @@ compute_service_response (struct BobServiceSession *session)
                                   tmp,
                                   1,
                                   &session->s_prime);
+  gcry_mpi_release (tmp);
 
   // Calculate S = E(SUM( (r_i + b_i)^2 ))
   for (i = 0; i < count; i++)
@@ -765,6 +797,7 @@ compute_service_response (struct BobServiceSession *session)
                                   tmp,
                                   1,
                                   &session->s);
+  gcry_mpi_release (tmp);
 
   session->r = r;
   session->r_prime = r_prime;
@@ -772,7 +805,6 @@ compute_service_response (struct BobServiceSession *session)
   // release rand, b and a
   for (i = 0; i < count; i++)
     gcry_mpi_release (rand[i]);
-  gcry_mpi_release (tmp);
   GNUNET_free (session->e_a);
   session->e_a = NULL;
   GNUNET_free (p);
@@ -917,7 +949,7 @@ handle_alices_cryptodata_message (void *cls,
   if ( (msize != msg_length) ||
        (0 == contained_elements) ||
        (contained_elements > UINT16_MAX) ||
-       (max < contained_elements + s->transferred_element_count) )
+       (max < contained_elements + s->cadet_received_element_count) )
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
@@ -930,12 +962,12 @@ handle_alices_cryptodata_message (void *cls,
   if (NULL == s->e_a)
     s->e_a = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) *
                             max);
-  memcpy (&s->e_a[s->transferred_element_count],
+  memcpy (&s->e_a[s->cadet_received_element_count],
           payload,
           sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * contained_elements);
-  s->transferred_element_count += contained_elements;
+  s->cadet_received_element_count += contained_elements;
 
-  if ( (s->transferred_element_count == max) &&
+  if ( (s->cadet_received_element_count == max) &&
        (NULL == s->intersection_op) )
   {
     /* intersection has finished also on our side, and
@@ -943,6 +975,7 @@ handle_alices_cryptodata_message (void *cls,
        CADET response(s) */
     transmit_cryptographic_reply (s);
   }
+  GNUNET_CADET_receive_done (s->cadet->channel);
   return GNUNET_OK;
 }
 
@@ -982,11 +1015,12 @@ cb_intersection_element_removed (void *cls,
     return;
   case GNUNET_SET_STATUS_DONE:
     s->intersection_op = NULL;
-    s->intersection_set = NULL;
+    GNUNET_break (NULL == s->intersection_set);
+    GNUNET_CADET_receive_done (s->cadet->channel);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Finished intersection, %d items remain\n",
          GNUNET_CONTAINER_multihashmap_size (s->intersected_elements));
-    if (s->transferred_element_count ==
+    if (s->client_received_element_count ==
         GNUNET_CONTAINER_multihashmap_size (s->intersected_elements))
     {
       /* CADET transmission from Alice is also already done,
@@ -1003,8 +1037,12 @@ cb_intersection_element_removed (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Set intersection failed!\n");
     s->intersection_op = NULL;
-    s->intersection_set = NULL;
-    s->active = GNUNET_SYSERR;
+    if (NULL != s->intersection_set)
+    {
+      GNUNET_SET_destroy (s->intersection_set);
+      s->intersection_set = NULL;
+    }
+    s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
     prepare_client_end_notification (s);
     return;
   default:
@@ -1026,7 +1064,7 @@ start_intersection (struct BobServiceSession *s)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Got session with key %s and %u elements, starting intersection.\n",
               GNUNET_h2s (&s->session_id),
-              (unsigned int) s->transferred_element_count);
+              (unsigned int) s->total);
 
   s->intersection_op
     = GNUNET_SET_prepare (&s->cadet->peer,
@@ -1035,8 +1073,17 @@ start_intersection (struct BobServiceSession *s)
                           GNUNET_SET_RESULT_REMOVED,
                           &cb_intersection_element_removed,
                           s);
-  GNUNET_SET_commit (s->intersection_op,
-                     s->intersection_set);
+  if (GNUNET_OK !=
+      GNUNET_SET_commit (s->intersection_op,
+                         s->intersection_set))
+  {
+    GNUNET_break (0);
+    s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
+    prepare_client_end_notification (s);
+    return;
+  }
+  GNUNET_SET_destroy (s->intersection_set);
+  s->intersection_set = NULL;
 }
 
 
@@ -1094,7 +1141,7 @@ handle_alices_computation_request (void *cls,
   /* pair them up */
   in->s = s;
   s->cadet = in;
-  if (s->transferred_element_count == s->total)
+  if (s->client_received_element_count == s->total)
     start_intersection (s);
   return GNUNET_OK;
 }
@@ -1127,6 +1174,7 @@ cb_channel_incoming (void *cls,
   in = GNUNET_new (struct CadetIncomingSession);
   in->peer = *initiator;
   in->channel = channel;
+  in->cadet_mq = GNUNET_CADET_mq_create (in->channel);
   return in;
 }
 
@@ -1144,7 +1192,7 @@ GSS_handle_bob_client_message_multipart (void *cls,
                                          struct GNUNET_SERVER_Client *client,
                                          const struct GNUNET_MessageHeader *message)
 {
-  const struct ComputationMultipartMessage * msg;
+  const struct ComputationBobCryptodataMultipartMessage * msg;
   struct BobServiceSession *s;
   uint32_t contained_count;
   const struct GNUNET_SCALARPRODUCT_Element *elements;
@@ -1164,22 +1212,22 @@ GSS_handle_bob_client_message_multipart (void *cls,
     return;
   }
   msize = ntohs (message->size);
-  if (msize < sizeof (struct ComputationMultipartMessage))
+  if (msize < sizeof (struct ComputationBobCryptodataMultipartMessage))
   {
     GNUNET_break (0);
     GNUNET_SERVER_receive_done (client,
                                 GNUNET_SYSERR);
     return;
   }
-  msg = (const struct ComputationMultipartMessage *) message;
+  msg = (const struct ComputationBobCryptodataMultipartMessage *) message;
   contained_count = ntohl (msg->element_count_contained);
 
-  if ( (msize != (sizeof (struct ComputationMultipartMessage) +
+  if ( (msize != (sizeof (struct ComputationBobCryptodataMultipartMessage) +
                   contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) ||
        (0 == contained_count) ||
        (UINT16_MAX < contained_count) ||
-       (s->total == s->transferred_element_count) ||
-       (s->total < s->transferred_element_count + contained_count) )
+       (s->total == s->client_received_element_count) ||
+       (s->total < s->client_received_element_count + contained_count) )
   {
     GNUNET_break_op (0);
     GNUNET_SERVER_receive_done (client,
@@ -1212,10 +1260,10 @@ GSS_handle_bob_client_message_multipart (void *cls,
                             &set_elem,
                             NULL, NULL);
   }
-  s->transferred_element_count += contained_count;
+  s->client_received_element_count += contained_count;
   GNUNET_SERVER_receive_done (client,
                               GNUNET_OK);
-  if (s->total != s->transferred_element_count)
+  if (s->total != s->client_received_element_count)
   {
     /* more to come */
     return;
@@ -1296,11 +1344,11 @@ GSS_handle_bob_client_message (void *cls,
   }
 
   s = GNUNET_new (struct BobServiceSession);
-  s->active = GNUNET_YES;
+  s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE;
   s->client = client;
   s->client_mq = GNUNET_MQ_queue_for_server_client (client);
   s->total = total_count;
-  s->transferred_element_count = contained_count;
+  s->client_received_element_count = contained_count;
   s->session_id = msg->session_key;
   GNUNET_break (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_put (client_sessions,
@@ -1342,7 +1390,7 @@ GSS_handle_bob_client_message (void *cls,
                                          s);
   GNUNET_SERVER_receive_done (client,
                               GNUNET_YES);
-  if (s->total != s->transferred_element_count)
+  if (s->total != s->client_received_element_count)
   {
     /* multipart msg */
     return;
@@ -1373,8 +1421,7 @@ shutdown_task (void *cls,
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Shutting down, initiating cleanup.\n");
-  do_shutdown = GNUNET_YES;
-  // FIXME: is there a need to shutdown active sessions?
+  // FIXME: do we have to cut our connections to CADET first?
   if (NULL != my_cadet)
   {
     GNUNET_CADET_disconnect (my_cadet);