-more SP fixes and todos
[oweals/gnunet.git] / src / scalarproduct / gnunet-service-scalarproduct_alice.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file scalarproduct/gnunet-service-scalarproduct_alice.c
22  * @brief scalarproduct service implementation
23  * @author Christian M. Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include <limits.h>
28 #include <gcrypt.h>
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_scalarproduct_service.h"
35 #include "gnunet_set_service.h"
36 #include "scalarproduct.h"
37 #include "gnunet-service-scalarproduct.h"
38
39 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-alice", __VA_ARGS__)
40
41 /**
42  * An encrypted element key-value pair.
43  */
44 struct MpiElement
45 {
46   /**
47    * Key used to identify matching pairs of values to multiply.
48    * Points into an existing data structure, to avoid copying
49    * and doubling memory use.
50    */
51   const struct GNUNET_HashCode *key;
52
53   /**
54    * Value represented (a).
55    */
56   gcry_mpi_t value;
57 };
58
59
60 /**
61  * A scalarproduct session which tracks
62  * a request form the client to our final response.
63  */
64 struct AliceServiceSession
65 {
66
67   /**
68    * (hopefully) unique transaction ID
69    */
70   struct GNUNET_HashCode session_id;
71
72   /**
73    * Alice or Bob's peerID
74    */
75   struct GNUNET_PeerIdentity peer;
76
77   /**
78    * The client this request is related to.
79    */
80   struct GNUNET_SERVER_Client *client;
81
82   /**
83    * The message queue for the client.
84    */
85   struct GNUNET_MQ_Handle *client_mq;
86
87   /**
88    * The message queue for CADET.
89    */
90   struct GNUNET_MQ_Handle *cadet_mq;
91
92   /**
93    * all non-0-value'd elements transmitted to us.
94    * Values are of type `struct GNUNET_SCALARPRODUCT_Element *`
95    */
96   struct GNUNET_CONTAINER_MultiHashMap *intersected_elements;
97
98   /**
99    * Set of elements for which will conduction an intersection.
100    * the resulting elements are then used for computing the scalar product.
101    */
102   struct GNUNET_SET_Handle *intersection_set;
103
104   /**
105    * Set of elements for which will conduction an intersection.
106    * the resulting elements are then used for computing the scalar product.
107    */
108   struct GNUNET_SET_OperationHandle *intersection_op;
109
110   /**
111    * Handle to Alice's Intersection operation listening for Bob
112    */
113   struct GNUNET_SET_ListenHandle *intersection_listen;
114
115   /**
116    * channel-handle associated with our cadet handle
117    */
118   struct GNUNET_CADET_Channel *channel;
119
120   /**
121    * a(Alice), sorted array by key of length @e used_element_count.
122    */
123   struct MpiElement *sorted_elements;
124
125   /**
126    * Bob's permutation p of R
127    */
128   struct GNUNET_CRYPTO_PaillierCiphertext *r;
129
130   /**
131    * Bob's permutation q of R
132    */
133   struct GNUNET_CRYPTO_PaillierCiphertext *r_prime;
134
135   /**
136    * Bob's "s"
137    */
138   struct GNUNET_CRYPTO_PaillierCiphertext s;
139
140   /**
141    * Bob's "s'"
142    */
143   struct GNUNET_CRYPTO_PaillierCiphertext s_prime;
144
145   /**
146    * The computed scalar
147    */
148   gcry_mpi_t product;
149
150   /**
151    * How many elements we were supplied with from the client (total
152    * count before intersection).
153    */
154   uint32_t total;
155
156   /**
157    * How many elements actually are used for the scalar product.
158    * Size of the arrays in @e r and @e r_prime.  Sometimes also
159    * reset to 0 and used as a counter!
160    */
161   uint32_t used_element_count;
162
163   /**
164    * Already transferred elements from client to us.
165    * Less or equal than @e total.
166    */
167   uint32_t transferred_element_count;
168
169   /**
170    * Is this session active (#GNUNET_YES), Concluded (#GNUNET_NO), or
171    * had an error (#GNUNET_SYSERR).
172    * FIXME: replace with proper enum for status codes!
173    */
174   int32_t active;
175
176   /**
177    * Flag to prevent recursive calls to #destroy_service_session() from
178    * doing harm.
179    */
180   int in_destroy;
181
182 };
183
184
185 /**
186  * GNUnet configuration handle
187  */
188 static const struct GNUNET_CONFIGURATION_Handle *cfg;
189
190 /**
191  * Service's own public key
192  */
193 static struct GNUNET_CRYPTO_PaillierPublicKey my_pubkey;
194
195 /**
196  * Service's own private key
197  */
198 static struct GNUNET_CRYPTO_PaillierPrivateKey my_privkey;
199
200 /**
201  * Service's offset for values that could possibly be negative but are plaintext for encryption.
202  */
203 static gcry_mpi_t my_offset;
204
205 /**
206  * Handle to the CADET service.
207  */
208 static struct GNUNET_CADET_Handle *my_cadet;
209
210
211 /**
212  * Iterator called to free elements.
213  *
214  * @param cls the `struct AliceServiceSession *` (unused)
215  * @param key the key (unused)
216  * @param value value to free
217  * @return #GNUNET_OK (continue to iterate)
218  */
219 static int
220 free_element (void *cls,
221               const struct GNUNET_HashCode *key,
222               void *value)
223 {
224   struct GNUNET_SCALARPRODUCT_Element *e = value;
225
226   GNUNET_free (e);
227   return GNUNET_OK;
228 }
229
230
231 /**
232  * Destroy session state, we are done with it.
233  *
234  * @param s the session to free elements from
235  */
236 static void
237 destroy_service_session (struct AliceServiceSession *s)
238 {
239   unsigned int i;
240
241   if (GNUNET_YES == s->in_destroy)
242     return;
243   s->in_destroy = GNUNET_YES;
244   if (NULL != s->client_mq)
245   {
246     GNUNET_MQ_destroy (s->client_mq);
247     s->client_mq = NULL;
248   }
249   if (NULL != s->cadet_mq)
250   {
251     GNUNET_MQ_destroy (s->cadet_mq);
252     s->cadet_mq = NULL;
253   }
254   if (NULL != s->client)
255   {
256     GNUNET_SERVER_client_set_user_context (s->client,
257                                            NULL);
258     GNUNET_SERVER_client_disconnect (s->client);
259     s->client = NULL;
260   }
261   if (NULL != s->channel)
262   {
263     GNUNET_CADET_channel_destroy (s->channel);
264     s->channel = NULL;
265   }
266   if (NULL != s->intersected_elements)
267   {
268     GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
269                                            &free_element,
270                                            s);
271     GNUNET_CONTAINER_multihashmap_destroy (s->intersected_elements);
272     s->intersected_elements = NULL;
273   }
274   if (NULL != s->intersection_listen)
275   {
276     GNUNET_SET_listen_cancel (s->intersection_listen);
277     s->intersection_listen = NULL;
278   }
279   if (NULL != s->intersection_op)
280   {
281     GNUNET_SET_operation_cancel (s->intersection_op);
282     s->intersection_op = NULL;
283   }
284   if (NULL != s->intersection_set)
285   {
286     GNUNET_SET_destroy (s->intersection_set);
287     s->intersection_set = NULL;
288   }
289   if (NULL != s->sorted_elements)
290   {
291     for (i=0;i<s->used_element_count;i++)
292       gcry_mpi_release (s->sorted_elements[i].value);
293     GNUNET_free (s->sorted_elements);
294     s->sorted_elements = NULL;
295   }
296   if (NULL != s->r)
297   {
298     GNUNET_free (s->r);
299     s->r = NULL;
300   }
301   if (NULL != s->r_prime)
302   {
303     GNUNET_free (s->r_prime);
304     s->r_prime = NULL;
305   }
306   if (NULL != s->product)
307   {
308     gcry_mpi_release (s->product);
309     s->product = NULL;
310   }
311   GNUNET_free (s);
312 }
313
314
315 /**
316  * Notify the client that the session has failed.  A message gets sent
317  * to Alice's client if we encountered any error.
318  *
319  * @param session the associated client session to fail or succeed
320  */
321 static void
322 prepare_client_end_notification (struct AliceServiceSession *session)
323 {
324   struct ClientResponseMessage *msg;
325   struct GNUNET_MQ_Envelope *e;
326
327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328               "Sending session-end notification with status %d to client for session %s\n",
329               session->active,
330               GNUNET_h2s (&session->session_id));
331   e = GNUNET_MQ_msg (msg,
332                      GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
333   msg->product_length = htonl (0);
334   msg->status = htonl (session->active);
335   GNUNET_MQ_send (session->client_mq,
336                   e);
337 }
338
339
340 /**
341  * Prepare the final (positive) response we will send to Alice's
342  * client.
343  *
344  * @param s the session associated with our client.
345  */
346 static void
347 transmit_client_response (struct AliceServiceSession *s)
348 {
349   struct ClientResponseMessage *msg;
350   struct GNUNET_MQ_Envelope *e;
351   unsigned char *product_exported = NULL;
352   size_t product_length = 0;
353   int32_t range;
354   gcry_error_t rc;
355   int sign;
356   gcry_mpi_t value;
357
358   if (NULL == s->product)
359   {
360     GNUNET_break (0);
361     prepare_client_end_notification (s);
362     return;
363   }
364   value = gcry_mpi_new (0);
365   sign = gcry_mpi_cmp_ui (s->product, 0);
366   if (0 > sign)
367   {
368     range = -1;
369     gcry_mpi_sub (value,
370                   value,
371                   s->product);
372   }
373   else if (0 < sign)
374   {
375     range = 1;
376     gcry_mpi_add (value, value, s->product);
377   }
378   else
379   {
380     /* result is exactly zero */
381     range = 0;
382   }
383   gcry_mpi_release (s->product);
384   s->product = NULL;
385
386   if ( (0 != range) &&
387        (0 != (rc = gcry_mpi_aprint (GCRYMPI_FMT_STD,
388                                     &product_exported,
389                                     &product_length,
390                                     value))))
391   {
392     LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
393               "gcry_mpi_scan",
394               rc);
395     prepare_client_end_notification (s);
396     return;
397   }
398   gcry_mpi_release (value);
399   e = GNUNET_MQ_msg_extra (msg,
400                            product_length,
401                            GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
402   msg->range = htonl (range);
403   msg->product_length = htonl (product_length);
404   if (NULL != product_exported)
405   {
406     memcpy (&msg[1],
407             product_exported,
408             product_length);
409     GNUNET_free (product_exported);
410   }
411   GNUNET_MQ_send (s->client_mq,
412                   e);
413   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414               "Sent result to client, session %s has ended!\n",
415               GNUNET_h2s (&s->session_id));
416 }
417
418
419
420 /**
421  * Function called whenever a channel is destroyed.  Should clean up
422  * any associated state.
423  *
424  * It must NOT call #GNUNET_CADET_channel_destroy() on the channel.
425  *
426  * @param cls closure (set from #GNUNET_CADET_connect())
427  * @param channel connection to the other end (henceforth invalid)
428  * @param channel_ctx place where local state associated
429  *                   with the channel is stored
430  */
431 static void
432 cb_channel_destruction (void *cls,
433                         const struct GNUNET_CADET_Channel *channel,
434                         void *channel_ctx)
435 {
436   struct AliceServiceSession *s = channel_ctx;
437
438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439               "Peer disconnected, terminating session %s with peer %s\n",
440               GNUNET_h2s (&s->session_id),
441               GNUNET_i2s (&s->peer));
442   s->channel = NULL;
443   if (GNUNET_YES == s->active)
444   {
445     /* We didn't get an answer yet, fail with error */
446     s->active = GNUNET_SYSERR;
447     prepare_client_end_notification (s);
448   }
449 }
450
451
452 /**
453  * Computes the square sum over a vector of a given length.
454  *
455  * @param vector the vector to compute over
456  * @param length the length of the vector
457  * @return an MPI value containing the calculated sum, never NULL
458  */
459 static gcry_mpi_t
460 compute_square_sum_mpi_elements (const struct MpiElement *vector,
461                                  uint32_t length)
462 {
463   gcry_mpi_t elem;
464   gcry_mpi_t sum;
465   uint32_t i;
466
467   GNUNET_assert (NULL != (sum = gcry_mpi_new (0)));
468   GNUNET_assert (NULL != (elem = gcry_mpi_new (0)));
469   for (i = 0; i < length; i++)
470   {
471     gcry_mpi_mul (elem, vector[i].value, vector[i].value);
472     gcry_mpi_add (sum, sum, elem);
473   }
474   gcry_mpi_release (elem);
475   return sum;
476 }
477
478
479 /**
480  * Computes the square sum over a vector of a given length.
481  *
482  * @param vector the vector to compute over
483  * @param length the length of the vector
484  * @return an MPI value containing the calculated sum, never NULL
485  */
486 static gcry_mpi_t
487 compute_square_sum (const gcry_mpi_t *vector,
488                     uint32_t length)
489 {
490   gcry_mpi_t elem;
491   gcry_mpi_t sum;
492   uint32_t i;
493
494   GNUNET_assert (NULL != (sum = gcry_mpi_new (0)));
495   GNUNET_assert (NULL != (elem = gcry_mpi_new (0)));
496   for (i = 0; i < length; i++)
497   {
498     gcry_mpi_mul (elem, vector[i], vector[i]);
499     gcry_mpi_add (sum, sum, elem);
500   }
501   gcry_mpi_release (elem);
502   return sum;
503 }
504
505
506 /**
507  * Compute our scalar product, done by Alice
508  *
509  * @param session the session associated with this computation
510  * @return product as MPI, never NULL
511  */
512 static gcry_mpi_t
513 compute_scalar_product (struct AliceServiceSession *session)
514 {
515   uint32_t count;
516   gcry_mpi_t t;
517   gcry_mpi_t u;
518   gcry_mpi_t u_prime;
519   gcry_mpi_t p;
520   gcry_mpi_t p_prime;
521   gcry_mpi_t tmp;
522   gcry_mpi_t r[session->used_element_count];
523   gcry_mpi_t r_prime[session->used_element_count];
524   gcry_mpi_t s;
525   gcry_mpi_t s_prime;
526   unsigned int i;
527
528   count = session->used_element_count;
529   // due to the introduced static offset S, we now also have to remove this
530   // from the E(a_pi)(+)E(-b_pi-r_pi) and E(a_qi)(+)E(-r_qi) twice each,
531   // the result is E((S + a_pi) + (S -b_pi-r_pi)) and E(S + a_qi + S - r_qi)
532   for (i = 0; i < count; i++)
533   {
534     r[i] = gcry_mpi_new (0);
535     GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
536                                     &my_pubkey,
537                                     &session->r[i],
538                                     r[i]);
539     gcry_mpi_sub (r[i],
540                   r[i],
541                   my_offset);
542     gcry_mpi_sub (r[i],
543                   r[i],
544                   my_offset);
545     r_prime[i] = gcry_mpi_new (0);
546     GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
547                                     &my_pubkey,
548                                     &session->r_prime[i],
549                                     r_prime[i]);
550     gcry_mpi_sub (r_prime[i],
551                   r_prime[i],
552                   my_offset);
553     gcry_mpi_sub (r_prime[i],
554                   r_prime[i],
555                   my_offset);
556   }
557
558   // calculate t = sum(ai)
559   t = compute_square_sum_mpi_elements (session->sorted_elements,
560                                        count);
561   // calculate U
562   u = gcry_mpi_new (0);
563   tmp = compute_square_sum (r, count);
564   gcry_mpi_sub (u, u, tmp);
565   gcry_mpi_release (tmp);
566
567   //calculate U'
568   u_prime = gcry_mpi_new (0);
569   tmp = compute_square_sum (r_prime, count);
570   gcry_mpi_sub (u_prime, u_prime, tmp);
571
572   GNUNET_assert (p = gcry_mpi_new (0));
573   GNUNET_assert (p_prime = gcry_mpi_new (0));
574   GNUNET_assert (s = gcry_mpi_new (0));
575   GNUNET_assert (s_prime = gcry_mpi_new (0));
576
577   // compute P
578   GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
579                                   &my_pubkey,
580                                   &session->s,
581                                   s);
582   GNUNET_CRYPTO_paillier_decrypt (&my_privkey,
583                                   &my_pubkey,
584                                   &session->s_prime,
585                                   s_prime);
586
587   // compute P
588   gcry_mpi_add (p, s, t);
589   gcry_mpi_add (p, p, u);
590
591   // compute P'
592   gcry_mpi_add (p_prime, s_prime, t);
593   gcry_mpi_add (p_prime, p_prime, u_prime);
594
595   gcry_mpi_release (t);
596   gcry_mpi_release (u);
597   gcry_mpi_release (u_prime);
598   gcry_mpi_release (s);
599   gcry_mpi_release (s_prime);
600
601   // compute product
602   gcry_mpi_sub (p, p, p_prime);
603   gcry_mpi_release (p_prime);
604   tmp = gcry_mpi_set_ui (tmp, 2);
605   gcry_mpi_div (p, NULL, p, tmp, 0);
606
607   gcry_mpi_release (tmp);
608   for (i = 0; i < count; i++)
609   {
610     gcry_mpi_release (session->sorted_elements[i].value);
611     gcry_mpi_release (r[i]);
612     gcry_mpi_release (r_prime[i]);
613   }
614   GNUNET_free (session->sorted_elements);
615   session->sorted_elements = NULL;
616   GNUNET_free (session->r);
617   session->r = NULL;
618   GNUNET_free (session->r_prime);
619   session->r_prime = NULL;
620
621   return p;
622 }
623
624
625 /**
626  * Handle a multipart chunk of a response we got from another service
627  * we wanted to calculate a scalarproduct with.
628  *
629  * @param cls closure (set from #GNUNET_CADET_connect)
630  * @param channel connection to the other end
631  * @param channel_ctx place to store local state associated with the @a channel
632  * @param message the actual message
633  * @return #GNUNET_OK to keep the connection open,
634  *         #GNUNET_SYSERR to close it (signal serious error)
635  */
636 static int
637 handle_bobs_cryptodata_multipart (void *cls,
638                                   struct GNUNET_CADET_Channel *channel,
639                                   void **channel_ctx,
640                                   const struct GNUNET_MessageHeader *message)
641 {
642   struct AliceServiceSession *s = *channel_ctx;
643   const struct MultipartMessage *msg;
644   const struct GNUNET_CRYPTO_PaillierCiphertext *payload;
645   size_t i;
646   uint32_t contained;
647   size_t msg_size;
648   size_t required_size;
649
650   if (NULL == s)
651   {
652     GNUNET_break_op (0);
653     return GNUNET_SYSERR;
654   }
655   msg_size = ntohs (message->size);
656   if (sizeof (struct MultipartMessage) > msg_size)
657   {
658     GNUNET_break_op (0);
659     return GNUNET_SYSERR;
660   }
661   msg = (const struct MultipartMessage *) message;
662   contained = ntohl (msg->contained_element_count);
663   required_size = sizeof (struct MultipartMessage)
664     + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
665   if ( (required_size != msg_size) ||
666        (s->transferred_element_count + contained > s->used_element_count) )
667   {
668     GNUNET_break (0);
669     return GNUNET_SYSERR;
670   }
671
672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673               "Received %u additional crypto values from Bob\n",
674               (unsigned int) contained);
675
676   payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
677   /* Convert each k[][perm] to its MPI_value */
678   for (i = 0; i < contained; i++)
679   {
680     memcpy (&s->r[s->transferred_element_count + i],
681             &payload[2 * i],
682             sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
683     memcpy (&s->r_prime[s->transferred_element_count + i],
684             &payload[2 * i],
685             sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
686   }
687   s->transferred_element_count += contained;
688   if (s->transferred_element_count != s->used_element_count)
689     return GNUNET_OK;
690
691   s->product = compute_scalar_product (s);
692   transmit_client_response (s);
693   return GNUNET_OK;
694 }
695
696
697 /**
698  * Handle a response we got from another service we wanted to
699  * calculate a scalarproduct with.
700  *
701  * @param cls closure (set from #GNUNET_CADET_connect)
702  * @param channel connection to the other end
703  * @param channel_ctx place to store local state associated with the channel
704  * @param message the actual message
705  * @return #GNUNET_OK to keep the connection open,
706  *         #GNUNET_SYSERR to close it (we are done)
707  */
708 static int
709 handle_bobs_cryptodata_message (void *cls,
710                                 struct GNUNET_CADET_Channel *channel,
711                                 void **channel_ctx,
712                                 const struct GNUNET_MessageHeader *message)
713 {
714   struct AliceServiceSession *s = *channel_ctx;
715   const struct ServiceResponseMessage *msg;
716   const struct GNUNET_CRYPTO_PaillierCiphertext *payload;
717   uint32_t i;
718   uint32_t contained;
719   uint16_t msg_size;
720   size_t required_size;
721
722   if (NULL == s)
723   {
724     GNUNET_break_op (0);
725     return GNUNET_SYSERR;
726   }
727   msg_size = ntohs (message->size);
728   if (sizeof (struct ServiceResponseMessage) > msg_size)
729   {
730     GNUNET_break_op (0);
731     return GNUNET_SYSERR;
732   }
733   msg = (const struct ServiceResponseMessage *) message;
734   contained = ntohl (msg->contained_element_count);
735   required_size = sizeof (struct ServiceResponseMessage)
736     + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)
737     + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
738   if ( (msg_size != required_size) ||
739        (contained > UINT16_MAX) ||
740        (s->used_element_count < contained) )
741   {
742     GNUNET_break_op (0);
743     return GNUNET_SYSERR;
744   }
745   if ( (NULL == s->sorted_elements) ||
746        (s->used_element_count != s->transferred_element_count) )
747   {
748     /* we're not ready yet, how can Bob be? */
749     GNUNET_break_op (0);
750     return GNUNET_SYSERR;
751   }
752
753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
754               "Received %u crypto values from Bob\n",
755               (unsigned int) contained);
756
757   payload = (const struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
758   memcpy (&s->s,
759           &payload[0],
760           sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
761   memcpy (&s->s_prime,
762           &payload[1],
763           sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
764   payload = &payload[2];
765
766   s->r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * s->used_element_count);
767   s->r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * s->used_element_count);
768   for (i = 0; i < contained; i++)
769   {
770     memcpy (&s->r[i],
771             &payload[2 * i],
772             sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
773     memcpy (&s->r_prime[i],
774             &payload[2 * i + 1],
775             sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
776   }
777   s->transferred_element_count = contained;
778
779   if (s->transferred_element_count != s->used_element_count)
780   {
781     /* More to come */
782     return GNUNET_OK;
783   }
784
785   s->product = compute_scalar_product (s);
786   transmit_client_response (s);
787   return GNUNET_OK;
788 }
789
790
791 /**
792  * Iterator to copy over messages from the hash map
793  * into an array for sorting.
794  *
795  * @param cls the `struct AliceServiceSession *`
796  * @param key the key (unused)
797  * @param value the `struct GNUNET_SCALARPRODUCT_Element *`
798  */
799 static int
800 copy_element_cb (void *cls,
801                  const struct GNUNET_HashCode *key,
802                  void *value)
803 {
804   struct AliceServiceSession *s = cls;
805   struct GNUNET_SCALARPRODUCT_Element *e = value;
806   gcry_mpi_t mval;
807   int64_t val;
808
809   mval = gcry_mpi_new (0);
810   val = (int64_t) GNUNET_ntohll (e->value);
811   if (0 > val)
812     gcry_mpi_sub_ui (mval, mval, -val);
813   else
814     gcry_mpi_add_ui (mval, mval, val);
815   s->sorted_elements [s->used_element_count].value = mval;
816   s->sorted_elements [s->used_element_count].key = &e->key;
817   s->used_element_count++;
818   return GNUNET_OK;
819 }
820
821
822 /**
823  * Compare two `struct MpiValue`s by key for sorting.
824  *
825  * @param a pointer to first `struct MpiValue *`
826  * @param b pointer to first `struct MpiValue *`
827  * @return -1 for a < b, 0 for a=b, 1 for a > b.
828  */
829 static int
830 element_cmp (const void *a,
831              const void *b)
832 {
833   const struct MpiElement *ma = a;
834   const struct MpiElement *mb = b;
835
836   return GNUNET_CRYPTO_hash_cmp (ma->key,
837                                  mb->key);
838 }
839
840
841 /**
842  * Maximum number of elements we can put into a single cryptodata
843  * message
844  */
845 #define ELEMENT_CAPACITY ((GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceCryptodataMessage)) / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext))
846
847
848 /**
849  * Send the cryptographic data from Alice to Bob.
850  * Does nothing if we already transferred all elements.
851  *
852  * @param s the associated service session
853  */
854 static void
855 send_alices_cryptodata_message (struct AliceServiceSession *s)
856 {
857   struct AliceCryptodataMessage *msg;
858   struct GNUNET_MQ_Envelope *e;
859   struct GNUNET_CRYPTO_PaillierCiphertext *payload;
860   unsigned int i;
861   uint32_t todo_count;
862   gcry_mpi_t a;
863   uint32_t off;
864
865   s->sorted_elements
866     = GNUNET_malloc (GNUNET_CONTAINER_multihashmap_size (s->intersected_elements) *
867                      sizeof (struct MpiElement));
868   s->used_element_count = 0;
869   GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
870                                          &copy_element_cb,
871                                          s);
872   LOG (GNUNET_ERROR_TYPE_DEBUG,
873        "Finished intersection, %d items remain\n",
874        s->used_element_count);
875   qsort (s->sorted_elements,
876          s->used_element_count,
877          sizeof (struct MpiElement),
878          &element_cmp);
879   off = 0;
880   while (off < s->used_element_count)
881   {
882     todo_count = s->used_element_count - off;
883     if (todo_count > ELEMENT_CAPACITY)
884       todo_count = ELEMENT_CAPACITY;
885     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                 "Sending %u crypto values to Bob\n",
887                 (unsigned int) todo_count);
888
889     e = GNUNET_MQ_msg_extra (msg,
890                              todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext),
891                              GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA);
892     msg->contained_element_count = htonl (todo_count);
893     payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
894     a = gcry_mpi_new (0);
895     for (i = off; i < todo_count; i++)
896     {
897       gcry_mpi_add (a,
898                     s->sorted_elements[i].value,
899                     my_offset);
900       GNUNET_CRYPTO_paillier_encrypt (&my_pubkey,
901                                       a,
902                                       3,
903                                       &payload[i - off]);
904     }
905     gcry_mpi_release (a);
906     off += todo_count;
907     GNUNET_MQ_send (s->cadet_mq,
908                     e);
909   }
910 }
911
912
913 /**
914  * Callback for set operation results. Called for each element
915  * that should be removed from the result set, and then once
916  * to indicate that the set intersection operation is done.
917  *
918  * @param cls closure with the `struct AliceServiceSession`
919  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
920  * @param status what has happened with the set intersection?
921  */
922 static void
923 cb_intersection_element_removed (void *cls,
924                                  const struct GNUNET_SET_Element *element,
925                                  enum GNUNET_SET_Status status)
926 {
927   struct AliceServiceSession *s = cls;
928   struct GNUNET_SCALARPRODUCT_Element *se;
929
930   switch (status)
931   {
932   case GNUNET_SET_STATUS_OK:
933     /* this element has been removed from the set */
934     se = GNUNET_CONTAINER_multihashmap_get (s->intersected_elements,
935                                             element->data);
936     GNUNET_assert (NULL != se);
937     LOG (GNUNET_ERROR_TYPE_DEBUG,
938          "Intersection removed element with key %s and value %lld\n",
939          GNUNET_h2s (&se->key),
940          (long long) GNUNET_ntohll (se->value));
941     GNUNET_assert (GNUNET_YES ==
942                    GNUNET_CONTAINER_multihashmap_remove (s->intersected_elements,
943                                                          element->data,
944                                                          se));
945     GNUNET_free (se);
946     return;
947   case GNUNET_SET_STATUS_DONE:
948     s->intersection_op = NULL;
949     s->intersection_set = NULL;
950     send_alices_cryptodata_message (s);
951     return;
952   case GNUNET_SET_STATUS_HALF_DONE:
953     /* unexpected for intersection */
954     GNUNET_break (0);
955     return;
956   case GNUNET_SET_STATUS_FAILURE:
957     /* unhandled status code */
958     LOG (GNUNET_ERROR_TYPE_DEBUG,
959          "Set intersection failed!\n");
960     if (NULL != s->intersection_listen)
961     {
962       GNUNET_SET_listen_cancel (s->intersection_listen);
963       s->intersection_listen = NULL;
964     }
965     s->intersection_op = NULL;
966     s->intersection_set = NULL;
967     s->active = GNUNET_SYSERR;
968     prepare_client_end_notification (s);
969     return;
970   default:
971     GNUNET_break (0);
972     return;
973   }
974 }
975
976
977 /**
978  * Called when another peer wants to do a set operation with the
979  * local peer. If a listen error occurs, the @a request is NULL.
980  *
981  * @param cls closure with the `struct AliceServiceSession *`
982  * @param other_peer the other peer
983  * @param context_msg message with application specific information from
984  *        the other peer
985  * @param request request from the other peer (never NULL), use GNUNET_SET_accept()
986  *        to accept it, otherwise the request will be refused
987  *        Note that we can't just return value from the listen callback,
988  *        as it is also necessary to specify the set we want to do the
989  *        operation with, whith sometimes can be derived from the context
990  *        message. It's necessary to specify the timeout.
991  */
992 static void
993 cb_intersection_request_alice (void *cls,
994                                const struct GNUNET_PeerIdentity *other_peer,
995                                const struct GNUNET_MessageHeader *context_msg,
996                                struct GNUNET_SET_Request *request)
997 {
998   struct AliceServiceSession *s = cls;
999
1000   if (0 != memcmp (other_peer,
1001                    &s->peer,
1002                    sizeof (struct GNUNET_PeerIdentity)))
1003   {
1004     GNUNET_break_op (0);
1005     return;
1006   }
1007   s->intersection_op
1008     = GNUNET_SET_accept (request,
1009                          GNUNET_SET_RESULT_REMOVED,
1010                          &cb_intersection_element_removed,
1011                          s);
1012   if (NULL == s->intersection_op)
1013   {
1014     GNUNET_break (0);
1015     s->active = GNUNET_SYSERR;
1016     prepare_client_end_notification (s);
1017     return;
1018   }
1019   if (GNUNET_OK !=
1020       GNUNET_SET_commit (s->intersection_op,
1021                          s->intersection_set))
1022   {
1023     s->active = GNUNET_SYSERR;
1024     prepare_client_end_notification (s);
1025     return;
1026   }
1027   s->intersection_set = NULL;
1028   s->intersection_listen = NULL;
1029 }
1030
1031
1032 /**
1033  * Our client has finished sending us its multipart message.
1034  *
1035  * @param session the service session context
1036  */
1037 static void
1038 client_request_complete_alice (struct AliceServiceSession *s)
1039 {
1040   struct ServiceRequestMessage *msg;
1041   struct GNUNET_MQ_Envelope *e;
1042
1043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044               "Creating new channel for session with key %s.\n",
1045               GNUNET_h2s (&s->session_id));
1046   s->channel
1047     = GNUNET_CADET_channel_create (my_cadet,
1048                                    s,
1049                                    &s->peer,
1050                                    GNUNET_APPLICATION_TYPE_SCALARPRODUCT,
1051                                    GNUNET_CADET_OPTION_RELIABLE);
1052   if (NULL == s->channel)
1053   {
1054     s->active = GNUNET_SYSERR;
1055     prepare_client_end_notification (s);
1056     return;
1057   }
1058   s->cadet_mq = GNUNET_CADET_mq_create (s->channel);
1059   s->intersection_listen
1060     = GNUNET_SET_listen (cfg,
1061                          GNUNET_SET_OPERATION_INTERSECTION,
1062                          &s->session_id,
1063                          &cb_intersection_request_alice,
1064                          s);
1065   if (NULL == s->intersection_listen)
1066   {
1067     s->active = GNUNET_SYSERR;
1068     GNUNET_CADET_channel_destroy (s->channel);
1069     s->channel = NULL;
1070     prepare_client_end_notification (s);
1071     return;
1072   }
1073
1074   e = GNUNET_MQ_msg (msg,
1075                      GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SESSION_INITIALIZATION);
1076   msg->session_id = s->session_id;
1077   GNUNET_MQ_send (s->cadet_mq,
1078                   e);
1079 }
1080
1081
1082 /**
1083  * We're receiving additional set data. Add it to our
1084  * set and if we are done, initiate the transaction.
1085  *
1086  * @param cls closure
1087  * @param client identification of the client
1088  * @param message the actual message
1089  */
1090 static void
1091 GSS_handle_alice_client_message_multipart (void *cls,
1092                                            struct GNUNET_SERVER_Client *client,
1093                                            const struct GNUNET_MessageHeader *message)
1094 {
1095   const struct ComputationMultipartMessage * msg;
1096   struct AliceServiceSession *s;
1097   uint32_t contained_count;
1098   const struct GNUNET_SCALARPRODUCT_Element *elements;
1099   uint32_t i;
1100   uint16_t msize;
1101   struct GNUNET_SET_Element set_elem;
1102   struct GNUNET_SCALARPRODUCT_Element *elem;
1103
1104   s = GNUNET_SERVER_client_get_user_context (client,
1105                                              struct AliceServiceSession);
1106   if (NULL == s)
1107   {
1108     /* session needs to already exist */
1109     GNUNET_break (0);
1110     GNUNET_SERVER_receive_done (client,
1111                                 GNUNET_SYSERR);
1112     return;
1113   }
1114   msize = ntohs (message->size);
1115   if (msize < sizeof (struct ComputationMultipartMessage))
1116   {
1117     GNUNET_break (0);
1118     GNUNET_SERVER_receive_done (client,
1119                                 GNUNET_SYSERR);
1120     return;
1121   }
1122   msg = (const struct ComputationMultipartMessage *) message;
1123   contained_count = ntohl (msg->element_count_contained);
1124
1125   if ( (msize != (sizeof (struct ComputationMultipartMessage) +
1126                   contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) ||
1127        (0 == contained_count) ||
1128        (s->total == s->transferred_element_count) ||
1129        (s->total < s->transferred_element_count + contained_count) )
1130   {
1131     GNUNET_break_op (0);
1132     GNUNET_SERVER_receive_done (client,
1133                                 GNUNET_SYSERR);
1134     return;
1135   }
1136   s->transferred_element_count += contained_count;
1137   elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
1138   for (i = 0; i < contained_count; i++)
1139   {
1140     if (0 == GNUNET_ntohll (elements[i].value))
1141       continue;
1142     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1143     memcpy (elem,
1144             &elements[i],
1145             sizeof (struct GNUNET_SCALARPRODUCT_Element));
1146     if (GNUNET_SYSERR ==
1147         GNUNET_CONTAINER_multihashmap_put (s->intersected_elements,
1148                                            &elem->key,
1149                                            elem,
1150                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1151     {
1152       GNUNET_break (0);
1153       GNUNET_free (elem);
1154       continue;
1155     }
1156     set_elem.data = &elem->key;
1157     set_elem.size = sizeof (elem->key);
1158     set_elem.element_type = 0;
1159     GNUNET_SET_add_element (s->intersection_set,
1160                             &set_elem,
1161                             NULL, NULL);
1162     s->used_element_count++;
1163   }
1164   GNUNET_SERVER_receive_done (client,
1165                               GNUNET_OK);
1166   if (s->total != s->transferred_element_count)
1167   {
1168     /* more to come */
1169     return;
1170   }
1171   client_request_complete_alice (s);
1172 }
1173
1174
1175 /**
1176  * Handler for Alice's client request message.
1177  * We are doing request-initiation to compute a scalar product with a peer.
1178  *
1179  * @param cls closure
1180  * @param client identification of the client
1181  * @param message the actual message
1182  */
1183 static void
1184 GSS_handle_alice_client_message (void *cls,
1185                                  struct GNUNET_SERVER_Client *client,
1186                                  const struct GNUNET_MessageHeader *message)
1187 {
1188   const struct AliceComputationMessage *msg;
1189   struct AliceServiceSession *s;
1190   uint32_t contained_count;
1191   uint32_t total_count;
1192   const struct GNUNET_SCALARPRODUCT_Element *elements;
1193   uint32_t i;
1194   uint16_t msize;
1195   struct GNUNET_SET_Element set_elem;
1196   struct GNUNET_SCALARPRODUCT_Element *elem;
1197
1198   s = GNUNET_SERVER_client_get_user_context (client,
1199                                              struct AliceServiceSession);
1200   if (NULL != s)
1201   {
1202     /* only one concurrent session per client connection allowed,
1203        simplifies logic a lot... */
1204     GNUNET_break (0);
1205     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1206     return;
1207   }
1208   msize = ntohs (message->size);
1209   if (msize < sizeof (struct AliceComputationMessage))
1210   {
1211     GNUNET_break (0);
1212     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1213     return;
1214   }
1215   msg = (const struct AliceComputationMessage *) message;
1216   total_count = ntohl (msg->element_count_total);
1217   contained_count = ntohl (msg->element_count_contained);
1218   if ( (0 == total_count) ||
1219        (0 == contained_count) ||
1220        (msize != (sizeof (struct AliceComputationMessage) +
1221                   contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) )
1222   {
1223     GNUNET_break_op (0);
1224     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1225     return;
1226   }
1227
1228   s = GNUNET_new (struct AliceServiceSession);
1229   s->peer = msg->peer;
1230   s->active = GNUNET_YES;
1231   s->client = client;
1232   s->client_mq = GNUNET_MQ_queue_for_server_client (client);
1233   s->total = total_count;
1234   s->transferred_element_count = contained_count;
1235   s->session_id = msg->session_key;
1236   elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
1237   s->intersected_elements = GNUNET_CONTAINER_multihashmap_create (s->total,
1238                                                                   GNUNET_YES);
1239   s->intersection_set = GNUNET_SET_create (cfg,
1240                                            GNUNET_SET_OPERATION_INTERSECTION);
1241   for (i = 0; i < contained_count; i++)
1242   {
1243     if (0 == GNUNET_ntohll (elements[i].value))
1244       continue;
1245     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1246     memcpy (elem,
1247             &elements[i],
1248             sizeof (struct GNUNET_SCALARPRODUCT_Element));
1249     if (GNUNET_SYSERR ==
1250         GNUNET_CONTAINER_multihashmap_put (s->intersected_elements,
1251                                            &elem->key,
1252                                            elem,
1253                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1254     {
1255       /* element with same key encountered twice! */
1256       GNUNET_break (0);
1257       GNUNET_free (elem);
1258       continue;
1259     }
1260     set_elem.data = &elem->key;
1261     set_elem.size = sizeof (elem->key);
1262     set_elem.element_type = 0;
1263     GNUNET_SET_add_element (s->intersection_set,
1264                             &set_elem,
1265                             NULL, NULL);
1266     s->used_element_count++;
1267   }
1268   GNUNET_SERVER_client_set_user_context (client,
1269                                          s);
1270   GNUNET_SERVER_receive_done (client,
1271                               GNUNET_OK);
1272   if (s->total != s->transferred_element_count)
1273   {
1274     /* wait for multipart msg */
1275     return;
1276   }
1277   client_request_complete_alice (s);
1278 }
1279
1280
1281 /**
1282  * Task run during shutdown.
1283  *
1284  * @param cls unused
1285  * @param tc unused
1286  */
1287 static void
1288 shutdown_task (void *cls,
1289                const struct GNUNET_SCHEDULER_TaskContext *tc)
1290 {
1291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292               "Shutting down, initiating cleanup.\n");
1293   if (NULL != my_cadet)
1294   {
1295     GNUNET_CADET_disconnect (my_cadet);
1296     my_cadet = NULL;
1297   }
1298 }
1299
1300
1301 /**
1302  * A client disconnected.
1303  *
1304  * Remove the associated session(s), release data structures
1305  * and cancel pending outgoing transmissions to the client.
1306  *
1307  * @param cls closure, NULL
1308  * @param client identification of the client
1309  */
1310 static void
1311 handle_client_disconnect (void *cls,
1312                           struct GNUNET_SERVER_Client *client)
1313 {
1314   struct AliceServiceSession *s;
1315
1316   if (NULL == client)
1317     return;
1318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319               "Client %p disconnected from us.\n",
1320               client);
1321   s = GNUNET_SERVER_client_get_user_context (client,
1322                                              struct AliceServiceSession);
1323   if (NULL == s)
1324     return;
1325   s->client = NULL;
1326   GNUNET_SERVER_client_set_user_context (client,
1327                                          NULL);
1328   destroy_service_session (s);
1329 }
1330
1331
1332 /**
1333  * Initialization of the program and message handlers
1334  *
1335  * @param cls closure
1336  * @param server the initialized server
1337  * @param c configuration to use
1338  */
1339 static void
1340 run (void *cls,
1341      struct GNUNET_SERVER_Handle *server,
1342      const struct GNUNET_CONFIGURATION_Handle *c)
1343 {
1344   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1345     { &handle_bobs_cryptodata_message,
1346       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA,
1347       0},
1348     { &handle_bobs_cryptodata_multipart,
1349       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART,
1350       0},
1351     { NULL, 0, 0}
1352   };
1353   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1354     { &GSS_handle_alice_client_message, NULL,
1355       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE,
1356       0},
1357     { &GSS_handle_alice_client_message_multipart, NULL,
1358       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE,
1359       0},
1360     { NULL, NULL, 0, 0}
1361   };
1362
1363   cfg = c;
1364   /*
1365     offset has to be sufficiently small to allow computation of:
1366     m1+m2 mod n == (S + a) + (S + b) mod n,
1367     if we have more complex operations, this factor needs to be lowered */
1368   my_offset = gcry_mpi_new (GNUNET_CRYPTO_PAILLIER_BITS / 3);
1369   gcry_mpi_set_bit (my_offset,
1370                     GNUNET_CRYPTO_PAILLIER_BITS / 3);
1371
1372   GNUNET_CRYPTO_paillier_create (&my_pubkey,
1373                                  &my_privkey);
1374   GNUNET_SERVER_add_handlers (server,
1375                               server_handlers);
1376   GNUNET_SERVER_disconnect_notify (server,
1377                                    &handle_client_disconnect,
1378                                    NULL);
1379   my_cadet = GNUNET_CADET_connect (cfg, NULL,
1380                                    NULL /* no incoming supported */,
1381                                    &cb_channel_destruction,
1382                                    cadet_handlers,
1383                                    NULL);
1384   if (NULL == my_cadet)
1385   {
1386     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1387                 _("Connect to CADET failed\n"));
1388     GNUNET_SCHEDULER_shutdown ();
1389     return;
1390   }
1391   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1392                                 &shutdown_task,
1393                                 NULL);
1394
1395 }
1396
1397
1398 /**
1399  * The main function for the scalarproduct service.
1400  *
1401  * @param argc number of arguments from the command line
1402  * @param argv command line arguments
1403  * @return 0 ok, 1 on error
1404  */
1405 int
1406 main (int argc,
1407       char *const *argv)
1408 {
1409   return (GNUNET_OK ==
1410           GNUNET_SERVICE_run (argc, argv,
1411                               "scalarproduct-alice",
1412                               GNUNET_SERVICE_OPTION_NONE,
1413                               &run, NULL)) ? 0 : 1;
1414 }
1415
1416 /* end of gnunet-service-scalarproduct_alice.c */