error handling
[oweals/gnunet.git] / src / scalarproduct / gnunet-service-scalarproduct_alice.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013, 2014, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file scalarproduct/gnunet-service-scalarproduct_alice.c
22  * @brief scalarproduct service implementation
23  * @author Christian M. Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include <limits.h>
28 #include <gcrypt.h>
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_scalarproduct_service.h"
35 #include "gnunet_set_service.h"
36 #include "scalarproduct.h"
37 #include "gnunet-service-scalarproduct.h"
38
39 #define LOG(kind, ...) \
40   GNUNET_log_from (kind, "scalarproduct-alice", __VA_ARGS__)
41
42 /**
43  * An encrypted element key-value pair.
44  */
45 struct MpiElement
46 {
47   /**
48    * Key used to identify matching pairs of values to multiply.
49    * Points into an existing data structure, to avoid copying
50    * and doubling memory use.
51    */
52   const struct GNUNET_HashCode *key;
53
54   /**
55    * Value represented (a).
56    */
57   gcry_mpi_t value;
58 };
59
60
61 /**
62  * A scalarproduct session which tracks
63  * a request form the client to our final response.
64  */
65 struct AliceServiceSession
66 {
67   /**
68    * (hopefully) unique transaction ID
69    */
70   struct GNUNET_HashCode session_id;
71
72   /**
73    * Alice or Bob's peerID
74    */
75   struct GNUNET_PeerIdentity peer;
76
77   /**
78    * The client this request is related to.
79    */
80   struct GNUNET_SERVICE_Client *client;
81
82   /**
83    * The message queue for the client.
84    */
85   struct GNUNET_MQ_Handle *client_mq;
86
87   /**
88    * The message queue for CADET.
89    */
90   struct GNUNET_MQ_Handle *cadet_mq;
91
92   /**
93    * all non-0-value'd elements transmitted to us.
94    * Values are of type `struct GNUNET_SCALARPRODUCT_Element *`
95    */
96   struct GNUNET_CONTAINER_MultiHashMap *intersected_elements;
97
98   /**
99    * Set of elements for which will conduction an intersection.
100    * the resulting elements are then used for computing the scalar product.
101    */
102   struct GNUNET_SET_Handle *intersection_set;
103
104   /**
105    * Set of elements for which will conduction an intersection.
106    * the resulting elements are then used for computing the scalar product.
107    */
108   struct GNUNET_SET_OperationHandle *intersection_op;
109
110   /**
111    * Handle to Alice's Intersection operation listening for Bob
112    */
113   struct GNUNET_SET_ListenHandle *intersection_listen;
114
115   /**
116    * channel-handle associated with our cadet handle
117    */
118   struct GNUNET_CADET_Channel *channel;
119
120   /**
121    * a(Alice), sorted array by key of length @e used_element_count.
122    */
123   struct MpiElement *sorted_elements;
124
125   /**
126    * Bob's permutation p of R
127    */
128   struct GNUNET_CRYPTO_PaillierCiphertext *r;
129
130   /**
131    * Bob's permutation q of R
132    */
133   struct GNUNET_CRYPTO_PaillierCiphertext *r_prime;
134
135   /**
136    * Bob's "s"
137    */
138   struct GNUNET_CRYPTO_PaillierCiphertext s;
139
140   /**
141    * Bob's "s'"
142    */
143   struct GNUNET_CRYPTO_PaillierCiphertext s_prime;
144
145   /**
146    * The computed scalar
147    */
148   gcry_mpi_t product;
149
150   /**
151    * How many elements we were supplied with from the client (total
152    * count before intersection).
153    */
154   uint32_t total;
155
156   /**
157    * How many elements actually are used for the scalar product.
158    * Size of the arrays in @e r and @e r_prime.  Sometimes also
159    * reset to 0 and used as a counter!
160    */
161   uint32_t used_element_count;
162
163   /**
164    * Already transferred elements from client to us.
165    * Less or equal than @e total.
166    */
167   uint32_t client_received_element_count;
168
169   /**
170    * Already transferred elements from Bob to us.
171    * Less or equal than @e total.
172    */
173   uint32_t cadet_received_element_count;
174
175   /**
176    * State of this session.   In
177    * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is
178    * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or
179    * #GNUNET_SCALARPRODUCT_STATUS_FAILURE.
180    */
181   enum GNUNET_SCALARPRODUCT_ResponseStatus status;
182
183   /**
184    * Flag to prevent recursive calls to #destroy_service_session() from
185    * doing harm.
186    */
187   int in_destroy;
188 };
189
190
191 /**
192  * GNUnet configuration handle
193  */
194 static const struct GNUNET_CONFIGURATION_Handle *cfg;
195
196 /**
197  * Service's own public key
198  */
199 static struct GNUNET_CRYPTO_PaillierPublicKey my_pubkey;
200
201 /**
202  * Service's own private key
203  */
204 static struct GNUNET_CRYPTO_PaillierPrivateKey my_privkey;
205
206 /**
207  * Service's offset for values that could possibly be negative but are plaintext for encryption.
208  */
209 static gcry_mpi_t my_offset;
210
211 /**
212  * Handle to the CADET service.
213  */
214 static struct GNUNET_CADET_Handle *my_cadet;
215
216
217 /**
218  * Iterator called to free elements.
219  *
220  * @param cls the `struct AliceServiceSession *` (unused)
221  * @param key the key (unused)
222  * @param value value to free
223  * @return #GNUNET_OK (continue to iterate)
224  */
225 static int
226 free_element_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
227 {
228   struct GNUNET_SCALARPRODUCT_Element *e = value;
229
230   GNUNET_free (e);
231   return GNUNET_OK;
232 }
233
234
235 /**
236  * Destroy session state, we are done with it.
237  *
238  * @param s the session to free elements from
239  */
240 static void
241 destroy_service_session (struct AliceServiceSession *s)
242 {
243   if (GNUNET_YES == s->in_destroy)
244     return;
245   s->in_destroy = GNUNET_YES;
246   if (NULL != s->client)
247   {
248     struct GNUNET_SERVICE_Client *c = s->client;
249
250     s->client = NULL;
251     GNUNET_SERVICE_client_drop (c);
252   }
253   if (NULL != s->channel)
254   {
255     GNUNET_CADET_channel_destroy (s->channel);
256     s->channel = NULL;
257   }
258   if (NULL != s->intersected_elements)
259   {
260     GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
261                                            &free_element_cb,
262                                            s);
263     GNUNET_CONTAINER_multihashmap_destroy (s->intersected_elements);
264     s->intersected_elements = NULL;
265   }
266   if (NULL != s->intersection_listen)
267   {
268     GNUNET_SET_listen_cancel (s->intersection_listen);
269     s->intersection_listen = NULL;
270   }
271   if (NULL != s->intersection_op)
272   {
273     GNUNET_SET_operation_cancel (s->intersection_op);
274     s->intersection_op = NULL;
275   }
276   if (NULL != s->intersection_set)
277   {
278     GNUNET_SET_destroy (s->intersection_set);
279     s->intersection_set = NULL;
280   }
281   if (NULL != s->sorted_elements)
282   {
283     for (unsigned int i = 0; i < s->used_element_count; i++)
284       gcry_mpi_release (s->sorted_elements[i].value);
285     GNUNET_free (s->sorted_elements);
286     s->sorted_elements = NULL;
287   }
288   if (NULL != s->r)
289   {
290     GNUNET_free (s->r);
291     s->r = NULL;
292   }
293   if (NULL != s->r_prime)
294   {
295     GNUNET_free (s->r_prime);
296     s->r_prime = NULL;
297   }
298   if (NULL != s->product)
299   {
300     gcry_mpi_release (s->product);
301     s->product = NULL;
302   }
303   GNUNET_free (s);
304 }
305
306
307 /**
308  * Notify the client that the session has failed.  A message gets sent
309  * to Alice's client if we encountered any error.
310  *
311  * @param session the associated client session to fail or succeed
312  */
313 static void
314 prepare_client_end_notification (struct AliceServiceSession *session)
315 {
316   struct ClientResponseMessage *msg;
317   struct GNUNET_MQ_Envelope *e;
318
319   if (NULL == session->client_mq)
320     return; /* no client left to be notified */
321   GNUNET_log (
322     GNUNET_ERROR_TYPE_DEBUG,
323     "Sending session-end notification with status %d to client for session %s\n",
324     session->status,
325     GNUNET_h2s (&session->session_id));
326   e = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
327   msg->product_length = htonl (0);
328   msg->status = htonl (session->status);
329   GNUNET_MQ_send (session->client_mq, e);
330 }
331
332
333 /**
334  * Prepare the final (positive) response we will send to Alice's
335  * client.
336  *
337  * @param s the session associated with our client.
338  */
339 static void
340 transmit_client_response (struct AliceServiceSession *s)
341 {
342   struct ClientResponseMessage *msg;
343   struct GNUNET_MQ_Envelope *e;
344   unsigned char *product_exported = NULL;
345   size_t product_length = 0;
346   int32_t range;
347   gcry_error_t rc;
348   int sign;
349   gcry_mpi_t value;
350
351   if (NULL == s->product)
352   {
353     GNUNET_break (0);
354     prepare_client_end_notification (s);
355     return;
356   }
357   value = gcry_mpi_new (0);
358   sign = gcry_mpi_cmp_ui (s->product, 0);
359   if (0 > sign)
360   {
361     range = -1;
362     gcry_mpi_sub (value, value, s->product);
363   }
364   else if (0 < sign)
365   {
366     range = 1;
367     gcry_mpi_add (value, value, s->product);
368   }
369   else
370   {
371     /* result is exactly zero */
372     range = 0;
373   }
374   gcry_mpi_release (s->product);
375   s->product = NULL;
376
377   if ((0 != range) && (0 != (rc = gcry_mpi_aprint (GCRYMPI_FMT_STD,
378                                                    &product_exported,
379                                                    &product_length,
380                                                    value))))
381   {
382     LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc);
383     prepare_client_end_notification (s);
384     return;
385   }
386   gcry_mpi_release (value);
387   e = GNUNET_MQ_msg_extra (msg,
388                            product_length,
389                            GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
390   msg->status = htonl (GNUNET_SCALARPRODUCT_STATUS_SUCCESS);
391   msg->range = htonl (range);
392   msg->product_length = htonl (product_length);
393   if (NULL != product_exported)
394   {
395     GNUNET_memcpy (&msg[1], product_exported, product_length);
396     GNUNET_free (product_exported);
397   }
398   GNUNET_MQ_send (s->client_mq, e);
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Sent result to client, session %s has ended!\n",
401               GNUNET_h2s (&s->session_id));
402 }
403
404
405 /**
406  * Function called whenever a channel is destroyed.  Should clean up
407  * any associated state.
408  *
409  * It must NOT call #GNUNET_CADET_channel_destroy() on the channel.
410  *
411  * @param cls our `struct AliceServiceSession`
412  * @param channel connection to the other end (henceforth invalid)
413  */
414 static void
415 cb_channel_destruction (void *cls, const struct GNUNET_CADET_Channel *channel)
416 {
417   struct AliceServiceSession *s = cls;
418
419   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
420               "Peer disconnected, terminating session %s with peer %s\n",
421               GNUNET_h2s (&s->session_id),
422               GNUNET_i2s (&s->peer));
423   if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status)
424   {
425     /* We didn't get an answer yet, fail with error */
426     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
427     prepare_client_end_notification (s);
428   }
429   s->channel = NULL;
430 }
431
432
433 /**
434  * Computes the square sum over a vector of a given length.
435  *
436  * @param vector the vector to compute over
437  * @param length the length of the vector
438  * @return an MPI value containing the calculated sum, never NULL
439  */
440 static gcry_mpi_t
441 compute_square_sum_mpi_elements (const struct MpiElement *vector,
442                                  uint32_t length)
443 {
444   gcry_mpi_t elem;
445   gcry_mpi_t sum;
446   uint32_t i;
447
448   GNUNET_assert (NULL != (sum = gcry_mpi_new (0)));
449   GNUNET_assert (NULL != (elem = gcry_mpi_new (0)));
450   for (i = 0; i < length; i++)
451   {
452     gcry_mpi_mul (elem, vector[i].value, vector[i].value);
453     gcry_mpi_add (sum, sum, elem);
454   }
455   gcry_mpi_release (elem);
456   return sum;
457 }
458
459
460 /**
461  * Computes the square sum over a vector of a given length.
462  *
463  * @param vector the vector to compute over
464  * @param length the length of the vector
465  * @return an MPI value containing the calculated sum, never NULL
466  */
467 static gcry_mpi_t
468 compute_square_sum (const gcry_mpi_t *vector, uint32_t length)
469 {
470   gcry_mpi_t elem;
471   gcry_mpi_t sum;
472   uint32_t i;
473
474   GNUNET_assert (NULL != (sum = gcry_mpi_new (0)));
475   GNUNET_assert (NULL != (elem = gcry_mpi_new (0)));
476   for (i = 0; i < length; i++)
477   {
478     gcry_mpi_mul (elem, vector[i], vector[i]);
479     gcry_mpi_add (sum, sum, elem);
480   }
481   gcry_mpi_release (elem);
482   return sum;
483 }
484
485
486 /**
487  * Compute our scalar product, done by Alice
488  *
489  * @param session the session associated with this computation
490  * @return product as MPI, never NULL
491  */
492 static gcry_mpi_t
493 compute_scalar_product (struct AliceServiceSession *session)
494 {
495   uint32_t count;
496   gcry_mpi_t t;
497   gcry_mpi_t u;
498   gcry_mpi_t u_prime;
499   gcry_mpi_t p;
500   gcry_mpi_t p_prime;
501   gcry_mpi_t tmp;
502   gcry_mpi_t r[session->used_element_count];
503   gcry_mpi_t r_prime[session->used_element_count];
504   gcry_mpi_t s;
505   gcry_mpi_t s_prime;
506   unsigned int i;
507
508   count = session->used_element_count;
509   // due to the introduced static offset S, we now also have to remove this
510   // from the E(a_pi)(+)E(-b_pi-r_pi) and E(a_qi)(+)E(-r_qi) twice each,
511   // the result is E((S + a_pi) + (S -b_pi-r_pi)) and E(S + a_qi + S - r_qi)
512   for (i = 0; i < count; i++)
513   {
514     r[i] = gcry_mpi_new (0);
515     GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
516                                     &my_pubkey,
517                                     &session->r[i],
518                                     r[i]);
519     gcry_mpi_sub (r[i], r[i], my_offset);
520     gcry_mpi_sub (r[i], r[i], my_offset);
521     r_prime[i] = gcry_mpi_new (0);
522     GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
523                                     &my_pubkey,
524                                     &session->r_prime[i],
525                                     r_prime[i]);
526     gcry_mpi_sub (r_prime[i], r_prime[i], my_offset);
527     gcry_mpi_sub (r_prime[i], r_prime[i], my_offset);
528   }
529
530   // calculate t = sum(ai)
531   t = compute_square_sum_mpi_elements (session->sorted_elements, count);
532   // calculate U
533   u = gcry_mpi_new (0);
534   tmp = compute_square_sum (r, count);
535   gcry_mpi_sub (u, u, tmp);
536   gcry_mpi_release (tmp);
537
538   // calculate U'
539   u_prime = gcry_mpi_new (0);
540   tmp = compute_square_sum (r_prime, count);
541   gcry_mpi_sub (u_prime, u_prime, tmp);
542
543   GNUNET_assert (p = gcry_mpi_new (0));
544   GNUNET_assert (p_prime = gcry_mpi_new (0));
545   GNUNET_assert (s = gcry_mpi_new (0));
546   GNUNET_assert (s_prime = gcry_mpi_new (0));
547
548   // compute P
549   GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey, &session->s, s);
550   GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
551                                   &my_pubkey,
552                                   &session->s_prime,
553                                   s_prime);
554
555   // compute P
556   gcry_mpi_add (p, s, t);
557   gcry_mpi_add (p, p, u);
558
559   // compute P'
560   gcry_mpi_add (p_prime, s_prime, t);
561   gcry_mpi_add (p_prime, p_prime, u_prime);
562
563   gcry_mpi_release (t);
564   gcry_mpi_release (u);
565   gcry_mpi_release (u_prime);
566   gcry_mpi_release (s);
567   gcry_mpi_release (s_prime);
568
569   // compute product
570   gcry_mpi_sub (p, p, p_prime);
571   gcry_mpi_release (p_prime);
572   tmp = gcry_mpi_set_ui (tmp, 2);
573   gcry_mpi_div (p, NULL, p, tmp, 0);
574
575   gcry_mpi_release (tmp);
576   for (i = 0; i < count; i++)
577   {
578     gcry_mpi_release (session->sorted_elements[i].value);
579     gcry_mpi_release (r[i]);
580     gcry_mpi_release (r_prime[i]);
581   }
582   GNUNET_free (session->sorted_elements);
583   session->sorted_elements = NULL;
584   GNUNET_free (session->r);
585   session->r = NULL;
586   GNUNET_free (session->r_prime);
587   session->r_prime = NULL;
588
589   return p;
590 }
591
592
593 /**
594  * Check a multipart chunk of a response we got from another service
595  * we wanted to calculate a scalarproduct with.
596  *
597  * @param cls the `struct AliceServiceSession`
598  * @param msg the actual message
599  * @return #GNUNET_OK to keep the connection open,
600  *         #GNUNET_SYSERR to close it (signal serious error)
601  */
602 static int
603 check_bobs_cryptodata_multipart (
604   void *cls,
605   const struct BobCryptodataMultipartMessage *msg)
606 {
607   struct AliceServiceSession *s = cls;
608   uint32_t contained;
609   size_t msg_size;
610   size_t required_size;
611
612   msg_size = ntohs (msg->header.size);
613   contained = ntohl (msg->contained_element_count);
614   required_size =
615     sizeof(struct BobCryptodataMultipartMessage)
616     + 2 * contained * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext);
617   if ((required_size != msg_size) ||
618       (s->cadet_received_element_count + contained > s->used_element_count))
619   {
620     GNUNET_break (0);
621     return GNUNET_SYSERR;
622   }
623   return GNUNET_OK;
624 }
625
626
627 /**
628  * Handle a multipart chunk of a response we got from another service
629  * we wanted to calculate a scalarproduct with.
630  *
631  * @param cls the `struct AliceServiceSession`
632  * @param msg the actual message
633  */
634 static void
635 handle_bobs_cryptodata_multipart (
636   void *cls,
637   const struct BobCryptodataMultipartMessage *msg)
638 {
639   struct AliceServiceSession *s = cls;
640   const struct GNUNET_CRYPTO_PaillierCiphertext *payload;
641   size_t i;
642   uint32_t contained;
643
644   contained = ntohl (msg->contained_element_count);
645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646               "Received %u additional crypto values from Bob\n",
647               (unsigned int) contained);
648
649   payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
650   /* Convert each k[][perm] to its MPI_value */
651   for (i = 0; i < contained; i++)
652   {
653     GNUNET_memcpy (&s->r[s->cadet_received_element_count + i],
654                    &payload[2 * i],
655                    sizeof(struct GNUNET_CRYPTO_PaillierCiphertext));
656     GNUNET_memcpy (&s->r_prime[s->cadet_received_element_count + i],
657                    &payload[2 * i],
658                    sizeof(struct GNUNET_CRYPTO_PaillierCiphertext));
659   }
660   s->cadet_received_element_count += contained;
661   GNUNET_CADET_receive_done (s->channel);
662   if (s->cadet_received_element_count != s->used_element_count)
663     return; /* more to come */
664
665   s->product = compute_scalar_product (s);
666   transmit_client_response (s);
667 }
668
669
670 /**
671  * Check a response we got from another service we wanted to
672  * calculate a scalarproduct with.
673  *
674  * @param cls our `struct AliceServiceSession`
675  * @param message the actual message
676  * @return #GNUNET_OK to keep the connection open,
677  *         #GNUNET_SYSERR to close it (we are done)
678  */
679 static int
680 check_bobs_cryptodata_message (void *cls,
681                                const struct BobCryptodataMessage *msg)
682 {
683   struct AliceServiceSession *s = cls;
684   uint32_t contained;
685   uint16_t msg_size;
686   size_t required_size;
687
688   msg_size = ntohs (msg->header.size);
689   contained = ntohl (msg->contained_element_count);
690   required_size =
691     sizeof(struct BobCryptodataMessage)
692     + 2 * contained * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext)
693     + 2 * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext);
694   if ((msg_size != required_size) || (contained > UINT16_MAX) ||
695       (s->used_element_count < contained))
696   {
697     GNUNET_break_op (0);
698     return GNUNET_SYSERR;
699   }
700   if (NULL == s->sorted_elements)
701   {
702     /* we're not ready yet, how can Bob be? */
703     GNUNET_break_op (0);
704     return GNUNET_SYSERR;
705   }
706   if (s->total != s->client_received_element_count)
707   {
708     /* we're not ready yet, how can Bob be? */
709     GNUNET_break_op (0);
710     return GNUNET_SYSERR;
711   }
712   return GNUNET_OK;
713 }
714
715
716 /**
717  * Handle a response we got from another service we wanted to
718  * calculate a scalarproduct with.
719  *
720  * @param cls our `struct AliceServiceSession`
721  * @param msg the actual message
722  */
723 static void
724 handle_bobs_cryptodata_message (void *cls,
725                                 const struct BobCryptodataMessage *msg)
726 {
727   struct AliceServiceSession *s = cls;
728   const struct GNUNET_CRYPTO_PaillierCiphertext *payload;
729   uint32_t i;
730   uint32_t contained;
731
732   contained = ntohl (msg->contained_element_count);
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "Received %u crypto values from Bob\n",
735               (unsigned int) contained);
736   payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
737   GNUNET_memcpy (&s->s,
738                  &payload[0],
739                  sizeof(struct GNUNET_CRYPTO_PaillierCiphertext));
740   GNUNET_memcpy (&s->s_prime,
741                  &payload[1],
742                  sizeof(struct GNUNET_CRYPTO_PaillierCiphertext));
743   payload = &payload[2];
744
745   s->r = GNUNET_new_array (s->used_element_count,
746                            struct GNUNET_CRYPTO_PaillierCiphertext);
747   s->r_prime = GNUNET_new_array (s->used_element_count,
748                                  struct GNUNET_CRYPTO_PaillierCiphertext);
749   for (i = 0; i < contained; i++)
750   {
751     GNUNET_memcpy (&s->r[i],
752                    &payload[2 * i],
753                    sizeof(struct GNUNET_CRYPTO_PaillierCiphertext));
754     GNUNET_memcpy (&s->r_prime[i],
755                    &payload[2 * i + 1],
756                    sizeof(struct GNUNET_CRYPTO_PaillierCiphertext));
757   }
758   s->cadet_received_element_count = contained;
759   GNUNET_CADET_receive_done (s->channel);
760
761   if (s->cadet_received_element_count != s->used_element_count)
762   {
763     /* More to come */
764     return;
765   }
766   s->product = compute_scalar_product (s);
767   transmit_client_response (s);
768 }
769
770
771 /**
772  * Iterator to copy over messages from the hash map
773  * into an array for sorting.
774  *
775  * @param cls the `struct AliceServiceSession *`
776  * @param key the key (unused)
777  * @param value the `struct GNUNET_SCALARPRODUCT_Element *`
778  */
779 static int
780 copy_element_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
781 {
782   struct AliceServiceSession *s = cls;
783   struct GNUNET_SCALARPRODUCT_Element *e = value;
784   gcry_mpi_t mval;
785   int64_t val;
786
787   mval = gcry_mpi_new (0);
788   val = (int64_t) GNUNET_ntohll (e->value);
789   if (0 > val)
790     gcry_mpi_sub_ui (mval, mval, -val);
791   else
792     gcry_mpi_add_ui (mval, mval, val);
793   s->sorted_elements[s->used_element_count].value = mval;
794   s->sorted_elements[s->used_element_count].key = &e->key;
795   s->used_element_count++;
796   return GNUNET_OK;
797 }
798
799
800 /**
801  * Compare two `struct MpiValue`s by key for sorting.
802  *
803  * @param a pointer to first `struct MpiValue *`
804  * @param b pointer to first `struct MpiValue *`
805  * @return -1 for a < b, 0 for a=b, 1 for a > b.
806  */
807 static int
808 element_cmp (const void *a, const void *b)
809 {
810   const struct MpiElement *ma = a;
811   const struct MpiElement *mb = b;
812
813   return GNUNET_CRYPTO_hash_cmp (ma->key, mb->key);
814 }
815
816
817 /**
818  * Maximum number of elements we can put into a single cryptodata
819  * message
820  */
821 #define ELEMENT_CAPACITY                          \
822   ((GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE - 1   \
823     - sizeof(struct AliceCryptodataMessage))       \
824    / sizeof(struct GNUNET_CRYPTO_PaillierCiphertext))
825
826
827 /**
828  * Send the cryptographic data from Alice to Bob.
829  * Does nothing if we already transferred all elements.
830  *
831  * @param s the associated service session
832  */
833 static void
834 send_alices_cryptodata_message (struct AliceServiceSession *s)
835 {
836   struct AliceCryptodataMessage *msg;
837   struct GNUNET_MQ_Envelope *e;
838   struct GNUNET_CRYPTO_PaillierCiphertext *payload;
839   unsigned int i;
840   uint32_t todo_count;
841   gcry_mpi_t a;
842   uint32_t off;
843
844   s->sorted_elements = GNUNET_malloc (
845     GNUNET_CONTAINER_multihashmap_size (s->intersected_elements)
846     * sizeof(struct MpiElement));
847   s->used_element_count = 0;
848   GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
849                                          &copy_element_cb,
850                                          s);
851   LOG (GNUNET_ERROR_TYPE_DEBUG,
852        "Finished intersection, %d items remain\n",
853        s->used_element_count);
854   qsort (s->sorted_elements,
855          s->used_element_count,
856          sizeof(struct MpiElement),
857          &element_cmp);
858   off = 0;
859   while (off < s->used_element_count)
860   {
861     todo_count = s->used_element_count - off;
862     if (todo_count > ELEMENT_CAPACITY)
863       todo_count = ELEMENT_CAPACITY;
864     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865                 "Sending %u/%u crypto values to Bob\n",
866                 (unsigned int) todo_count,
867                 (unsigned int) s->used_element_count);
868
869     e =
870       GNUNET_MQ_msg_extra (msg,
871                            todo_count
872                            * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext),
873                            GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA);
874     msg->contained_element_count = htonl (todo_count);
875     payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
876     a = gcry_mpi_new (0);
877     for (i = off; i < off + todo_count; i++)
878     {
879       gcry_mpi_add (a, s->sorted_elements[i].value, my_offset);
880       GNUNET_assert (
881         3 ==
882         GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[i - off]));
883     }
884     gcry_mpi_release (a);
885     off += todo_count;
886     GNUNET_MQ_send (s->cadet_mq, e);
887   }
888 }
889
890
891 /**
892  * Callback for set operation results. Called for each element
893  * that should be removed from the result set, and then once
894  * to indicate that the set intersection operation is done.
895  *
896  * @param cls closure with the `struct AliceServiceSession`
897  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
898  * @param current_size current set size
899  * @param status what has happened with the set intersection?
900  */
901 static void
902 cb_intersection_element_removed (void *cls,
903                                  const struct GNUNET_SET_Element *element,
904                                  uint64_t current_size,
905                                  enum GNUNET_SET_Status status)
906 {
907   struct AliceServiceSession *s = cls;
908   struct GNUNET_SCALARPRODUCT_Element *se;
909
910   switch (status)
911   {
912   case GNUNET_SET_STATUS_OK:
913     /* this element has been removed from the set */
914     se = GNUNET_CONTAINER_multihashmap_get (s->intersected_elements,
915                                             element->data);
916     GNUNET_assert (NULL != se);
917     LOG (GNUNET_ERROR_TYPE_DEBUG,
918          "Intersection removed element with key %s and value %lld\n",
919          GNUNET_h2s (&se->key),
920          (long long) GNUNET_ntohll (se->value));
921     GNUNET_assert (
922       GNUNET_YES ==
923       GNUNET_CONTAINER_multihashmap_remove (s->intersected_elements,
924                                             element->data,
925                                             se));
926     GNUNET_free (se);
927     return;
928
929   case GNUNET_SET_STATUS_DONE:
930     s->intersection_op = NULL;
931     if (NULL != s->intersection_set)
932     {
933       GNUNET_SET_destroy (s->intersection_set);
934       s->intersection_set = NULL;
935     }
936     send_alices_cryptodata_message (s);
937     return;
938
939   case GNUNET_SET_STATUS_HALF_DONE:
940     /* unexpected for intersection */
941     GNUNET_break (0);
942     return;
943
944   case GNUNET_SET_STATUS_FAILURE:
945     /* unhandled status code */
946     LOG (GNUNET_ERROR_TYPE_DEBUG, "Set intersection failed!\n");
947     if (NULL != s->intersection_listen)
948     {
949       GNUNET_SET_listen_cancel (s->intersection_listen);
950       s->intersection_listen = NULL;
951     }
952     s->intersection_op = NULL;
953     if (NULL != s->intersection_set)
954     {
955       GNUNET_SET_destroy (s->intersection_set);
956       s->intersection_set = NULL;
957     }
958     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
959     prepare_client_end_notification (s);
960     return;
961
962   default:
963     GNUNET_break (0);
964     return;
965   }
966 }
967
968
969 /**
970  * Called when another peer wants to do a set operation with the
971  * local peer. If a listen error occurs, the @a request is NULL.
972  *
973  * @param cls closure with the `struct AliceServiceSession *`
974  * @param other_peer the other peer
975  * @param context_msg message with application specific information from
976  *        the other peer
977  * @param request request from the other peer (never NULL), use GNUNET_SET_accept()
978  *        to accept it, otherwise the request will be refused
979  *        Note that we can't just return value from the listen callback,
980  *        as it is also necessary to specify the set we want to do the
981  *        operation with, whith sometimes can be derived from the context
982  *        message. It's necessary to specify the timeout.
983  */
984 static void
985 cb_intersection_request_alice (void *cls,
986                                const struct GNUNET_PeerIdentity *other_peer,
987                                const struct GNUNET_MessageHeader *context_msg,
988                                struct GNUNET_SET_Request *request)
989 {
990   struct AliceServiceSession *s = cls;
991
992   if (0 != GNUNET_memcmp (other_peer, &s->peer))
993   {
994     GNUNET_break_op (0);
995     return;
996   }
997   s->intersection_op = GNUNET_SET_accept (request,
998                                           GNUNET_SET_RESULT_REMOVED,
999                                           (struct GNUNET_SET_Option[]){ { 0 } },
1000                                           &cb_intersection_element_removed,
1001                                           s);
1002   if (NULL == s->intersection_op)
1003   {
1004     GNUNET_break (0);
1005     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
1006     prepare_client_end_notification (s);
1007     return;
1008   }
1009   if (GNUNET_OK != GNUNET_SET_commit (s->intersection_op, s->intersection_set))
1010   {
1011     GNUNET_break (0);
1012     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
1013     prepare_client_end_notification (s);
1014     return;
1015   }
1016 }
1017
1018
1019 /**
1020  * Our client has finished sending us its multipart message.
1021  *
1022  * @param session the service session context
1023  */
1024 static void
1025 client_request_complete_alice (struct AliceServiceSession *s)
1026 {
1027   struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1028   { GNUNET_MQ_hd_var_size (bobs_cryptodata_message,
1029                            GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA,
1030                            struct BobCryptodataMessage,
1031                            s),
1032     GNUNET_MQ_hd_var_size (
1033       bobs_cryptodata_multipart,
1034       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART,
1035       struct BobCryptodataMultipartMessage,
1036       s),
1037     GNUNET_MQ_handler_end () };
1038   struct ServiceRequestMessage *msg;
1039   struct GNUNET_MQ_Envelope *e;
1040
1041   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042               "Creating new channel for session with key %s.\n",
1043               GNUNET_h2s (&s->session_id));
1044   s->channel = GNUNET_CADET_channel_create (my_cadet,
1045                                             s,
1046                                             &s->peer,
1047                                             &s->session_id,
1048                                             NULL,
1049                                             &cb_channel_destruction,
1050                                             cadet_handlers);
1051   if (NULL == s->channel)
1052   {
1053     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
1054     prepare_client_end_notification (s);
1055     return;
1056   }
1057   s->cadet_mq = GNUNET_CADET_get_mq (s->channel);
1058   s->intersection_listen = GNUNET_SET_listen (cfg,
1059                                               GNUNET_SET_OPERATION_INTERSECTION,
1060                                               &s->session_id,
1061                                               &cb_intersection_request_alice,
1062                                               s);
1063   if (NULL == s->intersection_listen)
1064   {
1065     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
1066     GNUNET_CADET_channel_destroy (s->channel);
1067     s->channel = NULL;
1068     prepare_client_end_notification (s);
1069     return;
1070   }
1071
1072   e = GNUNET_MQ_msg (msg,
1073                      GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SESSION_INITIALIZATION);
1074   msg->session_id = s->session_id;
1075   msg->public_key = my_pubkey;
1076   GNUNET_MQ_send (s->cadet_mq, e);
1077 }
1078
1079
1080 /**
1081  * We're receiving additional set data. Check if
1082  * @a msg is well-formed.
1083  *
1084  * @param cls client identification of the client
1085  * @param msg the actual message
1086  * @return #GNUNET_OK if @a msg is well-formed
1087  */
1088 static int
1089 check_alice_client_message_multipart (
1090   void *cls,
1091   const struct ComputationBobCryptodataMultipartMessage *msg)
1092 {
1093   struct AliceServiceSession *s = cls;
1094   uint32_t contained_count;
1095   uint16_t msize;
1096
1097   msize = ntohs (msg->header.size);
1098   contained_count = ntohl (msg->element_count_contained);
1099   if ((msize !=
1100        (sizeof(struct ComputationBobCryptodataMultipartMessage)
1101         + contained_count * sizeof(struct GNUNET_SCALARPRODUCT_Element))) ||
1102       (0 == contained_count) ||
1103       (s->total == s->client_received_element_count) ||
1104       (s->total < s->client_received_element_count + contained_count))
1105   {
1106     GNUNET_break_op (0);
1107     return GNUNET_SYSERR;
1108   }
1109   return GNUNET_OK;
1110 }
1111
1112
1113 /**
1114  * We're receiving additional set data. Add it to our
1115  * set and if we are done, initiate the transaction.
1116  *
1117  * @param cls client identification of the client
1118  * @param msg the actual message
1119  */
1120 static void
1121 handle_alice_client_message_multipart (
1122   void *cls,
1123   const struct ComputationBobCryptodataMultipartMessage *msg)
1124 {
1125   struct AliceServiceSession *s = cls;
1126   uint32_t contained_count;
1127   const struct GNUNET_SCALARPRODUCT_Element *elements;
1128   struct GNUNET_SET_Element set_elem;
1129   struct GNUNET_SCALARPRODUCT_Element *elem;
1130
1131   contained_count = ntohl (msg->element_count_contained);
1132   s->client_received_element_count += contained_count;
1133   elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
1134   for (uint32_t i = 0; i < contained_count; i++)
1135   {
1136     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1137     GNUNET_memcpy (elem,
1138                    &elements[i],
1139                    sizeof(struct GNUNET_SCALARPRODUCT_Element));
1140     if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (
1141           s->intersected_elements,
1142           &elem->key,
1143           elem,
1144           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1145     {
1146       GNUNET_break (0);
1147       GNUNET_free (elem);
1148       continue;
1149     }
1150     set_elem.data = &elem->key;
1151     set_elem.size = sizeof(elem->key);
1152     set_elem.element_type = 0;
1153     GNUNET_SET_add_element (s->intersection_set, &set_elem, NULL, NULL);
1154     s->used_element_count++;
1155   }
1156   GNUNET_SERVICE_client_continue (s->client);
1157   if (s->total != s->client_received_element_count)
1158   {
1159     /* more to come */
1160     return;
1161   }
1162   client_request_complete_alice (s);
1163 }
1164
1165
1166 /**
1167  * Handler for Alice's client request message.
1168  * Check that @a msg is well-formed.
1169  *
1170  * @param cls identification of the client
1171  * @param msg the actual message
1172  * @return #GNUNET_OK if @a msg is well-formed
1173  */
1174 static int
1175 check_alice_client_message (void *cls,
1176                             const struct AliceComputationMessage *msg)
1177 {
1178   struct AliceServiceSession *s = cls;
1179   uint16_t msize;
1180   uint32_t total_count;
1181   uint32_t contained_count;
1182
1183   if (NULL != s->intersected_elements)
1184   {
1185     /* only one concurrent session per client connection allowed,
1186        simplifies logic a lot... */
1187     GNUNET_break (0);
1188     return GNUNET_SYSERR;
1189   }
1190   msize = ntohs (msg->header.size);
1191   total_count = ntohl (msg->element_count_total);
1192   contained_count = ntohl (msg->element_count_contained);
1193   if ((0 == total_count) || (0 == contained_count) ||
1194       (msize !=
1195        (sizeof(struct AliceComputationMessage)
1196         + contained_count * sizeof(struct GNUNET_SCALARPRODUCT_Element))))
1197   {
1198     GNUNET_break_op (0);
1199     return GNUNET_SYSERR;
1200   }
1201   return GNUNET_OK;
1202 }
1203
1204
1205 /**
1206  * Handler for Alice's client request message.
1207  * We are doing request-initiation to compute a scalar product with a peer.
1208  *
1209  * @param cls identification of the client
1210  * @param msg the actual message
1211  */
1212 static void
1213 handle_alice_client_message (void *cls,
1214                              const struct AliceComputationMessage *msg)
1215 {
1216   struct AliceServiceSession *s = cls;
1217   uint32_t contained_count;
1218   uint32_t total_count;
1219   const struct GNUNET_SCALARPRODUCT_Element *elements;
1220   struct GNUNET_SET_Element set_elem;
1221   struct GNUNET_SCALARPRODUCT_Element *elem;
1222
1223   total_count = ntohl (msg->element_count_total);
1224   contained_count = ntohl (msg->element_count_contained);
1225   s->peer = msg->peer;
1226   s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE;
1227   s->total = total_count;
1228   s->client_received_element_count = contained_count;
1229   s->session_id = msg->session_key;
1230   elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
1231   s->intersected_elements =
1232     GNUNET_CONTAINER_multihashmap_create (s->total, GNUNET_YES);
1233   s->intersection_set =
1234     GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_INTERSECTION);
1235
1236   for (uint32_t i = 0; i < contained_count; i++)
1237   {
1238     if (0 == GNUNET_ntohll (elements[i].value))
1239       continue;
1240     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1241     GNUNET_memcpy (elem,
1242                    &elements[i],
1243                    sizeof(struct GNUNET_SCALARPRODUCT_Element));
1244     if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (
1245           s->intersected_elements,
1246           &elem->key,
1247           elem,
1248           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1249     {
1250       /* element with same key encountered twice! */
1251       GNUNET_break (0);
1252       GNUNET_free (elem);
1253       continue;
1254     }
1255     set_elem.data = &elem->key;
1256     set_elem.size = sizeof(elem->key);
1257     set_elem.element_type = 0;
1258     GNUNET_SET_add_element (s->intersection_set, &set_elem, NULL, NULL);
1259     s->used_element_count++;
1260   }
1261   GNUNET_SERVICE_client_continue (s->client);
1262   if (s->total != s->client_received_element_count)
1263   {
1264     /* wait for multipart msg */
1265     return;
1266   }
1267   client_request_complete_alice (s);
1268 }
1269
1270
1271 /**
1272  * Task run during shutdown.
1273  *
1274  * @param cls unused
1275  */
1276 static void
1277 shutdown_task (void *cls)
1278 {
1279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down, initiating cleanup.\n");
1280   // FIXME: we have to cut our connections to CADET first!
1281   if (NULL != my_cadet)
1282   {
1283     GNUNET_CADET_disconnect (my_cadet);
1284     my_cadet = NULL;
1285   }
1286 }
1287
1288
1289 /**
1290  * A client connected.
1291  *
1292  * Setup the associated data structure.
1293  *
1294  * @param cls closure, NULL
1295  * @param client identification of the client
1296  * @param mq message queue to communicate with @a client
1297  * @return our `struct AliceServiceSession`
1298  */
1299 static void *
1300 client_connect_cb (void *cls,
1301                    struct GNUNET_SERVICE_Client *client,
1302                    struct GNUNET_MQ_Handle *mq)
1303 {
1304   struct AliceServiceSession *s;
1305
1306   s = GNUNET_new (struct AliceServiceSession);
1307   s->client = client;
1308   s->client_mq = mq;
1309   return s;
1310 }
1311
1312
1313 /**
1314  * A client disconnected.
1315  *
1316  * Remove the associated session(s), release data structures
1317  * and cancel pending outgoing transmissions to the client.
1318  *
1319  * @param cls closure, NULL
1320  * @param client identification of the client
1321  * @param app_cls our `struct AliceServiceSession`
1322  */
1323 static void
1324 client_disconnect_cb (void *cls,
1325                       struct GNUNET_SERVICE_Client *client,
1326                       void *app_cls)
1327 {
1328   struct AliceServiceSession *s = app_cls;
1329
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "Client %p disconnected from us.\n",
1332               client);
1333   s->client = NULL;
1334   s->client_mq = NULL;
1335   destroy_service_session (s);
1336 }
1337
1338
1339 /**
1340  * Initialization of the program and message handlers
1341  *
1342  * @param cls closure
1343  * @param c configuration to use
1344  * @param service the initialized service
1345  */
1346 static void
1347 run (void *cls,
1348      const struct GNUNET_CONFIGURATION_Handle *c,
1349      struct GNUNET_SERVICE_Handle *service)
1350 {
1351   cfg = c;
1352   /*
1353      offset has to be sufficiently small to allow computation of:
1354      m1+m2 mod n == (S + a) + (S + b) mod n,
1355      if we have more complex operations, this factor needs to be lowered */
1356   my_offset = gcry_mpi_new (GNUNET_CRYPTO_PAILLIER_BITS / 3);
1357   gcry_mpi_set_bit (my_offset, GNUNET_CRYPTO_PAILLIER_BITS / 3);
1358   GNUNET_CRYPTO_paillier_create (&my_pubkey, &my_privkey);
1359   my_cadet = GNUNET_CADET_connect (cfg);
1360   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1361   if (NULL == my_cadet)
1362   {
1363     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Connect to CADET failed\n"));
1364     GNUNET_SCHEDULER_shutdown ();
1365     return;
1366   }
1367 }
1368
1369
1370 /**
1371  * Define "main" method using service macro.
1372  */
1373 GNUNET_SERVICE_MAIN (
1374   "scalarproduct-alice",
1375   GNUNET_SERVICE_OPTION_NONE,
1376   &run,
1377   &client_connect_cb,
1378   &client_disconnect_cb,
1379   NULL,
1380   GNUNET_MQ_hd_var_size (alice_client_message,
1381                          GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE,
1382                          struct AliceComputationMessage,
1383                          NULL),
1384   GNUNET_MQ_hd_var_size (
1385     alice_client_message_multipart,
1386     GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MULTIPART_ALICE,
1387     struct ComputationBobCryptodataMultipartMessage,
1388     NULL),
1389   GNUNET_MQ_handler_end ());
1390
1391
1392 /* end of gnunet-service-scalarproduct_alice.c */