made the service more resilient against out of order and simply incorrect messages
[oweals/gnunet.git] / src / scalarproduct / gnunet-service-scalarproduct.c
index 518ff5f3dec4fb1f06557336b59c8f88db675260..46567abed17a8cc03b4c62eeae07147febb28f8d 100644 (file)
@@ -168,6 +168,16 @@ struct ServiceSession
   * Bob's permutation q of R
   */
  gcry_mpi_t * r_prime;
+ /**
+  * Bob's s
+  */
+ gcry_mpi_t s;
+ /**
+  * Bob's s'
+  */
+ gcry_mpi_t s_prime;
 
  /**
   * Bobs matching response session from the client
@@ -1709,8 +1719,7 @@ tunnel_destruction_handler (void *cls,
  * @return product as MPI, never NULL
  */
 static gcry_mpi_t
-compute_scalar_product (struct ServiceSession * session,
-                        gcry_mpi_t * r, gcry_mpi_t * r_prime, gcry_mpi_t s, gcry_mpi_t s_prime)
+compute_scalar_product (struct ServiceSession * session)
 {
   uint32_t count;
   gcry_mpi_t t;
@@ -1727,12 +1736,12 @@ compute_scalar_product (struct ServiceSession * session,
   // from the E(a_pi)(+)E(-b_pi-r_pi) and E(a_qi)(+)E(-r_qi) twice each,
   // the result is E((S + a_pi) + (S -b_pi-r_pi)) and E(S + a_qi + S - r_qi)
   for (i = 0; i < count; i++) {
-    decrypt_element (r[i], r[i], my_mu, my_lambda, my_n, my_nsquare);
-    gcry_mpi_sub (r[i], r[i], my_offset);
-    gcry_mpi_sub (r[i], r[i], my_offset);
-    decrypt_element (r_prime[i], r_prime[i], my_mu, my_lambda, my_n, my_nsquare);
-    gcry_mpi_sub (r_prime[i], r_prime[i], my_offset);
-    gcry_mpi_sub (r_prime[i], r_prime[i], my_offset);
+    decrypt_element (session->r[i], session->r[i], my_mu, my_lambda, my_n, my_nsquare);
+    gcry_mpi_sub (session->r[i], session->r[i], my_offset);
+    gcry_mpi_sub (session->r[i], session->r[i], my_offset);
+    decrypt_element (session->r_prime[i], session->r_prime[i], my_mu, my_lambda, my_n, my_nsquare);
+    gcry_mpi_sub (session->r_prime[i], session->r_prime[i], my_offset);
+    gcry_mpi_sub (session->r_prime[i], session->r_prime[i], my_offset);
   }
 
   // calculate t = sum(ai)
@@ -1740,28 +1749,28 @@ compute_scalar_product (struct ServiceSession * session,
 
   // calculate U
   u = gcry_mpi_new (0);
-  tmp = compute_square_sum (r, count);
+  tmp = compute_square_sum (session->r, count);
   gcry_mpi_sub (u, u, tmp);
   gcry_mpi_release (tmp);
 
   //calculate U'
   utick = gcry_mpi_new (0);
-  tmp = compute_square_sum (r_prime, count);
+  tmp = compute_square_sum (session->r_prime, count);
   gcry_mpi_sub (utick, utick, tmp);
 
   GNUNET_assert (p = gcry_mpi_new (0));
   GNUNET_assert (ptick = gcry_mpi_new (0));
 
   // compute P
-  decrypt_element (ss, my_mu, my_lambda, my_n, my_nsquare);
-  decrypt_element (s_prime, s_prime, my_mu, my_lambda, my_n, my_nsquare);
+  decrypt_element (session->s, session->s, my_mu, my_lambda, my_n, my_nsquare);
+  decrypt_element (session->s_prime, session->s_prime, my_mu, my_lambda, my_n, my_nsquare);
 
   // compute P
-  gcry_mpi_add (p, s, t);
+  gcry_mpi_add (p, session->s, t);
   gcry_mpi_add (p, p, u);
 
   // compute P'
-  gcry_mpi_add (ptick, s_prime, t);
+  gcry_mpi_add (ptick, session->s_prime, t);
   gcry_mpi_add (ptick, ptick, utick);
 
   gcry_mpi_release (t);
@@ -1892,6 +1901,88 @@ handle_service_request_multipart (void *cls,
                                   void **tunnel_ctx,
                                   const struct GNUNET_MessageHeader * message)
 {
+  struct ServiceSession * session;
+  const struct GNUNET_SCALARPRODUCT_multipart_message * msg = (const struct GNUNET_SCALARPRODUCT_multipart_message *) message;
+  uint32_t used_elements;
+  uint32_t contained_elements=0;
+  uint32_t msg_length;
+  unsigned char * current;
+  int32_t i = -1;
+  // are we in the correct state?
+  session = (struct ServiceSession *) * tunnel_ctx;
+  if ((BOB != session->role) || (WAITING_FOR_MULTIPART_TRANSMISSION != session->state)) {
+    GNUNET_break_op (0);
+    return GNUNET_OK;
+  }
+  // shorter than minimum?
+  if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) {
+    goto except;
+  }
+  used_elements = session->used_element_count;
+  contained_elements = ntohl (msg->multipart_element_count);
+  msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
+          + contained_elements * PAILLIER_ELEMENT_LENGTH;
+  //sanity check
+  if (( ntohs (msg->header.size) != msg_length) 
+       || (used_elements < contained_elements + session->transferred_element_count)) {
+    goto except;
+  }
+  current = (unsigned char *) &msg[1];
+  if (contained_elements != 0) {
+    gcry_error_t ret = 0;
+    // Convert each vector element to MPI_value
+    for (i = session->transferred_element_count; i < session->transferred_element_count+contained_elements; i++) {
+      size_t read = 0;
+
+      ret = gcry_mpi_scan (&session->a[i],
+                           GCRYMPI_FMT_USG,
+                           &current[i * PAILLIER_ELEMENT_LENGTH],
+                           PAILLIER_ELEMENT_LENGTH,
+                           &read);
+      if (ret) {
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _ ("Could not translate E[a%d] to MPI!\n%s/%s\n"),
+                    i, gcry_strsource (ret), gcry_strerror (ret));
+        goto except;
+      }
+    }
+    session->transferred_element_count+=contained_elements;
+    
+    if (session->transferred_element_count == used_elements) {
+      // single part finished
+      session->state = SERVICE_REQUEST_RECEIVED;
+      if (session->response) {
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s and a matching element set, processing.\n"), GNUNET_h2s (&session->key));
+        if (GNUNET_OK != compute_service_response (session, session->response)) {
+          //something went wrong, remove it again...
+          GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
+          goto except;
+        }
+      }
+      else
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s without a matching element set, queueing.\n"), GNUNET_h2s (&session->key));
+    }
+    else{
+      // multipart message
+    }
+  }
+  
+  return GNUNET_OK;
+except:
+  for (i = 0; i < session->transferred_element_count + contained_elements; i++)
+    if (session->a[i])
+      gcry_mpi_release (session->a[i]);
+  gcry_sexp_release (session->remote_pubkey);
+  session->remote_pubkey = NULL;
+  GNUNET_free (session->a);
+  session->a = NULL;
+  free_session (session);
+  // and notify our client-session that we could not complete the session
+  if (session->response)
+    // we just found the responder session in this queue
+    session->response->client_notification_task =
+          GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
+                                    session->response);
   return GNUNET_SYSERR;
 }
 
