- more abnormal session-termination handling
[oweals/gnunet.git] / src / scalarproduct / gnunet-service-scalarproduct.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/gnunet-service-scalarproduct.c
23  * @brief scalarproduct service implementation
24  * @author Christian M. Fuchs
25  */
26 #include "platform.h"
27 #include <limits.h>
28 #include <gcrypt.h>
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_scalarproduct_service.h"
35 #include "gnunet_set_service.h"
36 #include "scalarproduct.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct", __VA_ARGS__)
39
40 ///////////////////////////////////////////////////////////////////////////////
41 //                     Service Structure Definitions
42 ///////////////////////////////////////////////////////////////////////////////
43
44
45 /**
46  * role a peer in a session can assume
47  */
48 enum PeerRole
49 {
50   ALICE,
51   BOB
52 };
53
54 /**
55  * DLL for sorting elements
56  */
57 struct SortedValue
58 {
59   struct SortedValue * next;
60   struct SortedValue * prev;
61   struct GNUNET_SCALARPRODUCT_Element * elem;
62   gcry_mpi_t val;
63 };
64
65
66 /**
67  * A scalarproduct session which tracks:
68  *
69  * a request form the client to our final response.
70  * or
71  * a request from a service to us(service).
72  */
73 struct ServiceSession
74 {
75   /**
76    * the role this peer has
77    */
78   enum PeerRole role;
79
80   /**
81    * session information is kept in a DLL
82    */
83   struct ServiceSession *next;
84
85   /**
86    * session information is kept in a DLL
87    */
88   struct ServiceSession *prev;
89
90   /**
91    * (hopefully) unique transaction ID
92    */
93   struct GNUNET_HashCode session_id;
94
95   /**
96    * Alice or Bob's peerID
97    */
98   struct GNUNET_PeerIdentity peer;
99
100   /**
101    * the client this request is related to
102    */
103   struct GNUNET_SERVER_Client * client;
104
105   /**
106    * The message to send
107    */
108   struct GNUNET_MessageHeader * msg;
109
110   /**
111    * how many elements we were supplied with from the client
112    */
113   uint32_t total;
114
115   /**
116    * how many elements we used for intersection
117    */
118   uint32_t intersected_elements_count;
119
120   /**
121    * all non-0-value'd elements transmitted to us
122    */
123   struct GNUNET_CONTAINER_MultiHashMap * intersected_elements;
124
125   /**
126    * how many elements actually are used for the scalar product
127    */
128   uint32_t used_elements_count;
129
130   /**
131    * already transferred elements (sent/received) for multipart messages, less or equal than used_element_count for
132    */
133   uint32_t transferred_element_count;
134
135   /**
136    * Set of elements for which will conduction an intersection. 
137    * the resulting elements are then used for computing the scalar product.
138    */
139   struct GNUNET_SET_Handle * intersection_set;
140
141   /**
142    * Set of elements for which will conduction an intersection. 
143    * the resulting elements are then used for computing the scalar product.
144    */
145   struct GNUNET_SET_OperationHandle * intersection_op;
146
147   /**
148    * Handle to Alice's Intersection operation listening for Bob
149    */
150   struct GNUNET_SET_ListenHandle * intersection_listen;
151
152   /**
153    * Public key of the remote service, only used by bob
154    */
155   struct GNUNET_CRYPTO_PaillierPublicKey remote_pubkey;
156
157   /**
158    * DLL for sorting elements after intersection
159    */
160   struct SortedValue * a_head;
161
162   /**
163    * a(Alice)
164    */
165   struct SortedValue * a_tail;
166
167   /**
168    * a(Alice)
169    */
170   gcry_mpi_t * sorted_elements;
171
172   /**
173    * E(ai)(Bob) after applying the mask
174    */
175   struct GNUNET_CRYPTO_PaillierCiphertext * e_a;
176
177   /**
178    * Bob's permutation p of R
179    */
180   struct GNUNET_CRYPTO_PaillierCiphertext * r;
181
182   /**
183    * Bob's permutation q of R
184    */
185   struct GNUNET_CRYPTO_PaillierCiphertext * r_prime;
186
187   /**
188    * Bob's s
189    */
190   struct GNUNET_CRYPTO_PaillierCiphertext * s;
191
192   /**
193    * Bob's s'
194    */
195   struct GNUNET_CRYPTO_PaillierCiphertext * s_prime;
196
197   /**
198    * Bobs matching response session from the client
199    */
200   struct ServiceSession * response;
201
202   /**
203    * The computed scalar
204    */
205   gcry_mpi_t product;
206
207   /**
208    * My transmit handle for the current message to a alice/bob
209    */
210   struct GNUNET_CADET_TransmitHandle * service_transmit_handle;
211
212   /**
213    * My transmit handle for the current message to the client
214    */
215   struct GNUNET_SERVER_TransmitHandle * client_transmit_handle;
216
217   /**
218    * channel-handle associated with our cadet handle
219    */
220   struct GNUNET_CADET_Channel * channel;
221
222   /**
223    * Handle to a task that sends a msg to the our client
224    */
225   GNUNET_SCHEDULER_TaskIdentifier client_notification_task;
226 };
227
228 ///////////////////////////////////////////////////////////////////////////////
229 //                      Forward Delcarations
230 ///////////////////////////////////////////////////////////////////////////////
231
232 /**
233  * Send a multi part chunk of a service request from alice to bob.
234  * This element only contains a part of the elements-vector (session->a[]),
235  * mask and public key set have to be contained within the first message
236  *
237  * This allows a ~32kbit key length while using 32000 elements or 62000 elements per request.
238  *
239  * @param cls the associated service session
240  */
241 static void
242 prepare_alices_cyrptodata_message_multipart (void *cls);
243
244 /**
245  * Send a multi part chunk of a service response from bob to alice.
246  * This element only contains the two permutations of R, R'.
247  *
248  * @param cls the associated service session
249  */
250 static void
251 prepare_bobs_cryptodata_message_multipart (void *cls);
252
253
254 ///////////////////////////////////////////////////////////////////////////////
255 //                      Global Variables
256 ///////////////////////////////////////////////////////////////////////////////
257
258
259 /**
260  * Gnunet configuration handle
261  */
262 const struct GNUNET_CONFIGURATION_Handle * cfg;
263
264 /**
265  * Handle to the core service (NULL until we've connected to it).
266  */
267 static struct GNUNET_CADET_Handle *my_cadet;
268
269 /**
270  * The identity of this host.
271  */
272 static struct GNUNET_PeerIdentity me;
273
274 /**
275  * Service's own public key
276  */
277 static struct GNUNET_CRYPTO_PaillierPublicKey my_pubkey;
278
279 /**
280  * Service's own private key
281  */
282 static struct GNUNET_CRYPTO_PaillierPrivateKey my_privkey;
283
284 /**
285  * Service's offset for values that could possibly be negative but are plaintext for encryption.
286  */
287 static gcry_mpi_t my_offset;
288
289 /**
290  * Head of our double linked list for client-requests sent to us.
291  * for all of these elements we calculate a scalar product with a remote peer
292  * split between service->service and client->service for simplicity
293  */
294 static struct ServiceSession * from_client_head;
295 /**
296  * Tail of our double linked list for client-requests sent to us.
297  * for all of these elements we calculate a scalar product with a remote peer
298  * split between service->service and client->service for simplicity
299  */
300 static struct ServiceSession * from_client_tail;
301
302 /**
303  * Head of our double linked list for service-requests sent to us.
304  * for all of these elements we help the requesting service in calculating a scalar product
305  * split between service->service and client->service for simplicity
306  */
307 static struct ServiceSession * from_service_head;
308
309 /**
310  * Tail of our double linked list for service-requests sent to us.
311  * for all of these elements we help the requesting service in calculating a scalar product
312  * split between service->service and client->service for simplicity
313  */
314 static struct ServiceSession * from_service_tail;
315
316 /**
317  * Certain events (callbacks for server & cadet operations) must not be queued after shutdown.
318  */
319 static int do_shutdown;
320
321 ///////////////////////////////////////////////////////////////////////////////
322 //                      Helper Functions
323 ///////////////////////////////////////////////////////////////////////////////
324
325
326 /**
327  * computes the square sum over a vector of a given length.
328  *
329  * @param vector the vector to encrypt
330  * @param length the length of the vector
331  * @return an MPI value containing the calculated sum, never NULL
332  */
333 static gcry_mpi_t
334 compute_square_sum (gcry_mpi_t * vector, uint32_t length)
335 {
336   gcry_mpi_t elem;
337   gcry_mpi_t sum;
338   int32_t i;
339
340   GNUNET_assert (sum = gcry_mpi_new (0));
341   GNUNET_assert (elem = gcry_mpi_new (0));
342
343   // calculare E(sum (ai ^ 2), publickey)
344   for (i = 0; i < length; i++) {
345     gcry_mpi_mul (elem, vector[i], vector[i]);
346     gcry_mpi_add (sum, sum, elem);
347   }
348   gcry_mpi_release (elem);
349
350   return sum;
351 }
352
353
354 /**
355  * Primitive callback for copying over a message, as they
356  * usually are too complex to be handled in the callback itself.
357  * clears a session-callback, if a session was handed over and the transmit handle was stored
358  *
359  * @param cls the session containing the message object
360  * @param size the size of the buffer we got
361  * @param buf the buffer to copy the message to
362  * @return 0 if we couldn't copy, else the size copied over
363  */
364 static size_t
365 do_send_message (void *cls, size_t size, void *buf)
366 {
367   struct ServiceSession * s = cls;
368   uint16_t type;
369
370   GNUNET_assert (buf);
371
372   if (ntohs (s->msg->size) != size) {
373     GNUNET_break (0);
374     return 0;
375   }
376
377   type = ntohs (s->msg->type);
378   memcpy (buf, s->msg, size);
379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
380               "Sent a message of type %hu.\n",
381               type);
382   GNUNET_free (s->msg);
383   s->msg = NULL;
384
385   switch (type)
386   {
387   case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT:
388     s->client_transmit_handle = NULL;
389     break;
390
391   case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA:
392   case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART:
393     s->service_transmit_handle = NULL;
394     if (s->used_elements_count != s->transferred_element_count)
395       prepare_alices_cyrptodata_message_multipart (s);
396     break;
397
398   case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA:
399   case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART:
400     s->service_transmit_handle = NULL;
401     if (s->used_elements_count != s->transferred_element_count)
402       prepare_bobs_cryptodata_message_multipart (s);
403     break;
404
405   default:
406     GNUNET_assert (0);
407   }
408
409   return size;
410 }
411
412
413 /**
414  * Finds a not terminated client/service session in the
415  * given DLL based on session key, element count and state.
416  *
417  * @param tail - the tail of the DLL
418  * @param key - the key we want to search for
419  * @param element_count - the total element count of the dataset (session->total)
420  * @param peerid - a pointer to the peer ID of the associated peer, NULL to ignore
421  * @return a pointer to a matching session, or NULL
422  */
423 static struct ServiceSession *
424 find_matching_session (struct ServiceSession * tail,
425                        const struct GNUNET_HashCode * key,
426                        uint32_t element_count,
427                        const struct GNUNET_PeerIdentity * peerid)
428 {
429   struct ServiceSession * curr;
430
431   for (curr = tail; NULL != curr; curr = curr->prev) {
432     // if the key matches, and the element_count is same
433     if ((!memcmp (&curr->session_id, key, sizeof (struct GNUNET_HashCode)))
434         && (curr->total == element_count)) {
435       // if peerid is NULL OR same as the peer Id in the queued request
436       if ((NULL == peerid)
437           || (!memcmp (&curr->peer, peerid, sizeof (struct GNUNET_PeerIdentity))))
438         // matches and is not an already terminated session
439         return curr;
440     }
441   }
442
443   return NULL;
444 }
445
446
447 /**
448  * Safely frees ALL memory areas referenced by a session.
449  *
450  * @param session - the session to free elements from
451  */
452 static void
453 free_session_variables (struct ServiceSession * session)
454 {
455   while (NULL != session->a_head) {
456     struct SortedValue * e = session->a_head;
457     GNUNET_free (e->elem);
458     gcry_mpi_release (e->val);
459     GNUNET_CONTAINER_DLL_remove (session->a_head, session->a_tail, e);
460     GNUNET_free (e);
461   }
462   if (session->e_a) {
463     GNUNET_free (session->e_a);
464     session->e_a = NULL;
465   }
466   if (session->sorted_elements) {
467     GNUNET_free (session->sorted_elements);
468     session->sorted_elements = NULL;
469   }
470   if (session->intersected_elements) {
471     GNUNET_CONTAINER_multihashmap_destroy (session->intersected_elements);
472     //elements are freed independently in session->a_head/tail
473     session->intersected_elements = NULL;
474   }
475   if (session->intersection_listen) {
476     GNUNET_SET_listen_cancel (session->intersection_listen);
477     session->intersection_listen = NULL;
478   }
479   if (session->intersection_op) {
480     GNUNET_SET_operation_cancel (session->intersection_op);
481     session->intersection_op = NULL;
482   }
483   if (session->intersection_set) {
484     GNUNET_SET_destroy (session->intersection_set);
485     session->intersection_set = NULL;
486   }
487   if (session->channel){
488     GNUNET_CADET_channel_destroy(session->channel);
489     session->channel = NULL;
490   }
491   if (session->msg) {
492     GNUNET_free (session->msg);
493     session->msg = NULL;
494   }
495   if (session->r) {
496     GNUNET_free (session->r);
497     session->r = NULL;
498   }
499   if (session->r_prime) {
500     GNUNET_free (session->r_prime);
501     session->r_prime = NULL;
502   }
503   if (session->s) {
504     GNUNET_free (session->s);
505     session->s = NULL;
506   }
507   if (session->s_prime) {
508     GNUNET_free (session->s_prime);
509     session->s_prime = NULL;
510   }
511   if (session->product) {
512     gcry_mpi_release (session->product);
513     session->product = NULL;
514   }
515 }
516 ///////////////////////////////////////////////////////////////////////////////
517 //                      Event and Message Handlers
518 ///////////////////////////////////////////////////////////////////////////////
519
520
521 /**
522  * A client disconnected.
523  *
524  * Remove the associated session(s), release data structures
525  * and cancel pending outgoing transmissions to the client.
526  * if the session has not yet completed, we also cancel Alice's request to Bob.
527  *
528  * @param cls closure, NULL
529  * @param client identification of the client
530  */
531 static void
532 handle_client_disconnect (void *cls,
533                           struct GNUNET_SERVER_Client *client)
534 {
535   struct ServiceSession *session;
536
537   if (NULL != client)
538     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539                 _ ("Client (%p) disconnected from us.\n"), client);
540   else
541     return;
542
543   session = GNUNET_SERVER_client_get_user_context (client, struct ServiceSession);
544   if (NULL == session)
545     return;
546   GNUNET_CONTAINER_DLL_remove (from_client_head, from_client_tail, session);
547
548   if (!(session->role == BOB && 0/*//TODO: if session concluded*/)) {
549     //we MUST terminate any client message underway
550     if (session->service_transmit_handle && session->channel)
551       GNUNET_CADET_notify_transmit_ready_cancel (session->service_transmit_handle);
552     if (session->channel && 0/* //TODO: waiting for service response */)
553       GNUNET_CADET_channel_destroy (session->channel);
554   }
555   if (GNUNET_SCHEDULER_NO_TASK != session->client_notification_task) {
556     GNUNET_SCHEDULER_cancel (session->client_notification_task);
557     session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
558   }
559   if (NULL != session->client_transmit_handle) {
560     GNUNET_SERVER_notify_transmit_ready_cancel (session->client_transmit_handle);
561     session->client_transmit_handle = NULL;
562   }
563   free_session_variables (session);
564   GNUNET_free (session);
565 }
566
567
568 /**
569  * Notify the client that the session has succeeded or failed completely.
570  * This message gets sent to
571  * * alice's client if bob disconnected or to
572  * * bob's client if the operation completed or alice disconnected
573  *
574  * @param cls the associated client session
575  * @param tc the task context handed to us by the scheduler, unused
576  */
577 static void
578 prepare_client_end_notification (void * cls,
579                                  const struct GNUNET_SCHEDULER_TaskContext * tc)
580 {
581   struct ServiceSession * session = cls;
582   struct GNUNET_SCALARPRODUCT_client_response * msg;
583
584   session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
585
586   msg = GNUNET_new (struct GNUNET_SCALARPRODUCT_client_response);
587   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
588   memcpy (&msg->key, &session->session_id, sizeof (struct GNUNET_HashCode));
589   memcpy (&msg->peer, &session->peer, sizeof ( struct GNUNET_PeerIdentity));
590   msg->header.size = htons (sizeof (struct GNUNET_SCALARPRODUCT_client_response));
591   // signal error if not signalized, positive result-range field but zero length.
592   msg->product_length = htonl (0);
593   msg->range = (session /* //TODO: if finalized */) ? 0 : -1;
594
595   session->msg = &msg->header;
596
597   //transmit this message to our client
598   session->client_transmit_handle =
599           GNUNET_SERVER_notify_transmit_ready (session->client,
600                                                sizeof (struct GNUNET_SCALARPRODUCT_client_response),
601                                                GNUNET_TIME_UNIT_FOREVER_REL,
602                                                &do_send_message,
603                                                session);
604
605   // if we could not even queue our request, something is wrong
606   if (NULL == session->client_transmit_handle) {
607     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _ ("Could not send message to client (%p)!\n"), session->client);
608     // usually gets freed by do_send_message
609     session->msg = NULL;
610     GNUNET_free (msg);
611   }
612   else
613     GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Sending session-end notification to client (%p) for session %s\n"), &session->client, GNUNET_h2s (&session->session_id));
614
615   free_session_variables (session);
616 }
617
618
619 /**
620  * Executed by Alice, fills in a service-request message and sends it to the given peer
621  *
622  * @param cls the session associated with this request
623  */
624 static void
625 prepare_alices_cyrptodata_message (void *cls)
626 {
627   struct ServiceSession * session = cls;
628   struct GNUNET_SCALARPRODUCT_alices_cryptodata_message * msg;
629   struct GNUNET_CRYPTO_PaillierCiphertext * payload;
630   unsigned int i;
631   uint32_t msg_length;
632   gcry_mpi_t a;
633
634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _ ("Successfully created new channel to peer (%s)!\n"), GNUNET_i2s (&session->peer));
635
636   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message)
637           +session->used_elements_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
638
639   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > msg_length) {
640     session->transferred_element_count = session->used_elements_count;
641   }
642   else {
643     //create a multipart msg, first we calculate a new msg size for the head msg
644     session->transferred_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message))
645             / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
646     msg_length = sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message)
647             +session->transferred_element_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
648   }
649
650   msg = GNUNET_malloc (msg_length);
651   msg->header.size = htons (msg_length);
652   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA);
653   msg->contained_element_count = htonl (session->transferred_element_count);
654
655   // fill in the payload
656   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
657
658   // now copy over the sorted element vector
659   a = gcry_mpi_new (0);
660   for (i = 0; i < session->transferred_element_count; i++) {
661     gcry_mpi_add (a, session->sorted_elements[i], my_offset);
662     GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[i]);
663   }
664   gcry_mpi_release (a);
665
666   session->msg = (struct GNUNET_MessageHeader *) msg;
667   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n"));
668
669   //transmit via cadet messaging
670   session->service_transmit_handle = GNUNET_CADET_notify_transmit_ready (session->channel, GNUNET_YES,
671                                                                         GNUNET_TIME_UNIT_FOREVER_REL,
672                                                                         msg_length,
673                                                                         &do_send_message,
674                                                                         session);
675   if (NULL == session->service_transmit_handle) {
676     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send message to channel!\n"));
677     GNUNET_free (msg);
678     session->msg = NULL;
679     session->client_notification_task =
680             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
681                                       session);
682     return;
683   }
684 }
685
686
687 /**
688  * Send a multipart chunk of a service response from bob to alice.
689  * This element only contains the two permutations of R, R'.
690  *
691  * @param cls the associated service session
692  */
693 static void
694 prepare_bobs_cryptodata_message_multipart (void *cls)
695 {
696   struct ServiceSession * session = cls;
697   struct GNUNET_CRYPTO_PaillierCiphertext * payload;
698   struct GNUNET_SCALARPRODUCT_multipart_message * msg;
699   unsigned int i;
700   unsigned int j;
701   uint32_t msg_length;
702   uint32_t todo_count;
703
704   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message);
705   todo_count = session->used_elements_count - session->transferred_element_count;
706
707   if (todo_count > MULTIPART_ELEMENT_CAPACITY / 2)
708     // send the currently possible maximum chunk, we always transfer both permutations
709     todo_count = MULTIPART_ELEMENT_CAPACITY / 2;
710
711   msg_length += todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2;
712   msg = GNUNET_malloc (msg_length);
713   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART);
714   msg->header.size = htons (msg_length);
715   msg->contained_element_count = htonl (todo_count);
716
717   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
718   for (i = session->transferred_element_count, j = 0; i < session->transferred_element_count + todo_count; i++) {
719     //r[i][p] and r[i][q]
720     memcpy (&payload[j++], &session->r[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
721     memcpy (&payload[j++], &session->r_prime[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
722   }
723   session->transferred_element_count += todo_count;
724   session->msg = (struct GNUNET_MessageHeader *) msg;
725   session->service_transmit_handle =
726           GNUNET_CADET_notify_transmit_ready (session->channel,
727                                              GNUNET_YES,
728                                              GNUNET_TIME_UNIT_FOREVER_REL,
729                                              msg_length,
730                                              &do_send_message,
731                                              session);
732   //disconnect our client
733   if (NULL == session->service_transmit_handle) {
734     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-response message via cadet!)\n"));
735     
736     GNUNET_free (msg);
737     session->msg = NULL;
738     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
739     
740     session->response->client_notification_task =
741             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
742                                       session->response);
743     free_session_variables(session);
744     GNUNET_free(session);
745     return;
746   }
747   if (session->transferred_element_count != session->used_elements_count) {
748     // more multiparts
749   }
750   else {
751     // final part
752     GNUNET_free (session->r_prime);
753     GNUNET_free (session->r);
754     session->r_prime = NULL;
755     session->r = NULL;
756   }
757 }
758
759
760 /**
761  * Bob executes:
762  * generates the response message to be sent to alice after computing
763  * the values (1), (2), S and S'
764  *  (1)[]: $E_A(a_{pi(i)}) times E_A(- r_{pi(i)} - b_{pi(i)}) &= E_A(a_{pi(i)} - r_{pi(i)} - b_{pi(i)})$
765  *  (2)[]: $E_A(a_{pi'(i)}) times E_A(- r_{pi'(i)}) &= E_A(a_{pi'(i)} - r_{pi'(i)})$
766  *      S: $S := E_A(sum (r_i + b_i)^2)$
767  *     S': $S' := E_A(sum r_i^2)$
768  *
769  * @param session  the associated requesting session with alice
770  */
771 static void
772 prepare_bobs_cryptodata_message (void *cls,
773                           const struct GNUNET_SCHEDULER_TaskContext
774                           * tc)
775 {
776   struct ServiceSession * session = (struct ServiceSession *) cls;
777   struct GNUNET_SCALARPRODUCT_service_response * msg;
778   uint32_t msg_length = 0;
779   struct GNUNET_CRYPTO_PaillierCiphertext * payload;
780   int i;
781
782   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_service_response)
783           + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); // s, stick
784
785   if (GNUNET_SERVER_MAX_MESSAGE_SIZE >
786       msg_length + 2 * session->used_elements_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)) { //r, r'
787     msg_length += 2 * session->used_elements_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
788     session->transferred_element_count = session->used_elements_count;
789   }
790   else
791     session->transferred_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - msg_length) /
792     (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2);
793
794   msg = GNUNET_malloc (msg_length);
795   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA);
796   msg->header.size = htons (msg_length);
797   msg->total_element_count = htonl (session->total);
798   msg->used_element_count = htonl (session->used_elements_count);
799   msg->contained_element_count = htonl (session->transferred_element_count);
800   memcpy (&msg->key, &session->session_id, sizeof (struct GNUNET_HashCode));
801
802   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
803   memcpy (&payload[0], session->s, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
804   memcpy (&payload[1], session->s_prime, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
805   GNUNET_free (session->s_prime);
806   session->s_prime = NULL;
807   GNUNET_free (session->s);
808   session->s = NULL;
809
810   payload = &payload[2];
811   // convert k[][]
812   for (i = 0; i < session->transferred_element_count; i++) {
813     //k[i][p] and k[i][q]
814     memcpy (&payload[i * 2], &session->r[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
815     memcpy (&payload[i * 2 + 1], &session->r_prime[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
816   }
817
818   session->msg = (struct GNUNET_MessageHeader *) msg;
819   session->service_transmit_handle =
820           GNUNET_CADET_notify_transmit_ready (session->channel,
821                                              GNUNET_YES,
822                                              GNUNET_TIME_UNIT_FOREVER_REL,
823                                              msg_length,
824                                              &do_send_message,
825                                              session);
826   //disconnect our client
827   if (NULL == session->service_transmit_handle) {
828     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-response message via cadet!)\n"));
829
830     GNUNET_free (msg);
831     session->msg = NULL;
832     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
833     
834     session->response->client_notification_task =
835             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
836                                       session->response);
837     free_session_variables(session);
838     GNUNET_free(session);
839     return;
840   }
841   if (session->transferred_element_count != session->used_elements_count) {
842     // multipart
843   }
844   else {
845     //singlepart
846     GNUNET_free (session->r);
847     session->r = NULL;
848     GNUNET_free (session->r_prime);
849     session->r_prime = NULL;
850   }
851 }
852
853
854 /**
855  * executed by bob:
856  * compute the values
857  *  (1)[]: $E_A(a_{pi(i)}) otimes E_A(- r_{pi(i)} - b_{pi(i)}) &= E_A(a_{pi(i)} - r_{pi(i)} - b_{pi(i)})$
858  *  (2)[]: $E_A(a_{pi'(i)}) otimes E_A(- r_{pi'(i)}) &= E_A(a_{pi'(i)} - r_{pi'(i)})$
859  *      S: $S := E_A(sum (r_i + b_i)^2)$
860  *     S': $S' := E_A(sum r_i^2)$
861  *
862  * @param request the requesting session + bob's requesting peer
863  */
864 static void
865 compute_service_response (struct ServiceSession * session)
866 {
867   int i;
868   unsigned int * p;
869   unsigned int * q;
870   uint32_t count;
871   gcry_mpi_t * rand = NULL;
872   gcry_mpi_t tmp;
873   gcry_mpi_t * b;
874   struct GNUNET_CRYPTO_PaillierCiphertext * a;
875   struct GNUNET_CRYPTO_PaillierCiphertext * r;
876   struct GNUNET_CRYPTO_PaillierCiphertext * r_prime;
877   struct GNUNET_CRYPTO_PaillierCiphertext * s;
878   struct GNUNET_CRYPTO_PaillierCiphertext * s_prime;
879
880   count = session->used_elements_count;
881   a = session->e_a;
882   b = session->sorted_elements;
883   q = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK, count);
884   p = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK, count);
885
886   for (i = 0; i < count; i++)
887     GNUNET_assert (NULL != (rand[i] = gcry_mpi_new (0)));
888   r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * count);
889   r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * count);
890   s = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
891   s_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
892
893   for (i = 0; i < count; i++) {
894     int32_t svalue;
895
896     svalue = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
897
898     // long to gcry_mpi_t
899     if (svalue < 0)
900       gcry_mpi_sub_ui (rand[i],
901                        rand[i],
902                        -svalue);
903     else
904       rand[i] = gcry_mpi_set_ui (rand[i], svalue);
905   }
906
907   tmp = gcry_mpi_new (0);
908   // encrypt the element
909   // for the sake of readability I decided to have dedicated permutation
910   // vectors, which get rid of all the lookups in p/q.
911   // however, ap/aq are not absolutely necessary but are just abstraction
912   // Calculate Kp = E(S + a_pi) (+) E(S - r_pi - b_pi)
913   for (i = 0; i < count; i++) {
914     // E(S - r_pi - b_pi)
915     gcry_mpi_sub (tmp, my_offset, rand[p[i]]);
916     gcry_mpi_sub (tmp, tmp, b[p[i]]);
917     GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
918                                     tmp,
919                                     2,
920                                     &r[i]);
921
922     // E(S - r_pi - b_pi) * E(S + a_pi) ==  E(2*S + a - r - b)
923     GNUNET_CRYPTO_paillier_hom_add (&session->remote_pubkey,
924                                     &r[i],
925                                     &a[p[i]],
926                                     &r[i]);
927   }
928
929   // Calculate Kq = E(S + a_qi) (+) E(S - r_qi)
930   for (i = 0; i < count; i++) {
931     // E(S - r_qi)
932     gcry_mpi_sub (tmp, my_offset, rand[q[i]]);
933     GNUNET_assert (2 == GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
934                                                         tmp,
935                                                         2,
936                                                         &r_prime[i]));
937
938     // E(S - r_qi) * E(S + a_qi) == E(2*S + a_qi - r_qi)
939     GNUNET_assert (1 == GNUNET_CRYPTO_paillier_hom_add (&session->remote_pubkey,
940                                                         &r_prime[i],
941                                                         &a[q[i]],
942                                                         &r_prime[i]));
943   }
944
945   // Calculate S' =  E(SUM( r_i^2 ))
946   tmp = compute_square_sum (rand, count);
947   GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
948                                   tmp,
949                                   1,
950                                   s_prime);
951
952   // Calculate S = E(SUM( (r_i + b_i)^2 ))
953   for (i = 0; i < count; i++)
954     gcry_mpi_add (rand[i], rand[i], b[i]);
955   tmp = compute_square_sum (rand, count);
956   GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
957                                   tmp,
958                                   1,
959                                   s);
960
961   session->r = r;
962   session->r_prime = r_prime;
963   session->s = s;
964   session->s_prime = s_prime;
965
966   // release rand, b and a
967   for (i = 0; i < count; i++) {
968     gcry_mpi_release (rand[i]);
969     gcry_mpi_release (b[i]);
970   }
971   gcry_mpi_release (tmp);
972   GNUNET_free (session->e_a);
973   session->e_a = NULL;
974   GNUNET_free (p);
975   GNUNET_free (q);
976   GNUNET_free (b);
977   GNUNET_free (rand);
978
979   // copy the r[], r_prime[], S and Stick into a new message, prepare_service_response frees these
980   GNUNET_SCHEDULER_add_now (&prepare_bobs_cryptodata_message, session);
981 }
982
983
984 /**
985  * Iterator over all hash map entries in session->intersected_elements.
986  *
987  * @param cls closure
988  * @param key current key code
989  * @param value value in the hash map
990  * @return #GNUNET_YES if we should continue to
991  *         iterate,
992  *         #GNUNET_NO if not.
993  */
994 int
995 cb_insert_element_sorted (void *cls,
996                           const struct GNUNET_HashCode *key,
997                           void *value)
998 {
999   struct ServiceSession * session = (struct ServiceSession*) cls;
1000   struct SortedValue * e = GNUNET_new (struct SortedValue);
1001   struct SortedValue * o = session->a_head;
1002
1003   e->elem = value;
1004   e->val = gcry_mpi_new (0);
1005   if (0 > e->elem->value)
1006     gcry_mpi_sub_ui (e->val, e->val, abs (e->elem->value));
1007   else
1008     gcry_mpi_add_ui (e->val, e->val, e->elem->value);
1009
1010   // insert as first element with the lowest key
1011   if (NULL == session->a_head
1012       || (0 <= GNUNET_CRYPTO_hash_cmp (&session->a_head->elem->key, &e->elem->key))) {
1013     GNUNET_CONTAINER_DLL_insert (session->a_head, session->a_tail, e);
1014     return GNUNET_YES;
1015   }
1016   // insert as last element with the highest key
1017   if (0 >= GNUNET_CRYPTO_hash_cmp (&session->a_tail->elem->key, &e->elem->key)) {
1018     GNUNET_CONTAINER_DLL_insert_tail (session->a_head, session->a_tail, e);
1019     return GNUNET_YES;
1020   }
1021   // insert before the first higher/equal element
1022   do {
1023     if (0 <= GNUNET_CRYPTO_hash_cmp (&o->elem->key, &e->elem->key)) {
1024       GNUNET_CONTAINER_DLL_insert_before (session->a_head, session->a_tail, o, e);
1025       return GNUNET_YES;
1026     }
1027     o = o->next;
1028   }
1029   while (NULL != o);
1030   // broken DLL
1031   GNUNET_assert (0);
1032 }
1033
1034
1035 /**
1036  * Callback for set operation results. Called for each element
1037  * in the result set.
1038  *
1039  * @param cls closure
1040  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
1041  * @param status see `enum GNUNET_SET_Status`
1042  */
1043 static void
1044 cb_intersection_element_removed (void *cls,
1045                                  const struct GNUNET_SET_Element *element,
1046                                  enum GNUNET_SET_Status status)
1047 {
1048   struct ServiceSession * session = (struct ServiceSession*) cls;
1049   struct GNUNET_SCALARPRODUCT_Element * se;
1050   int i;
1051
1052   switch (status)
1053   {
1054   case GNUNET_SET_STATUS_OK:
1055     //this element has been removed from the set
1056     se = GNUNET_CONTAINER_multihashmap_get (session->intersected_elements,
1057                                             element->data);
1058
1059     GNUNET_CONTAINER_multihashmap_remove (session->intersected_elements,
1060                                           element->data,
1061                                           se);
1062     session->used_elements_count--;
1063     return;
1064
1065   case GNUNET_SET_STATUS_DONE:
1066     if (2 > session->used_elements_count) {
1067       // failed! do not leak information about our single remaining element!
1068       // continue after the loop
1069       break;
1070     }
1071
1072     GNUNET_CONTAINER_multihashmap_iterate (session->intersected_elements,
1073                                            &cb_insert_element_sorted,
1074                                            session);
1075
1076     session->sorted_elements = GNUNET_malloc (session->used_elements_count * sizeof (gcry_mpi_t));
1077     for (i = 0; NULL != session->a_head; i++) {
1078       struct SortedValue* a = session->a_head;
1079       GNUNET_assert (i < session->used_elements_count);
1080       
1081       session->sorted_elements[i] = a->val;
1082       GNUNET_CONTAINER_DLL_remove (session->a_head, session->a_tail, a);
1083       GNUNET_free (a->elem);
1084     }
1085     GNUNET_assert (i == session->used_elements_count);
1086
1087     if (ALICE == session->role) {
1088       prepare_alices_cyrptodata_message (session);
1089       return;
1090     }
1091     else if (session->used_elements_count == session->transferred_element_count) {
1092       compute_service_response (session);
1093       return;
1094     }
1095   default:
1096     break;
1097   }
1098
1099   //failed if we go here
1100   GNUNET_break (0);
1101
1102   // and notify our client-session that we could not complete the session
1103   if (ALICE == session->role) {
1104     session->client_notification_task =
1105             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1106                                       session);
1107   }
1108   else {
1109     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
1110     free_session_variables (session);
1111     session->response->client_notification_task =
1112             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1113                                       session->response);
1114     GNUNET_free(session);
1115   }
1116 }
1117
1118
1119 /**
1120  * Called when another peer wants to do a set operation with the
1121  * local peer. If a listen error occurs, the @a request is NULL.
1122  *
1123  * @param cls closure
1124  * @param other_peer the other peer
1125  * @param context_msg message with application specific information from
1126  *        the other peer
1127  * @param request request from the other peer (never NULL), use GNUNET_SET_accept()
1128  *        to accept it, otherwise the request will be refused
1129  *        Note that we can't just return value from the listen callback,
1130  *        as it is also necessary to specify the set we want to do the
1131  *        operation with, whith sometimes can be derived from the context
1132  *        message. It's necessary to specify the timeout.
1133  */
1134 static void
1135 cb_intersection_request_alice (void *cls,
1136                                const struct GNUNET_PeerIdentity *other_peer,
1137                                const struct GNUNET_MessageHeader *context_msg,
1138                                struct GNUNET_SET_Request *request)
1139 {
1140   struct ServiceSession * session = (struct ServiceSession *) cls;
1141
1142   // check the peer-id, the app-id=session-id is compared by SET
1143   if (0 != memcmp (&session->peer, &other_peer, sizeof (struct GNUNET_PeerIdentity)))
1144     return;
1145
1146   session->intersection_op = GNUNET_SET_accept (request,
1147                                                 GNUNET_SET_RESULT_REMOVED,
1148                                                 cb_intersection_element_removed,
1149                                                 session);
1150
1151   if (NULL == session->intersection_op) {
1152     session->response->client_notification_task =
1153             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1154                                       session);
1155     return;
1156   }
1157   if (GNUNET_OK != GNUNET_SET_commit (session->intersection_op, session->intersection_set)) {
1158     session->response->client_notification_task =
1159             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1160                                       session);
1161     return;
1162   }
1163   session->intersection_set = NULL;
1164   session->intersection_listen = NULL;
1165 }
1166
1167
1168 /**
1169  * prepare the response we will send to alice or bobs' clients.
1170  * in Bobs case the product will be NULL.
1171  *
1172  * @param cls the session associated with our client.
1173  * @param tc the task context handed to us by the scheduler, unused
1174  */
1175 static void
1176 prepare_client_response (void *cls,
1177                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1178 {
1179   struct ServiceSession * session = cls;
1180   struct GNUNET_SCALARPRODUCT_client_response * msg;
1181   unsigned char * product_exported = NULL;
1182   size_t product_length = 0;
1183   uint32_t msg_length = 0;
1184   int8_t range = -1;
1185   gcry_error_t rc;
1186   int sign;
1187
1188   session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
1189
1190   if (session->product) {
1191     gcry_mpi_t value = gcry_mpi_new (0);
1192
1193     sign = gcry_mpi_cmp_ui (session->product, 0);
1194     // libgcrypt can not handle a print of a negative number
1195     // if (a->sign) return gcry_error (GPG_ERR_INTERNAL); /* Can't handle it yet. */
1196     if (0 > sign) {
1197       gcry_mpi_sub (value, value, session->product);
1198     }
1199     else if (0 < sign) {
1200       range = 1;
1201       gcry_mpi_add (value, value, session->product);
1202     }
1203     else
1204       range = 0;
1205
1206     gcry_mpi_release (session->product);
1207     session->product = NULL;
1208
1209     // get representation as string
1210     if (range
1211         && (0 != (rc = gcry_mpi_aprint (GCRYMPI_FMT_STD,
1212                                         &product_exported,
1213                                         &product_length,
1214                                         value)))) {
1215       LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc);
1216       product_length = 0;
1217       range = -1; // signal error with product-length = 0 and range = -1
1218     }
1219     gcry_mpi_release (value);
1220   }
1221
1222   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_client_response) +product_length;
1223   msg = GNUNET_malloc (msg_length);
1224   msg->key = session->session_id;
1225   msg->peer = session->peer;
1226   if (product_exported != NULL) {
1227     memcpy (&msg[1], product_exported, product_length);
1228     GNUNET_free (product_exported);
1229   }
1230   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
1231   msg->header.size = htons (msg_length);
1232   msg->range = range;
1233   msg->product_length = htonl (product_length);
1234
1235   session->msg = (struct GNUNET_MessageHeader *) msg;
1236   //transmit this message to our client
1237   session->client_transmit_handle =
1238           GNUNET_SERVER_notify_transmit_ready (session->client,
1239                                                msg_length,
1240                                                GNUNET_TIME_UNIT_FOREVER_REL,
1241                                                &do_send_message,
1242                                                session);
1243   if (NULL == session->client_transmit_handle) {
1244     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1245                 _ ("Could not send message to client (%p)!\n"),
1246                 session->client);
1247     session->client = NULL;
1248     // callback was not called!
1249     GNUNET_free (msg);
1250     session->msg = NULL;
1251   }
1252   else
1253     // gracefully sent message, just terminate session structure
1254     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1255                 _ ("Sent result to client (%p), this session (%s) has ended!\n"),
1256                 session->client,
1257                 GNUNET_h2s (&session->session_id));
1258   free_session_variables (session);
1259 }
1260
1261
1262 /**
1263  * Executed by Alice, fills in a service-request message and sends it to the given peer
1264  *
1265  * @param session the session associated with this request
1266  */
1267 static void
1268 prepare_alices_computation_request (struct ServiceSession * session)
1269 {
1270   struct GNUNET_SCALARPRODUCT_service_request * msg;
1271
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _ ("Successfully created new channel to peer (%s)!\n"), GNUNET_i2s (&session->peer));
1273
1274   msg = GNUNET_new (struct GNUNET_SCALARPRODUCT_service_request);
1275   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA);
1276   msg->total_element_count = htonl (session->used_elements_count);
1277   memcpy (&msg->session_id, &session->session_id, sizeof (struct GNUNET_HashCode));
1278   msg->header.size = htons (sizeof (struct GNUNET_SCALARPRODUCT_service_request));
1279
1280   session->msg = (struct GNUNET_MessageHeader *) msg;
1281   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n"));
1282
1283   //transmit via cadet messaging
1284   session->service_transmit_handle = GNUNET_CADET_notify_transmit_ready (session->channel, GNUNET_YES,
1285                                                                         GNUNET_TIME_UNIT_FOREVER_REL,
1286                                                                         sizeof (struct GNUNET_SCALARPRODUCT_service_request),
1287                                                                         &do_send_message,
1288                                                                         session);
1289   if (!session->service_transmit_handle) {
1290     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send message to channel!\n"));
1291     GNUNET_free (msg);
1292     session->msg = NULL;
1293     session->client_notification_task =
1294             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1295                                       session);
1296     return;
1297   }
1298 }
1299
1300
1301 /**
1302  * Send a multi part chunk of a service request from alice to bob.
1303  * This element only contains a part of the elements-vector (session->a[]),
1304  * mask and public key set have to be contained within the first message
1305  *
1306  * This allows a ~32kbit key length while using 32000 elements or 62000 elements per request.
1307  *
1308  * @param cls the associated service session
1309  */
1310 static void
1311 prepare_alices_cyrptodata_message_multipart (void *cls)
1312 {
1313   struct ServiceSession * session = cls;
1314   struct GNUNET_SCALARPRODUCT_multipart_message * msg;
1315   struct GNUNET_CRYPTO_PaillierCiphertext * payload;
1316   unsigned int i;
1317   uint32_t msg_length;
1318   uint32_t todo_count;
1319   gcry_mpi_t a;
1320
1321   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message);
1322   todo_count = session->used_elements_count - session->transferred_element_count;
1323
1324   if (todo_count > MULTIPART_ELEMENT_CAPACITY)
1325     // send the currently possible maximum chunk
1326     todo_count = MULTIPART_ELEMENT_CAPACITY;
1327
1328   msg_length += todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1329   msg = GNUNET_malloc (msg_length);
1330   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART);
1331   msg->header.size = htons (msg_length);
1332   msg->contained_element_count = htonl (todo_count);
1333
1334   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
1335
1336   // now copy over the sorted element vector
1337   a = gcry_mpi_new (0);
1338   for (i = session->transferred_element_count; i < todo_count; i++) {
1339     gcry_mpi_add (a, session->sorted_elements[i], my_offset);
1340     GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[i - session->transferred_element_count]);
1341   }
1342   gcry_mpi_release (a);
1343   session->transferred_element_count += todo_count;
1344
1345   session->msg = (struct GNUNET_MessageHeader *) msg;
1346   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n"));
1347
1348   //transmit via cadet messaging
1349   session->service_transmit_handle = GNUNET_CADET_notify_transmit_ready (session->channel, GNUNET_YES,
1350                                                                         GNUNET_TIME_UNIT_FOREVER_REL,
1351                                                                         msg_length,
1352                                                                         &do_send_message,
1353                                                                         session);
1354   if (!session->service_transmit_handle) {
1355     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-request multipart message to channel!\n"));
1356     GNUNET_free (msg);
1357     session->msg = NULL;
1358     session->client_notification_task =
1359             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1360                                       session);
1361     return;
1362   }
1363 }
1364
1365
1366 /**
1367  * Our client has finished sending us its multipart message.
1368  * 
1369  * @param session the service session context
1370  */
1371 static void
1372 client_request_complete_bob (struct ServiceSession * client_session)
1373 {
1374   struct ServiceSession * session;
1375
1376   //check if service queue contains a matching request
1377   session = find_matching_session (from_service_tail,
1378                                    &client_session->session_id,
1379                                    client_session->total, NULL);
1380   if (NULL != session) {
1381     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1382                 _ ("Got client-responder-session with key %s and a matching service-request-session set, processing.\n"),
1383                 GNUNET_h2s (&client_session->session_id));
1384
1385     session->response = client_session;
1386     session->intersected_elements = client_session->intersected_elements;
1387     client_session->intersected_elements = NULL;
1388     session->intersection_set = client_session->intersection_set;
1389     client_session->intersection_set = NULL;
1390
1391     session->intersection_op = GNUNET_SET_prepare (&session->peer,
1392                                                    &session->session_id,
1393                                                    NULL,
1394                                                    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT16_MAX),
1395                                                    GNUNET_SET_RESULT_REMOVED,
1396                                                    cb_intersection_element_removed,
1397                                                    session);
1398
1399     GNUNET_SET_commit (session->intersection_op, session->intersection_set);
1400   }
1401   else {
1402     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1403                 _ ("Got client-responder-session with key %s but NO matching service-request-session set, queuing element for later use.\n"),
1404                 GNUNET_h2s (&client_session->session_id));
1405     // no matching session exists yet, store the response
1406     // for later processing by handle_service_request()
1407   }
1408 }
1409
1410
1411 /**
1412  * Our client has finished sending us its multipart message.
1413  * 
1414  * @param session the service session context
1415  */
1416 static void
1417 client_request_complete_alice (struct ServiceSession * session)
1418 {
1419   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1420               _ ("Creating new channel for session with key %s.\n"),
1421               GNUNET_h2s (&session->session_id));
1422   session->channel = GNUNET_CADET_channel_create (my_cadet, session,
1423                                                  &session->peer,
1424                                                  GNUNET_APPLICATION_TYPE_SCALARPRODUCT,
1425                                                  GNUNET_CADET_OPTION_RELIABLE);
1426   if (NULL == session->channel) {
1427     session->response->client_notification_task =
1428             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1429                                       session);
1430     return;
1431   }
1432   session->intersection_listen = GNUNET_SET_listen (cfg,
1433                                                     GNUNET_SET_OPERATION_INTERSECTION,
1434                                                     &session->session_id,
1435                                                     cb_intersection_request_alice,
1436                                                     session);
1437   if (NULL == session->intersection_listen) {
1438     session->response->client_notification_task =
1439             GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1440                                       session);
1441     return;
1442   }
1443   prepare_alices_computation_request (session);
1444 }
1445
1446
1447 static void
1448 handle_client_message_multipart (void *cls,
1449                                  struct GNUNET_SERVER_Client *client,
1450                                  const struct GNUNET_MessageHeader *message)
1451 {
1452   const struct GNUNET_SCALARPRODUCT_computation_message_multipart * msg = (const struct GNUNET_SCALARPRODUCT_computation_message_multipart *) message;
1453   struct ServiceSession * session;
1454   uint32_t contained_count;
1455   struct GNUNET_SCALARPRODUCT_Element * elements;
1456   uint32_t i;
1457
1458   // only one concurrent session per client connection allowed, simplifies logics a lot...
1459   session = GNUNET_SERVER_client_get_user_context (client, struct ServiceSession);
1460   if (NULL == session) {
1461     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1462     return;
1463   }
1464
1465   contained_count = ntohl (msg->element_count_contained);
1466
1467   //sanity check: is the message as long as the message_count fields suggests?
1468   if ((ntohs (msg->header.size) != (sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element)))
1469       || (0 == contained_count) || (session->total < session->transferred_element_count + contained_count)) {
1470     GNUNET_break_op (0);
1471     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1472     return;
1473   }
1474   session->transferred_element_count += contained_count;
1475
1476   elements = (struct GNUNET_SCALARPRODUCT_Element *) & msg[1];
1477   for (i = 0; i < contained_count; i++) {
1478     struct GNUNET_SET_Element set_elem;
1479     struct GNUNET_SCALARPRODUCT_Element * elem;
1480
1481     if (0 == ntohl (elements[i].value))
1482       continue;
1483
1484     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1485     memcpy (elem, &elements[i], sizeof (struct GNUNET_SCALARPRODUCT_Element));
1486
1487     if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (session->intersected_elements,
1488                                                             &elem->key,
1489                                                             elem,
1490                                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) {
1491       GNUNET_free (elem);
1492       continue;
1493     }
1494     set_elem.data = &elements[i].key;
1495     set_elem.size = htons (sizeof (elements[i].key));
1496     set_elem.type = htons (0); /* do we REALLY need this? */
1497     GNUNET_SET_add_element (session->intersection_set, &set_elem, NULL, NULL);
1498     session->used_elements_count++;
1499   }
1500
1501   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1502
1503   if (session->total != session->transferred_element_count)
1504     // multipart msg
1505     return;
1506
1507   if (ALICE == session->role)
1508     client_request_complete_alice (session);
1509   else
1510     client_request_complete_bob (session);
1511 }
1512
1513
1514 /**
1515  * Handler for a client request message.
1516  * Can either be type A or B
1517  *   A: request-initiation to compute a scalar product with a peer
1518  *   B: response role, keep the values + session and wait for a matching session or process a waiting request
1519  *
1520  * @param cls closure
1521  * @param client identification of the client
1522  * @param message the actual message
1523  */
1524 static void
1525 handle_client_message (void *cls,
1526                        struct GNUNET_SERVER_Client *client,
1527                        const struct GNUNET_MessageHeader *message)
1528 {
1529   const struct GNUNET_SCALARPRODUCT_computation_message * msg = (const struct GNUNET_SCALARPRODUCT_computation_message *) message;
1530   struct ServiceSession * session;
1531   uint32_t contained_count;
1532   uint32_t total_count;
1533   uint32_t msg_type;
1534   struct GNUNET_SCALARPRODUCT_Element * elements;
1535   uint32_t i;
1536
1537   // only one concurrent session per client connection allowed, simplifies logics a lot...
1538   session = GNUNET_SERVER_client_get_user_context (client, struct ServiceSession);
1539   if (NULL != session) {
1540     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1541     return;
1542   }
1543
1544   msg_type = ntohs (msg->header.type);
1545   total_count = ntohl (msg->element_count_total);
1546   contained_count = ntohl (msg->element_count_contained);
1547
1548   if ((GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE == msg_type)
1549       && (!memcmp (&msg->peer, &me, sizeof (struct GNUNET_PeerIdentity)))) {
1550     //session with ourself makes no sense!
1551     GNUNET_break_op (0);
1552     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1553     return;
1554   }
1555
1556   //sanity check: is the message as long as the message_count fields suggests?
1557   if ((ntohs (msg->header.size) != (sizeof (struct GNUNET_SCALARPRODUCT_computation_message) +contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element)))
1558       || (0 == total_count)) {
1559     GNUNET_break_op (0);
1560     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1561     return;
1562   }
1563
1564   // do we have a duplicate session here already?
1565   if (NULL != find_matching_session (from_client_tail,
1566                                      &msg->session_key,
1567                                      total_count, NULL)) {
1568     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1569                 _ ("Duplicate session information received, can not create new session with key `%s'\n"),
1570                 GNUNET_h2s (&msg->session_key));
1571     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1572     return;
1573   }
1574
1575   session = GNUNET_new (struct ServiceSession);
1576   session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
1577   session->client = client;
1578   session->total = total_count;
1579   session->transferred_element_count = contained_count;
1580   // get our transaction key
1581   memcpy (&session->session_id, &msg->session_key, sizeof (struct GNUNET_HashCode));
1582
1583   elements = (struct GNUNET_SCALARPRODUCT_Element *) & msg[1];
1584   session->intersected_elements = GNUNET_CONTAINER_multihashmap_create (session->total, GNUNET_NO);
1585   session->intersection_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_INTERSECTION);
1586   for (i = 0; i < contained_count; i++) {
1587     struct GNUNET_SET_Element set_elem;
1588     struct GNUNET_SCALARPRODUCT_Element * elem;
1589
1590     if (0 == ntohl (elements[i].value))
1591       continue;
1592
1593     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1594     memcpy (elem, &elements[i], sizeof (struct GNUNET_SCALARPRODUCT_Element));
1595
1596     if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (session->intersected_elements,
1597                                                             &elem->key,
1598                                                             elem,
1599                                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) {
1600       GNUNET_free (elem);
1601       continue;
1602     }
1603     set_elem.data = &elements[i].key;
1604     set_elem.size = htons (sizeof (elements[i].key));
1605     set_elem.type = htons (0); /* do we REALLY need this? */
1606     GNUNET_SET_add_element (session->intersection_set, &set_elem, NULL, NULL);
1607     session->used_elements_count++;
1608   }
1609
1610   if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE == msg_type) {
1611     session->role = ALICE;
1612     memcpy (&session->peer, &msg->peer, sizeof (struct GNUNET_PeerIdentity));
1613   }
1614   else {
1615     session->role = BOB;
1616   }
1617
1618   GNUNET_CONTAINER_DLL_insert (from_client_head, from_client_tail, session);
1619   GNUNET_SERVER_client_set_user_context (client, session);
1620   GNUNET_SERVER_receive_done (client, GNUNET_YES);
1621
1622   if (session->total != session->transferred_element_count)
1623     // multipart msg
1624     return;
1625
1626   if (ALICE == session->role)
1627     client_request_complete_alice (session);
1628   else
1629     client_request_complete_bob (session);
1630 }
1631
1632
1633 /**
1634  * Function called for inbound channels.
1635  *
1636  * @param cls closure
1637  * @param channel new handle to the channel
1638  * @param initiator peer that started the channel
1639  * @param port unused
1640  * @param options unused
1641  *
1642  * @return session associated with the channel
1643  */
1644 static void *
1645 cb_channel_incoming (void *cls,
1646                           struct GNUNET_CADET_Channel *channel,
1647                           const struct GNUNET_PeerIdentity *initiator,
1648                           uint32_t port, enum GNUNET_CADET_ChannelOption options)
1649 {
1650   struct ServiceSession * c = GNUNET_new (struct ServiceSession);
1651
1652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653               _ ("New incoming channel from peer %s.\n"),
1654               GNUNET_i2s (initiator));
1655
1656   c->peer = *initiator;
1657   c->channel = channel;
1658   c->role = BOB;
1659   return c;
1660 }
1661
1662
1663 /**
1664  * Function called whenever a channel is destroyed.  Should clean up
1665  * any associated state.
1666  *
1667  * It must NOT call GNUNET_CADET_channel_destroy on the channel.
1668  *
1669  * @param cls closure (set from GNUNET_CADET_connect)
1670  * @param channel connection to the other end (henceforth invalid)
1671  * @param channel_ctx place where local state associated
1672  *                   with the channel is stored
1673  */
1674 static void
1675 cb_channel_destruction (void *cls,
1676                              const struct GNUNET_CADET_Channel *channel,
1677                              void *channel_ctx)
1678 {
1679   struct ServiceSession * session = channel_ctx;
1680   struct ServiceSession * client_session;
1681   struct ServiceSession * curr;
1682
1683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1684               _ ("Peer disconnected, terminating session %s with peer (%s)\n"),
1685               GNUNET_h2s (&session->session_id),
1686               GNUNET_i2s (&session->peer));
1687   if (ALICE == session->role) {
1688     // as we have only one peer connected in each session, just remove the session
1689
1690     if ((0/*//TODO: only for complete session*/) && (!do_shutdown)) {
1691       session->channel = NULL;
1692       // if this happened before we received the answer, we must terminate the session
1693       session->client_notification_task =
1694               GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1695                                         session);
1696     }
1697   }
1698   else { //(BOB == session->role) service session
1699     // remove the session, unless it has already been dequeued, but somehow still active
1700     // this could bug without the IF in case the queue is empty and the service session was the only one know to the service
1701     // scenario: disconnect before alice can send her message to bob.
1702     for (curr = from_service_head; NULL != curr; curr = curr->next)
1703       if (curr == session) {
1704         GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, curr);
1705         break;
1706       }
1707     // there is a client waiting for this service session, terminate it, too!
1708     // i assume the tupel of key and element count is unique. if it was not the rest of the code would not work either.
1709     client_session = find_matching_session (from_client_tail,
1710                                             &session->session_id,
1711                                             session->total, NULL);
1712     free_session_variables (session);
1713     GNUNET_free (session);
1714
1715     // the client has to check if it was waiting for a result
1716     // or if it was a responder, no point in adding more statefulness
1717     if (client_session && (!do_shutdown)) {
1718       client_session->client_notification_task =
1719               GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1720                                         client_session);
1721     }
1722   }
1723 }
1724
1725
1726 /**
1727  * Compute our scalar product, done by Alice
1728  *
1729  * @param session - the session associated with this computation
1730  * @return product as MPI, never NULL
1731  */
1732 static gcry_mpi_t
1733 compute_scalar_product (struct ServiceSession * session)
1734 {
1735   uint32_t count;
1736   gcry_mpi_t t;
1737   gcry_mpi_t u;
1738   gcry_mpi_t u_prime;
1739   gcry_mpi_t p;
1740   gcry_mpi_t p_prime;
1741   gcry_mpi_t tmp;
1742   gcry_mpi_t r[session->used_elements_count];
1743   gcry_mpi_t r_prime[session->used_elements_count];
1744   gcry_mpi_t s;
1745   gcry_mpi_t s_prime;
1746   unsigned int i;
1747
1748   count = session->used_elements_count;
1749   // due to the introduced static offset S, we now also have to remove this
1750   // from the E(a_pi)(+)E(-b_pi-r_pi) and E(a_qi)(+)E(-r_qi) twice each,
1751   // the result is E((S + a_pi) + (S -b_pi-r_pi)) and E(S + a_qi + S - r_qi)
1752   for (i = 0; i < count; i++) {
1753     GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey,
1754                                     &session->r[i], r[i]);
1755     gcry_mpi_sub (r[i], r[i], my_offset);
1756     gcry_mpi_sub (r[i], r[i], my_offset);
1757     GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey,
1758                                     &session->r_prime[i], r_prime[i]);
1759     gcry_mpi_sub (r_prime[i], r_prime[i], my_offset);
1760     gcry_mpi_sub (r_prime[i], r_prime[i], my_offset);
1761   }
1762
1763   // calculate t = sum(ai)
1764   t = compute_square_sum (session->sorted_elements, count);
1765
1766   // calculate U
1767   u = gcry_mpi_new (0);
1768   tmp = compute_square_sum (r, count);
1769   gcry_mpi_sub (u, u, tmp);
1770   gcry_mpi_release (tmp);
1771
1772   //calculate U'
1773   u_prime = gcry_mpi_new (0);
1774   tmp = compute_square_sum (r_prime, count);
1775   gcry_mpi_sub (u_prime, u_prime, tmp);
1776
1777   GNUNET_assert (p = gcry_mpi_new (0));
1778   GNUNET_assert (p_prime = gcry_mpi_new (0));
1779   GNUNET_assert (s = gcry_mpi_new (0));
1780   GNUNET_assert (s_prime = gcry_mpi_new (0));
1781
1782   // compute P
1783   GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey,
1784                                   session->s, s);
1785   GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey,
1786                                   session->s_prime, s_prime);
1787
1788   // compute P
1789   gcry_mpi_add (p, s, t);
1790   gcry_mpi_add (p, p, u);
1791
1792   // compute P'
1793   gcry_mpi_add (p_prime, s_prime, t);
1794   gcry_mpi_add (p_prime, p_prime, u_prime);
1795
1796   gcry_mpi_release (t);
1797   gcry_mpi_release (u);
1798   gcry_mpi_release (u_prime);
1799   gcry_mpi_release (s);
1800   gcry_mpi_release (s_prime);
1801
1802   // compute product
1803   gcry_mpi_sub (p, p, p_prime);
1804   gcry_mpi_release (p_prime);
1805   tmp = gcry_mpi_set_ui (tmp, 2);
1806   gcry_mpi_div (p, NULL, p, tmp, 0);
1807
1808   gcry_mpi_release (tmp);
1809   for (i = 0; i < count; i++) {
1810     gcry_mpi_release (session->sorted_elements[i]);
1811     gcry_mpi_release (r[i]);
1812     gcry_mpi_release (r_prime[i]);
1813   }
1814   GNUNET_free (session->a_head);
1815   session->a_head = NULL;
1816   GNUNET_free (session->s);
1817   session->s = NULL;
1818   GNUNET_free (session->s_prime);
1819   session->s_prime = NULL;
1820   GNUNET_free (session->r);
1821   session->r = NULL;
1822   GNUNET_free (session->r_prime);
1823   session->r_prime = NULL;
1824
1825   return p;
1826 }
1827
1828
1829 /**
1830  * Handle a multipart-chunk of a request from another service to calculate a scalarproduct with us.
1831  *
1832  * @param cls closure (set from #GNUNET_CADET_connect)
1833  * @param channel connection to the other end
1834  * @param channel_ctx place to store local state associated with the channel
1835  * @param message the actual message
1836  * @return #GNUNET_OK to keep the connection open,
1837  *         #GNUNET_SYSERR to close it (signal serious error)
1838  */
1839 static int
1840 handle_alices_cyrptodata_message_multipart (void *cls,
1841                                             struct GNUNET_CADET_Channel * channel,
1842                                             void **channel_ctx,
1843                                             const struct GNUNET_MessageHeader * message)
1844 {
1845   struct ServiceSession * session;
1846   const struct GNUNET_SCALARPRODUCT_multipart_message * msg = (const struct GNUNET_SCALARPRODUCT_multipart_message *) message;
1847   struct GNUNET_CRYPTO_PaillierCiphertext *payload;
1848   uint32_t contained_elements;
1849   uint32_t msg_length;
1850
1851   // are we in the correct state?
1852   session = (struct ServiceSession *) * channel_ctx;
1853   //we are not bob
1854   if ((NULL == session->e_a) || //or we did not expect this message yet 
1855       (session->used_elements_count == session->transferred_element_count)) { //we are not expecting multipart messages
1856     goto except;
1857   }
1858   // shorter than minimum?
1859   if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) {
1860     goto except;
1861   }
1862   contained_elements = ntohl (msg->contained_element_count);
1863   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
1864           +contained_elements * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1865   //sanity check
1866   if ((ntohs (msg->header.size) != msg_length)
1867       || (session->used_elements_count < contained_elements + session->transferred_element_count)
1868       || (0 == contained_elements)) {
1869     goto except;
1870   }
1871   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
1872   // Convert each vector element to MPI_value
1873   memcpy (&session->e_a[session->transferred_element_count], payload,
1874           sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * contained_elements);
1875
1876   session->transferred_element_count += contained_elements;
1877
1878   if (contained_elements == session->used_elements_count) {
1879     // single part finished
1880     if (NULL == session->intersection_op)
1881       // intersection has already finished, so we can proceed
1882       compute_service_response (session);
1883   }
1884
1885   return GNUNET_OK;
1886 except:
1887   session->channel = NULL;
1888   // and notify our client-session that we could not complete the session
1889   free_session_variables (session);
1890   if (NULL != session->client){
1891     //Alice
1892     session->client_notification_task =
1893           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1894                                     session);
1895   }
1896   else {
1897     //Bob
1898     if (NULL != session->response)
1899     session->response->client_notification_task =
1900           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1901                                     session->response);
1902     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
1903     GNUNET_free(session);
1904   }
1905   return GNUNET_SYSERR;
1906 }
1907
1908
1909 /**
1910  * Handle a request from another service to calculate a scalarproduct with us.
1911  *
1912  * @param cls closure (set from #GNUNET_CADET_connect)
1913  * @param channel connection to the other end
1914  * @param channel_ctx place to store local state associated with the channel
1915  * @param message the actual message
1916  * @return #GNUNET_OK to keep the connection open,
1917  *         #GNUNET_SYSERR to close it (signal serious error)
1918  */
1919 static int
1920 handle_alices_cyrptodata_message (void *cls,
1921                                   struct GNUNET_CADET_Channel * channel,
1922                                   void **channel_ctx,
1923                                   const struct GNUNET_MessageHeader * message)
1924 {
1925   struct ServiceSession * session;
1926   const struct GNUNET_SCALARPRODUCT_alices_cryptodata_message * msg = (const struct GNUNET_SCALARPRODUCT_alices_cryptodata_message *) message;
1927   struct GNUNET_CRYPTO_PaillierCiphertext *payload;
1928   uint32_t contained_elements = 0;
1929   uint32_t msg_length;
1930
1931   session = (struct ServiceSession *) * channel_ctx;
1932   //we are not bob
1933   if ((BOB != session->role)
1934       //we are expecting multipart messages instead
1935       || (NULL != session->e_a)
1936       //or we did not expect this message yet
1937       || //intersection OP has not yet finished
1938       !((NULL != session->intersection_op)
1939         //intersection OP done
1940         || (session->response->sorted_elements)
1941         )) {
1942     goto invalid_msg;
1943   }
1944
1945   // shorter than minimum?
1946   if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) {
1947     goto invalid_msg;
1948   }
1949
1950   contained_elements = ntohl (msg->contained_element_count);
1951   msg_length = sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message)
1952           +contained_elements * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1953
1954   //sanity check: is the message as long as the message_count fields suggests?
1955   if ((ntohs (msg->header.size) != msg_length) ||
1956       (session->used_elements_count < session->transferred_element_count + contained_elements) ||
1957       (0 == contained_elements)) {
1958     goto invalid_msg;
1959   }
1960
1961   session->transferred_element_count = contained_elements;
1962   payload = (struct GNUNET_CRYPTO_PaillierCiphertext*) &msg[1];
1963
1964   session->e_a = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used_elements_count);
1965   memcpy (&session->e_a[0], payload, contained_elements * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1966   if (contained_elements == session->used_elements_count) {
1967     // single part finished
1968     if (NULL == session->intersection_op)
1969       // intersection has already finished, so we can proceed
1970       compute_service_response (session);
1971   }
1972   return GNUNET_OK;
1973 invalid_msg:
1974   GNUNET_break_op (0);
1975   session->channel = NULL;
1976   // and notify our client-session that we could not complete the session
1977   free_session_variables (session);
1978   if (NULL != session->client){
1979     //Alice
1980     session->client_notification_task =
1981           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1982                                     session);
1983   }
1984   else {
1985     //Bob
1986     if (NULL != session->response)
1987     session->response->client_notification_task =
1988           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1989                                     session->response);
1990     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
1991     GNUNET_free(session);
1992   }
1993   return GNUNET_SYSERR;
1994 }
1995
1996
1997 /**
1998  * Handle a request from another service to calculate a scalarproduct with us.
1999  *
2000  * @param cls closure (set from #GNUNET_CADET_connect)
2001  * @param channel connection to the other end
2002  * @param channel_ctx place to store local state associated with the channel
2003  * @param message the actual message
2004  * @return #GNUNET_OK to keep the connection open,
2005  *         #GNUNET_SYSERR to close it (signal serious error)
2006  */
2007 static int
2008 handle_alices_computation_request (void *cls,
2009                         struct GNUNET_CADET_Channel * channel,
2010                         void **channel_ctx,
2011                         const struct GNUNET_MessageHeader * message)
2012 {
2013   struct ServiceSession * session;
2014   struct ServiceSession * client_session;
2015   const struct GNUNET_SCALARPRODUCT_service_request * msg = (const struct GNUNET_SCALARPRODUCT_service_request *) message;
2016   uint32_t total_elements;
2017
2018   session = (struct ServiceSession *) * channel_ctx;
2019   if (session->total != 0) {
2020     // must be a fresh session
2021     goto invalid_msg;
2022   }
2023   // Check if message was sent by me, which would be bad!
2024   if (!memcmp (&session->peer, &me, sizeof (struct GNUNET_PeerIdentity))) {
2025     GNUNET_free (session);
2026     GNUNET_break (0);
2027     return GNUNET_SYSERR;
2028   }
2029   // shorter than expected?
2030   if (ntohs (msg->header.size) != sizeof (struct GNUNET_SCALARPRODUCT_service_request)) {
2031     GNUNET_free (session);
2032     GNUNET_break_op (0);
2033     return GNUNET_SYSERR;
2034   }
2035   total_elements = ntohl (msg->total_element_count);
2036
2037   //sanity check: is the message as long as the message_count fields suggests?
2038   if (1 > total_elements) {
2039     GNUNET_free (session);
2040     GNUNET_break_op (0);
2041     return GNUNET_SYSERR;
2042   }
2043   if (find_matching_session (from_service_tail,
2044                              &msg->session_id,
2045                              total_elements,
2046                              NULL)) {
2047     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2048                 _ ("Got message with duplicate session key (`%s'), ignoring service request.\n"),
2049                 (const char *) &(msg->session_id));
2050     GNUNET_free (session);
2051     return GNUNET_SYSERR;
2052   }
2053
2054   session->total = total_elements;
2055   session->channel = channel;
2056
2057   // session key
2058   memcpy (&session->session_id, &msg->session_id, sizeof (struct GNUNET_HashCode));
2059
2060   // public key
2061   memcpy (&session->remote_pubkey, &msg->public_key, sizeof (struct GNUNET_CRYPTO_PaillierPublicKey));
2062
2063   //check if service queue contains a matching request
2064   client_session = find_matching_session (from_client_tail,
2065                                           &session->session_id,
2066                                           session->total, NULL);
2067
2068   GNUNET_CONTAINER_DLL_insert (from_service_head, from_service_tail, session);
2069
2070   if ((NULL != client_session)
2071       && (client_session->transferred_element_count == client_session->total)) {
2072
2073     GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s and a matching element set, processing.\n"), GNUNET_h2s (&session->session_id));
2074
2075     session->response = client_session;
2076     session->intersected_elements = client_session->intersected_elements;
2077     client_session->intersected_elements = NULL;
2078     session->intersection_set = client_session->intersection_set;
2079     client_session->intersection_set = NULL;
2080
2081     session->intersection_op = GNUNET_SET_prepare (&session->peer,
2082                                                    &session->session_id,
2083                                                    NULL,
2084                                                    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT16_MAX),
2085                                                    GNUNET_SET_RESULT_REMOVED,
2086                                                    cb_intersection_element_removed,
2087                                                    session);
2088
2089     GNUNET_SET_commit (session->intersection_op, session->intersection_set);
2090   }
2091   else {
2092     GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s without a matching element set, queueing.\n"), GNUNET_h2s (&session->session_id));
2093   }
2094
2095   return GNUNET_OK;
2096 invalid_msg:
2097   GNUNET_break_op (0);
2098   session->channel = NULL;
2099   // and notify our client-session that we could not complete the session
2100   free_session_variables (session);
2101   if (NULL != session->client){
2102     //Alice
2103     session->client_notification_task =
2104           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
2105                                     session);
2106   }
2107   else {
2108     //Bob
2109     if (NULL != session->response)
2110     session->response->client_notification_task =
2111           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
2112                                     session->response);
2113     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
2114     GNUNET_free(session);
2115   }
2116   return GNUNET_SYSERR;
2117 }
2118
2119
2120 /**
2121  * Handle a multipart chunk of a response we got from another service we wanted to calculate a scalarproduct with.
2122  *
2123  * @param cls closure (set from #GNUNET_CADET_connect)
2124  * @param channel connection to the other end
2125  * @param channel_ctx place to store local state associated with the channel
2126  * @param message the actual message
2127  * @return #GNUNET_OK to keep the connection open,
2128  *         #GNUNET_SYSERR to close it (signal serious error)
2129  */
2130 static int
2131 handle_bobs_cryptodata_multipart (void *cls,
2132                                    struct GNUNET_CADET_Channel * channel,
2133                                    void **channel_ctx,
2134                                    const struct GNUNET_MessageHeader * message)
2135 {
2136   struct ServiceSession * session;
2137   const struct GNUNET_SCALARPRODUCT_multipart_message * msg = (const struct GNUNET_SCALARPRODUCT_multipart_message *) message;
2138   struct GNUNET_CRYPTO_PaillierCiphertext * payload;
2139   size_t i;
2140   uint32_t contained = 0;
2141   size_t msg_size;
2142   size_t required_size;
2143
2144   GNUNET_assert (NULL != message);
2145   // are we in the correct state?
2146   session = (struct ServiceSession *) * channel_ctx;
2147   if ((ALICE != session->role) || (NULL == session->sorted_elements)) {
2148     goto invalid_msg;
2149   }
2150   msg_size = ntohs (msg->header.size);
2151   required_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
2152           + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
2153   // shorter than minimum?
2154   if (required_size > msg_size) {
2155     goto invalid_msg;
2156   }
2157   contained = ntohl (msg->contained_element_count);
2158   required_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
2159           + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
2160   //sanity check: is the message as long as the message_count fields suggests?
2161   if ((required_size != msg_size) || (session->used_elements_count < session->transferred_element_count + contained)) {
2162     goto invalid_msg;
2163   }
2164   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
2165   // Convert each k[][perm] to its MPI_value
2166   for (i = 0; i < contained; i++) {
2167     memcpy (&session->r[session->transferred_element_count + i], &payload[2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2168     memcpy (&session->r_prime[session->transferred_element_count + i], &payload[2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2169   }
2170   session->transferred_element_count += contained;
2171   if (session->transferred_element_count != session->used_elements_count)
2172     return GNUNET_OK;
2173   session->product = compute_scalar_product (session); //never NULL
2174
2175 invalid_msg:
2176   GNUNET_break_op (NULL != session->product);
2177   session->channel = NULL;
2178   // send message with product to client
2179   if (NULL != session->client){
2180     //Alice
2181     session->client_notification_task =
2182           GNUNET_SCHEDULER_add_now (&prepare_client_response,
2183                                     session);
2184   }
2185   else {
2186     //Bob
2187     if (NULL != session->response)
2188     session->response->client_notification_task =
2189           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
2190                                     session->response);
2191     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
2192     free_session_variables (session);
2193     GNUNET_free(session);
2194   }
2195   // the channel has done its job, terminate our connection and the channel
2196   // the peer will be notified that the channel was destroyed via channel_destruction_handler
2197   // just close the connection, as recommended by Christian
2198   return GNUNET_SYSERR;
2199 }
2200
2201
2202 /**
2203  * Handle a response we got from another service we wanted to calculate a scalarproduct with.
2204  *
2205  * @param cls closure (set from #GNUNET_CADET_connect)
2206  * @param channel connection to the other end
2207  * @param channel_ctx place to store local state associated with the channel
2208  * @param message the actual message
2209  * @return #GNUNET_OK to keep the connection open,
2210  *         #GNUNET_SYSERR to close it (we are done)
2211  */
2212 static int
2213 handle_bobs_cryptodata_message (void *cls,
2214                          struct GNUNET_CADET_Channel * channel,
2215                          void **channel_ctx,
2216                          const struct GNUNET_MessageHeader * message)
2217 {
2218   struct ServiceSession * session;
2219   const struct GNUNET_SCALARPRODUCT_service_response * msg = (const struct GNUNET_SCALARPRODUCT_service_response *) message;
2220   struct GNUNET_CRYPTO_PaillierCiphertext * payload;
2221   size_t i;
2222   uint32_t contained = 0;
2223   size_t msg_size;
2224   size_t required_size;
2225
2226   GNUNET_assert (NULL != message);
2227   session = (struct ServiceSession *) * channel_ctx;
2228   // are we in the correct state?
2229   if (0 /*//TODO: correct state*/) {
2230     goto invalid_msg;
2231   }
2232   //we need at least a full message without elements attached
2233   msg_size = ntohs (msg->header.size);
2234   required_size = sizeof (struct GNUNET_SCALARPRODUCT_service_response) + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
2235
2236   if (required_size > msg_size) {
2237     goto invalid_msg;
2238   }
2239   contained = ntohl (msg->contained_element_count);
2240   required_size = sizeof (struct GNUNET_SCALARPRODUCT_service_response)
2241           + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)
2242           + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
2243   //sanity check: is the message as long as the message_count fields suggests?
2244   if ((msg_size != required_size) || (session->used_elements_count < contained)) {
2245     goto invalid_msg;
2246   }
2247   session->transferred_element_count = contained;
2248   //convert s
2249   payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
2250
2251   session->s = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2252   session->s_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2253   memcpy (session->s, &payload[0], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2254   memcpy (session->s_prime, &payload[1], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2255
2256   session->r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used_elements_count);
2257   session->r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used_elements_count);
2258
2259   // Convert each k[][perm] to its MPI_value
2260   for (i = 0; i < contained; i++) {
2261     memcpy (&session->r[i], &payload[2 + 2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2262     memcpy (&session->r_prime[i], &payload[3 + 2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
2263   }
2264   if (session->transferred_element_count != session->used_elements_count)
2265     return GNUNET_OK; //wait for the other multipart chunks
2266   session->product = compute_scalar_product (session); //never NULL
2267
2268 invalid_msg:
2269   GNUNET_break_op (NULL != session->product);
2270   session->channel = NULL;
2271   // send message with product to client
2272   if (NULL != session->client){
2273     //Alice
2274     session->client_notification_task =
2275           GNUNET_SCHEDULER_add_now (&prepare_client_response,
2276                                     session);
2277   }
2278   else {
2279     //Bob
2280     if (NULL != session->response)
2281     session->response->client_notification_task =
2282           GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
2283                                     session->response);
2284     GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
2285     free_session_variables (session);
2286     GNUNET_free(session);
2287   }
2288   // the channel has done its job, terminate our connection and the channel
2289   // the peer will be notified that the channel was destroyed via channel_destruction_handler
2290   // just close the connection, as recommended by Christian
2291   return GNUNET_SYSERR;
2292 }
2293
2294
2295 /**
2296  * Task run during shutdown.
2297  *
2298  * @param cls unused
2299  * @param tc unused
2300  */
2301 static void
2302 shutdown_task (void *cls,
2303                const struct GNUNET_SCHEDULER_TaskContext *tc)
2304 {
2305   struct ServiceSession * session;
2306   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Shutting down, initiating cleanup.\n"));
2307
2308   do_shutdown = GNUNET_YES;
2309
2310   // terminate all owned open channels.
2311   for (session = from_client_head; NULL != session; session = session->next) {
2312     if ((0/*//TODO: not finalized*/) && (NULL != session->channel)) {
2313       GNUNET_CADET_channel_destroy (session->channel);
2314       session->channel = NULL;
2315     }
2316     if (GNUNET_SCHEDULER_NO_TASK != session->client_notification_task) {
2317       GNUNET_SCHEDULER_cancel (session->client_notification_task);
2318       session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
2319     }
2320     if (NULL != session->client) {
2321       GNUNET_SERVER_client_disconnect (session->client);
2322       session->client = NULL;
2323     }
2324   }
2325   for (session = from_service_head; NULL != session; session = session->next)
2326     if (NULL != session->channel) {
2327       GNUNET_CADET_channel_destroy (session->channel);
2328       session->channel = NULL;
2329     }
2330
2331   if (my_cadet) {
2332     GNUNET_CADET_disconnect (my_cadet);
2333     my_cadet = NULL;
2334   }
2335 }
2336
2337
2338 /**
2339  * Initialization of the program and message handlers
2340  *
2341  * @param cls closure
2342  * @param server the initialized server
2343  * @param c configuration to use
2344  */
2345 static void
2346 run (void *cls,
2347      struct GNUNET_SERVER_Handle *server,
2348      const struct GNUNET_CONFIGURATION_Handle *c)
2349 {
2350   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2351     {&handle_client_message, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE, 0},
2352     {&handle_client_message, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB, 0},
2353     {&handle_client_message_multipart, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART, 0},
2354     {NULL, NULL, 0, 0}
2355   };
2356   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2357     { &handle_alices_computation_request, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA, 0},
2358     { &handle_alices_cyrptodata_message, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA, 0},
2359     { &handle_alices_cyrptodata_message_multipart, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART, 0},
2360     { &handle_bobs_cryptodata_message, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA, 0},
2361     { &handle_bobs_cryptodata_multipart, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART, 0},
2362     {NULL, 0, 0}
2363   };
2364   static const uint32_t ports[] = {
2365     GNUNET_APPLICATION_TYPE_SCALARPRODUCT,
2366     0
2367   };
2368   //generate private/public key set
2369   GNUNET_CRYPTO_paillier_create (&my_pubkey, &my_privkey);
2370
2371   // offset has to be sufficiently small to allow computation of:
2372   // m1+m2 mod n == (S + a) + (S + b) mod n,
2373   // if we have more complex operations, this factor needs to be lowered
2374   my_offset = gcry_mpi_new (GNUNET_CRYPTO_PAILLIER_BITS / 3);
2375   gcry_mpi_set_bit (my_offset, GNUNET_CRYPTO_PAILLIER_BITS / 3);
2376
2377   // register server callbacks and disconnect handler
2378   GNUNET_SERVER_add_handlers (server, server_handlers);
2379   GNUNET_SERVER_disconnect_notify (server,
2380                                    &handle_client_disconnect,
2381                                    NULL);
2382   GNUNET_break (GNUNET_OK ==
2383                 GNUNET_CRYPTO_get_peer_identity (c,
2384                                                  &me));
2385   my_cadet = GNUNET_CADET_connect (c, NULL,
2386                                  &cb_channel_incoming,
2387                                  &cb_channel_destruction,
2388                                  cadet_handlers, ports);
2389   if (!my_cadet) {
2390     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Connect to CADET failed\n"));
2391     GNUNET_SCHEDULER_shutdown ();
2392     return;
2393   }
2394   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("CADET initialized\n"));
2395   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2396                                 &shutdown_task,
2397                                 NULL);
2398 }
2399
2400
2401 /**
2402  * The main function for the scalarproduct service.
2403  *
2404  * @param argc number of arguments from the command line
2405  * @param argv command line arguments
2406  * @return 0 ok, 1 on error
2407  */
2408 int
2409 main (int argc, char *const *argv)
2410 {
2411   return (GNUNET_OK ==
2412           GNUNET_SERVICE_run (argc, argv,
2413                               "scalarproduct",
2414                               GNUNET_SERVICE_OPTION_NONE,
2415                               &run, NULL)) ? 0 : 1;
2416 }
2417
2418 /* end of gnunet-service-scalarproduct.c */