@@ -1922,19 +2013,13 @@ handle_service_request (void *cls,
   uint32_t element_count;
   uint32_t msg_length;
   unsigned char * current;
-  struct ServiceSession * responder_session;
   int32_t i = -1;
   enum SessionState needed_state;
 
   session = (struct ServiceSession *) * tunnel_ctx;
-  if (BOB != session->role) {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  // is this tunnel already in use?
-  if ((session->next) || (from_service_head == session)) {
+  if (WAITING_FOR_SERVICE_REQUEST != session->state) {
     GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    return GNUNET_OK;
   }
   // Check if message was sent by me, which would be bad!
   if (!memcmp (&session->peer, &me, sizeof (struct GNUNET_PeerIdentity))) {
@@ -1975,8 +2060,7 @@ handle_service_request (void *cls,
   }
 
   memcpy (&session->peer, &session->peer, sizeof (struct GNUNET_PeerIdentity));
-  session->state = SERVICE_REQUEST_RECEIVED;
-  session->element_count = ntohl (msg->element_count);
+  session->element_count = element_count;
   session->used_element_count = used_elements;
   session->transferred_element_count = contained_elements;
   session->tunnel = tunnel;
@@ -2002,18 +2086,17 @@ handle_service_request (void *cls,
 
   //check if service queue contains a matching request
   needed_state = CLIENT_RESPONSE_RECEIVED;
-  responder_session = find_matching_session (from_client_tail,
+  session->response = find_matching_session (from_client_tail,
                                              &session->key,
                                              session->element_count,
                                              &needed_state, NULL);
 
   session->a = GNUNET_malloc (sizeof (gcry_mpi_t) * used_elements);
-
+  session->state = WAITING_FOR_MULTIPART_TRANSMISSION; 
   if (contained_elements != 0) {
     gcry_error_t ret = 0;
-    session->a = GNUNET_malloc (sizeof (gcry_mpi_t) * used_elements);
     // Convert each vector element to MPI_value
-    for (i = 0; i < used_elements; i++) {
+    for (i = 0; i < contained_elements; i++) {
       size_t read = 0;
 
       ret = gcry_mpi_scan (&session->a[i],
@@ -2031,9 +2114,10 @@ handle_service_request (void *cls,
     
     if (contained_elements == used_elements) {
       // single part finished
-      if (responder_session) {
+      session->state = SERVICE_REQUEST_RECEIVED;
+      if (session->response) {
         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s and a matching element set, processing.\n"), GNUNET_h2s (&session->key));
-        if (GNUNET_OK != compute_service_response (session, responder_session)) {
+        if (GNUNET_OK != compute_service_response (session, session->response)) {
           //something went wrong, remove it again...
           GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
           goto except;
@@ -2044,26 +2128,24 @@ handle_service_request (void *cls,
     }
     else{
       // multipart message
-      
     }
-
-    return GNUNET_OK;
   }
+  return GNUNET_OK;
 except:
-  for (i = 0; i < used_elements; i++)
+  for (i = 0; i < contained_elements; i++)
     if (session->a[i])
       gcry_mpi_release (session->a[i]);
   gcry_sexp_release (session->remote_pubkey);
   session->remote_pubkey = NULL;
-  GNUNET_free_non_null (session->a);
+  GNUNET_free (session->a);
   session->a = NULL;
   free_session (session);
   // and notify our client-session that we could not complete the session
-  if (responder_session)
+  if (session->response)
     // we just found the responder session in this queue
-    responder_session->client_notification_task =
+    session->response->client_notification_task =
           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
-                                    responder_session);
+                                    session->response);
   return GNUNET_SYSERR;
 }
 
@@ -2085,6 +2167,78 @@ handle_service_response_multipart (void *cls,
                                    void **tunnel_ctx,
                                    const struct GNUNET_MessageHeader * message)
 {
+  struct ServiceSession * session;
+  const struct GNUNET_SCALARPRODUCT_multipart_message * msg = (const struct GNUNET_SCALARPRODUCT_multipart_message *) message;
+  unsigned char * current;
+  size_t read;
+  size_t i;
+  uint32_t contained_element_count=0;
+  size_t msg_size;
+  int rc;
+
+  GNUNET_assert (NULL != message);
+  // are we in the correct state?
+  session = (struct ServiceSession *) * tunnel_ctx;
+  if ((ALICE != session->role) || (WAITING_FOR_MULTIPART_TRANSMISSION != session->state)) {
+    GNUNET_break_op (0);
+    return GNUNET_OK;
+  }
+  // shorter than minimum?
+  if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) {
+    goto except;
+  }
+  contained_element_count = ntohl (msg->multipart_element_count);
+  msg_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
+          + 2 * contained_element_count * PAILLIER_ELEMENT_LENGTH;
+  //sanity check: is the message as long as the message_count fields suggests?
+  if ((ntohs (msg->header.size) != msg_size) || (session->used_element_count < contained_element_count)) {
+    goto except;
+  }
+  current = (unsigned char *) &msg[1];
+  // Convert each k[][perm] to its MPI_value
+  for (i = 0; i < contained_element_count; i++) {
+    if (0 != (rc = gcry_mpi_scan (&session->r[i], GCRYMPI_FMT_USG, current,
+                                  PAILLIER_ELEMENT_LENGTH, &read))) {
+      LOG_GCRY (GNUNET_ERROR_TYPE_DEBUG, "gcry_mpi_scan", rc);
+      GNUNET_break_op (0);
+      goto except;
+    }
+    current += PAILLIER_ELEMENT_LENGTH;
+    if (0 != (rc = gcry_mpi_scan (&session->r_prime[i], GCRYMPI_FMT_USG, current,
+                                  PAILLIER_ELEMENT_LENGTH, &read))) {
+      LOG_GCRY (GNUNET_ERROR_TYPE_DEBUG, "gcry_mpi_scan", rc);
+      GNUNET_break_op (0);
+      goto except;
+    }
+    current += PAILLIER_ELEMENT_LENGTH;
+  }
+  session->transferred_element_count += contained_element_count;
+  if (session->transferred_element_count != session->used_element_count)
+    return GNUNET_OK;
+  session->state = SERVICE_RESPONSE_RECEIVED;
+  session->product = compute_scalar_product (session);
+  return GNUNET_SYSERR; // terminate the tunnel right away, we are done here!
+except:
+  GNUNET_break_op (0);
+  if (session->s)
+    gcry_mpi_release (session->s);
+  if (session->s_prime)
+    gcry_mpi_release (session->s_prime);
+  for (i = 0; session->r && i < session->transferred_element_count; i++)
+    if (session->r[i]) gcry_mpi_release (session->r[i]);
+  for (i = 0; session->r_prime && i < session->transferred_element_count; i++)
+    if (session->r_prime[i]) gcry_mpi_release (session->r_prime[i]);
+  GNUNET_free_non_null (session->r);
+  GNUNET_free_non_null (session->r_prime);
+
+  session->tunnel = NULL;
+  // send message with product to client
+  session->client_notification_task =
+          GNUNET_SCHEDULER_add_now (&prepare_client_response,
+                                    session);
+  // the tunnel has done its job, terminate our connection and the tunnel
+  // the peer will be notified that the tunnel was destroyed via tunnel_destruction_handler
+  // just close the connection, as recommended by Christian
   return GNUNET_SYSERR;
 }
 
@@ -2109,29 +2263,19 @@ handle_service_response (void *cls,
   struct ServiceSession * session;
   const struct GNUNET_SCALARPRODUCT_service_response * msg = (const struct GNUNET_SCALARPRODUCT_service_response *) message;
   unsigned char * current;
-  uint32_t count;
-  gcry_mpi_t s = NULL;
-  gcry_mpi_t s_prime = NULL;
   size_t read;
   size_t i;
-  uint32_t contained_element_count;
+  uint32_t contained_element_count=0;
   size_t msg_size;
-  gcry_mpi_t * r = NULL;
-  gcry_mpi_t * r_prime = NULL;
   int rc;
 
   GNUNET_assert (NULL != message);
   session = (struct ServiceSession *) * tunnel_ctx;
-  if (ALICE != session->role) {
+  if (session->state != WAITING_FOR_SERVICE_REQUEST) {
     GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    return GNUNET_OK;
   }
-
-  count = session->used_element_count;
-  session->product = NULL;
-  session->state = SERVICE_RESPONSE_RECEIVED;
-
-  //we need at least a peer and one message id to compare
+  //we need at least a full message
   if (sizeof (struct GNUNET_SCALARPRODUCT_service_response) > ntohs (msg->header.size)) {
     GNUNET_break_op (0);
     goto invalid_msg;
@@ -2141,14 +2285,15 @@ handle_service_response (void *cls,
           + 2 * contained_element_count * PAILLIER_ELEMENT_LENGTH
           + 2 * PAILLIER_ELEMENT_LENGTH;
   //sanity check: is the message as long as the message_count fields suggests?
-  if ((ntohs (msg->header.size) != msg_size) || (count != contained_element_count)) {
+  if ((ntohs (msg->header.size) != msg_size) || (session->used_element_count < contained_element_count)) {
     GNUNET_break_op (0);
     goto invalid_msg;
   }
-
+  session->state = WAITING_FOR_MULTIPART_TRANSMISSION;
+  session->transferred_element_count = contained_element_count;
   //convert s
   current = (unsigned char *) &msg[1];
-  if (0 != (rc = gcry_mpi_scan (&s, GCRYMPI_FMT_USG, current,
+  if (0 != (rc = gcry_mpi_scan (&session->s, GCRYMPI_FMT_USG, current,
                                 PAILLIER_ELEMENT_LENGTH, &read))) {
     LOG_GCRY (GNUNET_ERROR_TYPE_DEBUG, "gcry_mpi_scan", rc);
     GNUNET_break_op (0);
@@ -2156,31 +2301,25 @@ handle_service_response (void *cls,
   }
   current += PAILLIER_ELEMENT_LENGTH;
   //convert stick
-  if (0 != (rc = gcry_mpi_scan (&s_prime, GCRYMPI_FMT_USG, current,
+  if (0 != (rc = gcry_mpi_scan (&session->s_prime, GCRYMPI_FMT_USG, current,
                                 PAILLIER_ELEMENT_LENGTH, &read))) {
     LOG_GCRY (GNUNET_ERROR_TYPE_DEBUG, "gcry_mpi_scan", rc);
     GNUNET_break_op (0);
     goto invalid_msg;
   }
   current += PAILLIER_ELEMENT_LENGTH;
-
-  r = GNUNET_malloc (sizeof (gcry_mpi_t) * count);
-  // Convert each kp[] to its MPI_value
-  for (i = 0; i < count; i++) {
-    if (0 != (rc = gcry_mpi_scan (&r[i], GCRYMPI_FMT_USG, current,
+  session->r = GNUNET_malloc (sizeof (gcry_mpi_t) * session->used_element_count);
+  session->r_prime = GNUNET_malloc (sizeof (gcry_mpi_t) * session->used_element_count);
+  // Convert each k[][perm] to its MPI_value
+  for (i = 0; i < contained_element_count; i++) {
+    if (0 != (rc = gcry_mpi_scan (&session->r[i], GCRYMPI_FMT_USG, current,
                                   PAILLIER_ELEMENT_LENGTH, &read))) {
       LOG_GCRY (GNUNET_ERROR_TYPE_DEBUG, "gcry_mpi_scan", rc);
       GNUNET_break_op (0);
       goto invalid_msg;
     }
     current += PAILLIER_ELEMENT_LENGTH;
-  }
-
-
-  r_prime = GNUNET_malloc (sizeof (gcry_mpi_t) * count);
-  // Convert each kq[] to its MPI_value
-  for (i = 0; i < count; i++) {
-    if (0 != (rc = gcry_mpi_scan (&r_prime[i], GCRYMPI_FMT_USG, current,
+    if (0 != (rc = gcry_mpi_scan (&session->r_prime[i], GCRYMPI_FMT_USG, current,
                                   PAILLIER_ELEMENT_LENGTH, &read))) {
       LOG_GCRY (GNUNET_ERROR_TYPE_DEBUG, "gcry_mpi_scan", rc);
       GNUNET_break_op (0);
@@ -2188,19 +2327,24 @@ handle_service_response (void *cls,
     }
     current += PAILLIER_ELEMENT_LENGTH;
   }
-  session->product = compute_scalar_product (session, r, r_prime, s, s_prime);
+  if (session->transferred_element_count != session->used_element_count)
+    return GNUNET_OK; //wait for the other multipart chunks
+  
+  session->state = SERVICE_RESPONSE_RECEIVED;
+  session->product = compute_scalar_product (session);
+  return GNUNET_SYSERR; // terminate the tunnel right away, we are done here!
 
 invalid_msg:
-  if (s)
-    gcry_mpi_release (s);
-  if (s_prime)
-    gcry_mpi_release (s_prime);
-  for (i = 0; r && i < count; i++)
-    if (r[i]) gcry_mpi_release (r[i]);
-  for (i = 0; r_prime && i < count; i++)
-    if (r_prime[i]) gcry_mpi_release (r_prime[i]);
-  GNUNET_free_non_null (r);
-  GNUNET_free_non_null (r_prime);
+  if (session->s)
+    gcry_mpi_release (session->s);
+  if (session->s_prime)
+    gcry_mpi_release (session->s_prime);
+  for (i = 0; session->r && i < contained_element_count; i++)
+    if (session->r[i]) gcry_mpi_release (session->r[i]);
+  for (i = 0; session->r_prime && i < contained_element_count; i++)
+    if (session->r_prime[i]) gcry_mpi_release (session->r_prime[i]);
+  GNUNET_free_non_null (session->r);
+  GNUNET_free_non_null (session->r_prime);
 
   session->tunnel = NULL;
   // send message with product to client