- fix CADET-using services
[oweals/gnunet.git] / src / scalarproduct / gnunet-service-scalarproduct-ecc_bob.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19  */
20 /**
21  * @file scalarproduct/gnunet-service-scalarproduct-ecc_bob.c
22  * @brief scalarproduct service implementation
23  * @author Christian M. Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include <limits.h>
28 #include <gcrypt.h>
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_scalarproduct_service.h"
35 #include "gnunet_set_service.h"
36 #include "scalarproduct.h"
37 #include "gnunet-service-scalarproduct-ecc.h"
38
39 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-bob", __VA_ARGS__)
40
41
42 /**
43  * An encrypted element key-value pair.
44  */
45 struct MpiElement
46 {
47   /**
48    * Key used to identify matching pairs of values to multiply.
49    * Points into an existing data structure, to avoid copying
50    * and doubling memory use.
51    */
52   const struct GNUNET_HashCode *key;
53
54   /**
55    * Value represented (a).
56    */
57   gcry_mpi_t value;
58 };
59
60
61 /**
62  * An incoming session from CADET.
63  */
64 struct CadetIncomingSession;
65
66
67 /**
68  * A scalarproduct session which tracks an offer for a
69  * multiplication service by a local client.
70  */
71 struct BobServiceSession
72 {
73
74   /**
75    * (hopefully) unique transaction ID
76    */
77   struct GNUNET_HashCode session_id;
78
79   /**
80    * The client this request is related to.
81    */
82   struct GNUNET_SERVER_Client *client;
83
84   /**
85    * Client message queue.
86    */
87   struct GNUNET_MQ_Handle *client_mq;
88
89   /**
90    * All non-0-value'd elements transmitted to us.
91    */
92   struct GNUNET_CONTAINER_MultiHashMap *intersected_elements;
93
94   /**
95    * Set of elements for which we will be conducting an intersection.
96    * The resulting elements are then used for computing the scalar product.
97    */
98   struct GNUNET_SET_Handle *intersection_set;
99
100   /**
101    * Set of elements for which will conduction an intersection.
102    * the resulting elements are then used for computing the scalar product.
103    */
104   struct GNUNET_SET_OperationHandle *intersection_op;
105
106   /**
107    * b(Bob)
108    */
109   struct MpiElement *sorted_elements;
110
111   /**
112    * Product of the g_i^{b_i}
113    */
114   gcry_mpi_point_t prod_g_i_b_i;
115
116   /**
117    * Product of the h_i^{b_i}
118    */
119   gcry_mpi_point_t prod_h_i_b_i;
120
121   /**
122    * Handle for our associated incoming CADET session, or NULL
123    * if we have not gotten one yet.
124    */
125   struct CadetIncomingSession *cadet;
126
127   /**
128    * How many elements will be supplied in total from the client.
129    */
130   uint32_t total;
131
132   /**
133    * Already transferred elements (received) for multipart
134    * messages from client. Always less than @e total.
135    */
136   uint32_t client_received_element_count;
137
138   /**
139    * How many elements actually are used for the scalar product.
140    * Size of the arrays in @e r and @e r_prime.  Also sometimes
141    * used as an index into the arrays during construction.
142    */
143   uint32_t used_element_count;
144
145   /**
146    * Counts the number of values received from Alice by us.
147    * Always less than @e used_element_count.
148    */
149   uint32_t cadet_received_element_count;
150
151   /**
152    * State of this session.   In
153    * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is
154    * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or
155    * #GNUNET_SCALARPRODUCT_STATUS_FAILURE.
156    */
157   enum GNUNET_SCALARPRODUCT_ResponseStatus status;
158
159   /**
160    * Are we already in #destroy_service_session()?
161    */
162   int in_destroy;
163
164 };
165
166
167 /**
168  * An incoming session from CADET.
169  */
170 struct CadetIncomingSession
171 {
172
173   /**
174    * Associated client session, or NULL.
175    */
176   struct BobServiceSession *s;
177
178   /**
179    * The CADET channel.
180    */
181   struct GNUNET_CADET_Channel *channel;
182
183   /**
184    * Originator's peer identity. (Only for diagnostics.)
185    */
186   struct GNUNET_PeerIdentity peer;
187
188   /**
189    * (hopefully) unique transaction ID
190    */
191   struct GNUNET_HashCode session_id;
192
193   /**
194    * The message queue for this channel.
195    */
196   struct GNUNET_MQ_Handle *cadet_mq;
197
198   /**
199    * Has this CADET session been added to the map yet?
200    * #GNUNET_YES if so, in which case @e session_id is
201    * the key.
202    */
203   int in_map;
204
205   /**
206    * Are we already in #destroy_cadet_session()?
207    */
208   int in_destroy;
209
210 };
211
212
213 /**
214  * GNUnet configuration handle
215  */
216 static const struct GNUNET_CONFIGURATION_Handle *cfg;
217
218 /**
219  * Map of `struct BobServiceSession`, by session keys.
220  */
221 static struct GNUNET_CONTAINER_MultiHashMap *client_sessions;
222
223 /**
224  * Map of `struct CadetIncomingSession`, by session keys.
225  */
226 static struct GNUNET_CONTAINER_MultiHashMap *cadet_sessions;
227
228 /**
229  * Handle to the CADET service.
230  */
231 static struct GNUNET_CADET_Handle *my_cadet;
232
233 /**
234  * Context for DLOG operations on a curve.
235  */
236 static struct GNUNET_CRYPTO_EccDlogContext *edc;
237
238
239
240 /**
241  * Finds a not terminated client session in the respective map based on
242  * session key.
243  *
244  * @param key the session key we want to search for
245  * @return the matching session, or NULL for none
246  */
247 static struct BobServiceSession *
248 find_matching_client_session (const struct GNUNET_HashCode *key)
249 {
250   return GNUNET_CONTAINER_multihashmap_get (client_sessions,
251                                             key);
252 }
253
254
255 /**
256  * Finds a CADET session in the respective map based on session key.
257  *
258  * @param key the session key we want to search for
259  * @return the matching session, or NULL for none
260  */
261 static struct CadetIncomingSession *
262 find_matching_cadet_session (const struct GNUNET_HashCode *key)
263 {
264   return GNUNET_CONTAINER_multihashmap_get (cadet_sessions,
265                                             key);
266 }
267
268
269 /**
270  * Callback used to free the elements in the map.
271  *
272  * @param cls NULL
273  * @param key key of the element
274  * @param value the value to free
275  */
276 static int
277 free_element_cb (void *cls,
278                  const struct GNUNET_HashCode *key,
279                  void *value)
280 {
281   struct GNUNET_SCALARPRODUCT_Element *element = value;
282
283   GNUNET_free (element);
284   return GNUNET_OK;
285 }
286
287
288 /**
289  * Destroy session state, we are done with it.
290  *
291  * @param session the session to free elements from
292  */
293 static void
294 destroy_cadet_session (struct CadetIncomingSession *s);
295
296
297 /**
298  * Destroy session state, we are done with it.
299  *
300  * @param session the session to free elements from
301  */
302 static void
303 destroy_service_session (struct BobServiceSession *s)
304 {
305   struct CadetIncomingSession *in;
306   unsigned int i;
307
308   if (GNUNET_YES == s->in_destroy)
309     return;
310   s->in_destroy = GNUNET_YES;
311   if (NULL != (in = s->cadet))
312   {
313     s->cadet = NULL;
314     destroy_cadet_session (in);
315   }
316   if (NULL != s->client_mq)
317   {
318     GNUNET_MQ_destroy (s->client_mq);
319     s->client_mq = NULL;
320   }
321   if (NULL != s->client)
322   {
323     GNUNET_SERVER_client_disconnect (s->client);
324     s->client = NULL;
325   }
326   GNUNET_assert (GNUNET_YES ==
327                  GNUNET_CONTAINER_multihashmap_remove (client_sessions,
328                                                        &s->session_id,
329                                                        s));
330   if (NULL != s->intersected_elements)
331   {
332     GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
333                                            &free_element_cb,
334                                            NULL);
335     GNUNET_CONTAINER_multihashmap_destroy (s->intersected_elements);
336     s->intersected_elements = NULL;
337   }
338   if (NULL != s->intersection_op)
339   {
340     GNUNET_SET_operation_cancel (s->intersection_op);
341     s->intersection_op = NULL;
342   }
343   if (NULL != s->intersection_set)
344   {
345     GNUNET_SET_destroy (s->intersection_set);
346     s->intersection_set = NULL;
347   }
348   if (NULL != s->sorted_elements)
349   {
350     for (i=0;i<s->used_element_count;i++)
351       gcry_mpi_release (s->sorted_elements[i].value);
352     GNUNET_free (s->sorted_elements);
353     s->sorted_elements = NULL;
354   }
355   if (NULL != s->prod_g_i_b_i)
356   {
357     gcry_mpi_point_release (s->prod_g_i_b_i);
358     s->prod_g_i_b_i = NULL;
359   }
360   if (NULL != s->prod_h_i_b_i)
361   {
362     gcry_mpi_point_release (s->prod_h_i_b_i);
363     s->prod_h_i_b_i = NULL;
364   }
365   GNUNET_free (s);
366 }
367
368
369 /**
370  * Destroy incoming CADET session state, we are done with it.
371  *
372  * @param in the session to free elements from
373  */
374 static void
375 destroy_cadet_session (struct CadetIncomingSession *in)
376 {
377   struct BobServiceSession *s;
378
379   if (GNUNET_YES == in->in_destroy)
380     return;
381   in->in_destroy = GNUNET_YES;
382   if (NULL != (s = in->s))
383   {
384     in->s = NULL;
385     destroy_service_session (s);
386   }
387   if (GNUNET_YES == in->in_map)
388   {
389     GNUNET_assert (GNUNET_YES ==
390                    GNUNET_CONTAINER_multihashmap_remove (cadet_sessions,
391                                                          &in->session_id,
392                                                          in));
393     in->in_map = GNUNET_NO;
394   }
395   if (NULL != in->cadet_mq)
396   {
397     GNUNET_MQ_destroy (in->cadet_mq);
398     in->cadet_mq = NULL;
399   }
400   if (NULL != in->channel)
401   {
402     GNUNET_CADET_channel_destroy (in->channel);
403     in->channel = NULL;
404   }
405   GNUNET_free (in);
406 }
407
408
409 /**
410  * Notify the client that the session has succeeded or failed.  This
411  * message gets sent to Bob's client if the operation completed or
412  * Alice disconnected.
413  *
414  * @param session the associated client session to fail or succeed
415  */
416 static void
417 prepare_client_end_notification (struct BobServiceSession *session)
418 {
419   struct ClientResponseMessage *msg;
420   struct GNUNET_MQ_Envelope *e;
421
422   if (NULL == session->client_mq)
423     return; /* no client left to be notified */
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Sending session-end notification with status %d to client for session %s\n",
426               session->status,
427               GNUNET_h2s (&session->session_id));
428   e = GNUNET_MQ_msg (msg,
429                      GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
430   msg->range = 0;
431   msg->product_length = htonl (0);
432   msg->status = htonl (session->status);
433   GNUNET_MQ_send (session->client_mq,
434                   e);
435 }
436
437
438 /**
439  * Function called whenever a channel is destroyed.  Should clean up
440  * any associated state.
441  *
442  * It must NOT call #GNUNET_CADET_channel_destroy() on the channel.
443  *
444  * @param cls closure (set from #GNUNET_CADET_connect())
445  * @param channel connection to the other end (henceforth invalid)
446  * @param channel_ctx place where local state associated
447  *                   with the channel is stored
448  */
449 static void
450 cb_channel_destruction (void *cls,
451                         const struct GNUNET_CADET_Channel *channel,
452                         void *channel_ctx)
453 {
454   struct CadetIncomingSession *in = channel_ctx;
455   struct BobServiceSession *s;
456
457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458               "Peer disconnected, terminating session %s with peer %s\n",
459               GNUNET_h2s (&in->session_id),
460               GNUNET_i2s (&in->peer));
461   if (NULL != in->cadet_mq)
462   {
463     GNUNET_MQ_destroy (in->cadet_mq);
464     in->cadet_mq = NULL;
465   }
466   in->channel = NULL;
467   if (NULL != (s = in->s))
468   {
469     if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status)
470     {
471       s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
472       prepare_client_end_notification (s);
473     }
474   }
475   destroy_cadet_session (in);
476 }
477
478
479 /**
480  * MQ finished giving our last message to CADET, now notify
481  * the client that we are finished.
482  */
483 static void
484 bob_cadet_done_cb (void *cls)
485 {
486   struct BobServiceSession *session = cls;
487
488   session->status = GNUNET_SCALARPRODUCT_STATUS_SUCCESS;
489   prepare_client_end_notification (session);
490 }
491
492
493 /**
494  * Bob generates the response message to be sent to Alice.
495  *
496  * @param s the associated requesting session with Alice
497  */
498 static void
499 transmit_bobs_cryptodata_message (struct BobServiceSession *s)
500 {
501   struct EccBobCryptodataMessage *msg;
502   struct GNUNET_MQ_Envelope *e;
503
504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
505               "Sending response to Alice\n");
506   e = GNUNET_MQ_msg (msg,
507                      GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ECC_BOB_CRYPTODATA);
508   msg->contained_element_count = htonl (2);
509   if (NULL != s->prod_g_i_b_i)
510     GNUNET_CRYPTO_ecc_point_to_bin (edc,
511                                     s->prod_g_i_b_i,
512                                     &msg->prod_g_i_b_i);
513   if (NULL != s->prod_h_i_b_i)
514     GNUNET_CRYPTO_ecc_point_to_bin (edc,
515                                     s->prod_h_i_b_i,
516                                     &msg->prod_h_i_b_i);
517   GNUNET_MQ_notify_sent (e,
518                          &bob_cadet_done_cb,
519                          s);
520   GNUNET_MQ_send (s->cadet->cadet_mq,
521                   e);
522 }
523
524
525 /**
526  * Iterator to copy over messages from the hash map
527  * into an array for sorting.
528  *
529  * @param cls the `struct BobServiceSession *`
530  * @param key the key (unused)
531  * @param value the `struct GNUNET_SCALARPRODUCT_Element *`
532  * TODO: code duplication with Alice!
533  */
534 static int
535 copy_element_cb (void *cls,
536                  const struct GNUNET_HashCode *key,
537                  void *value)
538 {
539   struct BobServiceSession *s = cls;
540   struct GNUNET_SCALARPRODUCT_Element *e = value;
541   gcry_mpi_t mval;
542   int64_t val;
543
544   mval = gcry_mpi_new (0);
545   val = (int64_t) GNUNET_ntohll (e->value);
546   if (0 > val)
547     gcry_mpi_sub_ui (mval, mval, -val);
548   else
549     gcry_mpi_add_ui (mval, mval, val);
550   s->sorted_elements [s->used_element_count].value = mval;
551   s->sorted_elements [s->used_element_count].key = &e->key;
552   s->used_element_count++;
553   return GNUNET_OK;
554 }
555
556
557 /**
558  * Compare two `struct MpiValue`s by key for sorting.
559  *
560  * @param a pointer to first `struct MpiValue *`
561  * @param b pointer to first `struct MpiValue *`
562  * @return -1 for a < b, 0 for a=b, 1 for a > b.
563  * TODO: code duplication with Alice!
564  */
565 static int
566 element_cmp (const void *a,
567              const void *b)
568 {
569   const struct MpiElement *ma = a;
570   const struct MpiElement *mb = b;
571
572   return GNUNET_CRYPTO_hash_cmp (ma->key,
573                                  mb->key);
574 }
575
576
577 /**
578  * Handle a multipart-chunk of a request from another service to
579  * calculate a scalarproduct with us.
580  *
581  * @param cls closure (set from #GNUNET_CADET_connect)
582  * @param channel connection to the other end
583  * @param channel_ctx place to store local state associated with the @a channel
584  * @param message the actual message
585  * @return #GNUNET_OK to keep the connection open,
586  *         #GNUNET_SYSERR to close it (signal serious error)
587  */
588 static int
589 handle_alices_cryptodata_message (void *cls,
590                                   struct GNUNET_CADET_Channel *channel,
591                                   void **channel_ctx,
592                                   const struct GNUNET_MessageHeader *message)
593 {
594   struct CadetIncomingSession *in = *channel_ctx;
595   struct BobServiceSession *s;
596   const struct EccAliceCryptodataMessage *msg;
597   const struct GNUNET_CRYPTO_EccPoint *payload;
598   uint32_t contained_elements;
599   size_t msg_length;
600   uint16_t msize;
601   unsigned int max;
602   unsigned int i;
603   const struct MpiElement *b_i;
604   gcry_mpi_point_t tmp;
605   gcry_mpi_point_t g_i;
606   gcry_mpi_point_t h_i;
607   gcry_mpi_point_t g_i_b_i;
608   gcry_mpi_point_t h_i_b_i;
609
610   /* sanity checks */
611   if (NULL == in)
612   {
613     GNUNET_break_op (0);
614     return GNUNET_SYSERR;
615   }
616   s = in->s;
617   if (NULL == s)
618   {
619     GNUNET_break_op (0);
620     return GNUNET_SYSERR;
621   }
622   /* sort our vector for the computation */
623   if (NULL == s->sorted_elements)
624   {
625     s->sorted_elements
626       = GNUNET_malloc (GNUNET_CONTAINER_multihashmap_size (s->intersected_elements) *
627                        sizeof (struct MpiElement));
628     s->used_element_count = 0;
629     GNUNET_CONTAINER_multihashmap_iterate (s->intersected_elements,
630                                            &copy_element_cb,
631                                            s);
632     qsort (s->sorted_elements,
633            s->used_element_count,
634            sizeof (struct MpiElement),
635            &element_cmp);
636   }
637
638   /* parse message */
639   msize = ntohs (message->size);
640   if (msize <= sizeof (struct EccAliceCryptodataMessage))
641   {
642     GNUNET_break_op (0);
643     return GNUNET_SYSERR;
644   }
645   msg = (const struct EccAliceCryptodataMessage *) message;
646   contained_elements = ntohl (msg->contained_element_count);
647   /* Our intersection may still be ongoing, but this is nevertheless
648      an upper bound on the required array size */
649   max = GNUNET_CONTAINER_multihashmap_size (s->intersected_elements);
650   msg_length = sizeof (struct EccAliceCryptodataMessage)
651     + contained_elements * sizeof (struct GNUNET_CRYPTO_EccPoint) * 2;
652   if ( (msize != msg_length) ||
653        (0 == contained_elements) ||
654        (contained_elements > UINT16_MAX) ||
655        (max < contained_elements + s->cadet_received_element_count) )
656   {
657     GNUNET_break_op (0);
658     return GNUNET_SYSERR;
659   }
660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661               "Received %u crypto values from Alice\n",
662               (unsigned int) contained_elements);
663   payload = (const struct GNUNET_CRYPTO_EccPoint *) &msg[1];
664
665   for (i=0;i<contained_elements;i++)
666   {
667     b_i = &s->sorted_elements[i + s->cadet_received_element_count];
668     g_i = GNUNET_CRYPTO_ecc_bin_to_point (edc,
669                                           &payload[i * 2]);
670     g_i_b_i = GNUNET_CRYPTO_ecc_pmul_mpi (edc,
671                                           g_i,
672                                           b_i->value);
673     gcry_mpi_point_release (g_i);
674     h_i = GNUNET_CRYPTO_ecc_bin_to_point (edc,
675                                           &payload[i * 2 + 1]);
676     h_i_b_i = GNUNET_CRYPTO_ecc_pmul_mpi (edc,
677                                           h_i,
678                                           b_i->value);
679     gcry_mpi_point_release (h_i);
680     if (0 == i + s->cadet_received_element_count)
681     {
682       /* first iteration, nothing to add */
683       s->prod_g_i_b_i = g_i_b_i;
684       s->prod_h_i_b_i = h_i_b_i;
685     }
686     else
687     {
688       /* further iterations, cummulate resulting value */
689       tmp = GNUNET_CRYPTO_ecc_add (edc,
690                                    s->prod_g_i_b_i,
691                                    g_i_b_i);
692       gcry_mpi_point_release (s->prod_g_i_b_i);
693       gcry_mpi_point_release (g_i_b_i);
694       s->prod_g_i_b_i = tmp;
695       tmp = GNUNET_CRYPTO_ecc_add (edc,
696                                    s->prod_h_i_b_i,
697                                    h_i_b_i);
698       gcry_mpi_point_release (s->prod_h_i_b_i);
699       gcry_mpi_point_release (h_i_b_i);
700       s->prod_h_i_b_i = tmp;
701     }
702   }
703   s->cadet_received_element_count += contained_elements;
704   if ( (s->cadet_received_element_count == max) &&
705        (NULL == s->intersection_op) )
706   {
707     /* intersection has finished also on our side, and
708        we got the full set, so we can proceed with the
709        CADET response(s) */
710     transmit_bobs_cryptodata_message (s);
711   }
712   GNUNET_CADET_receive_done (s->cadet->channel);
713   return GNUNET_OK;
714 }
715
716
717 /**
718  * Callback for set operation results. Called for each element
719  * that needs to be removed from the result set.
720  *
721  * @param cls closure with the `struct BobServiceSession`
722  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
723  * @param status what has happened with the set intersection?
724  */
725 static void
726 cb_intersection_element_removed (void *cls,
727                                  const struct GNUNET_SET_Element *element,
728                                  enum GNUNET_SET_Status status)
729 {
730   struct BobServiceSession *s = cls;
731   struct GNUNET_SCALARPRODUCT_Element *se;
732
733   switch (status)
734   {
735   case GNUNET_SET_STATUS_OK:
736     /* this element has been removed from the set */
737     se = GNUNET_CONTAINER_multihashmap_get (s->intersected_elements,
738                                             element->data);
739     GNUNET_assert (NULL != se);
740     LOG (GNUNET_ERROR_TYPE_DEBUG,
741          "Removed element with key %s and value %lld\n",
742          GNUNET_h2s (&se->key),
743          (long long) GNUNET_ntohll (se->value));
744     GNUNET_assert (GNUNET_YES ==
745                    GNUNET_CONTAINER_multihashmap_remove (s->intersected_elements,
746                                                          element->data,
747                                                          se));
748     GNUNET_free (se);
749     return;
750   case GNUNET_SET_STATUS_DONE:
751     s->intersection_op = NULL;
752     GNUNET_break (NULL == s->intersection_set);
753     GNUNET_CADET_receive_done (s->cadet->channel);
754     LOG (GNUNET_ERROR_TYPE_DEBUG,
755          "Finished intersection, %d items remain\n",
756          GNUNET_CONTAINER_multihashmap_size (s->intersected_elements));
757     if (s->client_received_element_count ==
758         GNUNET_CONTAINER_multihashmap_size (s->intersected_elements))
759     {
760       /* CADET transmission from Alice is also already done,
761          start with our own reply */
762       transmit_bobs_cryptodata_message (s);
763     }
764     return;
765   case GNUNET_SET_STATUS_HALF_DONE:
766     /* unexpected for intersection */
767     GNUNET_break (0);
768     return;
769   case GNUNET_SET_STATUS_FAILURE:
770     /* unhandled status code */
771     LOG (GNUNET_ERROR_TYPE_DEBUG,
772          "Set intersection failed!\n");
773     s->intersection_op = NULL;
774     if (NULL != s->intersection_set)
775     {
776       GNUNET_SET_destroy (s->intersection_set);
777       s->intersection_set = NULL;
778     }
779     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
780     prepare_client_end_notification (s);
781     return;
782   default:
783     GNUNET_break (0);
784     return;
785   }
786 }
787
788
789 /**
790  * We've paired up a client session with an incoming CADET request.
791  * Initiate set intersection work.
792  *
793  * @param s client session to start intersection for
794  */
795 static void
796 start_intersection (struct BobServiceSession *s)
797 {
798   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799               "Got session with key %s and %u elements, starting intersection.\n",
800               GNUNET_h2s (&s->session_id),
801               (unsigned int) s->total);
802
803   s->intersection_op
804     = GNUNET_SET_prepare (&s->cadet->peer,
805                           &s->session_id,
806                           NULL,
807                           GNUNET_SET_RESULT_REMOVED,
808                           &cb_intersection_element_removed,
809                           s);
810   if (GNUNET_OK !=
811       GNUNET_SET_commit (s->intersection_op,
812                          s->intersection_set))
813   {
814     GNUNET_break (0);
815     s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE;
816     prepare_client_end_notification (s);
817     return;
818   }
819   GNUNET_SET_destroy (s->intersection_set);
820   s->intersection_set = NULL;
821 }
822
823
824 /**
825  * Handle a request from Alice to calculate a scalarproduct with us (Bob).
826  *
827  * @param cls closure (set from #GNUNET_CADET_connect)
828  * @param channel connection to the other end
829  * @param channel_ctx place to store the `struct CadetIncomingSession *`
830  * @param message the actual message
831  * @return #GNUNET_OK to keep the connection open,
832  *         #GNUNET_SYSERR to close it (signal serious error)
833  */
834 static int
835 handle_alices_computation_request (void *cls,
836                                    struct GNUNET_CADET_Channel *channel,
837                                    void **channel_ctx,
838                                    const struct GNUNET_MessageHeader *message)
839 {
840   struct CadetIncomingSession *in = *channel_ctx;
841   struct BobServiceSession *s;
842   const struct EccServiceRequestMessage *msg;
843
844   msg = (const struct EccServiceRequestMessage *) message;
845   if (GNUNET_YES == in->in_map)
846   {
847     GNUNET_break_op (0);
848     return GNUNET_SYSERR;
849   }
850   if (NULL != find_matching_cadet_session (&msg->session_id))
851   {
852     /* not unique, got one like this already */
853     GNUNET_break_op (0);
854     return GNUNET_SYSERR;
855   }
856   in->session_id = msg->session_id;
857   GNUNET_assert (GNUNET_YES ==
858                  GNUNET_CONTAINER_multihashmap_put (cadet_sessions,
859                                                     &in->session_id,
860                                                     in,
861                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
862   s = find_matching_client_session (&in->session_id);
863   if (NULL == s)
864   {
865     /* no client waiting for this request, wait for client */
866     return GNUNET_OK;
867   }
868   GNUNET_assert (NULL == s->cadet);
869   /* pair them up */
870   in->s = s;
871   s->cadet = in;
872   if (s->client_received_element_count == s->total)
873     start_intersection (s);
874   return GNUNET_OK;
875 }
876
877
878 /**
879  * Function called for inbound channels on Bob's end.  Does some
880  * preliminary initialization, more happens after we get Alice's first
881  * message.
882  *
883  * @param cls closure
884  * @param channel new handle to the channel
885  * @param initiator peer that started the channel
886  * @param port unused
887  * @param options unused
888  * @return session associated with the channel
889  */
890 static void *
891 cb_channel_incoming (void *cls,
892                      struct GNUNET_CADET_Channel *channel,
893                      const struct GNUNET_PeerIdentity *initiator,
894                      const struct GNUNET_HashCode *port,
895                      enum GNUNET_CADET_ChannelOption options)
896 {
897   struct CadetIncomingSession *in;
898
899   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900               "New incoming channel from peer %s.\n",
901               GNUNET_i2s (initiator));
902   in = GNUNET_new (struct CadetIncomingSession);
903   in->peer = *initiator;
904   in->channel = channel;
905   in->cadet_mq = GNUNET_CADET_mq_create (in->channel);
906   return in;
907 }
908
909
910 /**
911  * We're receiving additional set data. Add it to our
912  * set and if we are done, initiate the transaction.
913  *
914  * @param cls closure
915  * @param client identification of the client
916  * @param message the actual message
917  */
918 static void
919 GSS_handle_bob_client_message_multipart (void *cls,
920                                          struct GNUNET_SERVER_Client *client,
921                                          const struct GNUNET_MessageHeader *message)
922 {
923   const struct ComputationBobCryptodataMultipartMessage * msg;
924   struct BobServiceSession *s;
925   uint32_t contained_count;
926   const struct GNUNET_SCALARPRODUCT_Element *elements;
927   uint32_t i;
928   uint16_t msize;
929   struct GNUNET_SET_Element set_elem;
930   struct GNUNET_SCALARPRODUCT_Element *elem;
931
932   s = GNUNET_SERVER_client_get_user_context (client,
933                                              struct BobServiceSession);
934   if (NULL == s)
935   {
936     /* session needs to already exist */
937     GNUNET_break (0);
938     GNUNET_SERVER_receive_done (client,
939                                 GNUNET_SYSERR);
940     return;
941   }
942   msize = ntohs (message->size);
943   if (msize < sizeof (struct ComputationBobCryptodataMultipartMessage))
944   {
945     GNUNET_break (0);
946     GNUNET_SERVER_receive_done (client,
947                                 GNUNET_SYSERR);
948     return;
949   }
950   msg = (const struct ComputationBobCryptodataMultipartMessage *) message;
951   contained_count = ntohl (msg->element_count_contained);
952
953   if ( (msize != (sizeof (struct ComputationBobCryptodataMultipartMessage) +
954                   contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) ||
955        (0 == contained_count) ||
956        (UINT16_MAX < contained_count) ||
957        (s->total == s->client_received_element_count) ||
958        (s->total < s->client_received_element_count + contained_count) )
959   {
960     GNUNET_break_op (0);
961     GNUNET_SERVER_receive_done (client,
962                                 GNUNET_SYSERR);
963     return;
964   }
965   elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
966   for (i = 0; i < contained_count; i++)
967   {
968     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
969     GNUNET_memcpy (elem,
970             &elements[i],
971             sizeof (struct GNUNET_SCALARPRODUCT_Element));
972     if (GNUNET_SYSERR ==
973         GNUNET_CONTAINER_multihashmap_put (s->intersected_elements,
974                                            &elem->key,
975                                            elem,
976                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
977     {
978       GNUNET_break (0);
979       GNUNET_free (elem);
980       continue;
981     }
982     set_elem.data = &elem->key;
983     set_elem.size = sizeof (elem->key);
984     set_elem.element_type = 0;
985     GNUNET_SET_add_element (s->intersection_set,
986                             &set_elem,
987                             NULL, NULL);
988   }
989   s->client_received_element_count += contained_count;
990   GNUNET_SERVER_receive_done (client,
991                               GNUNET_OK);
992   if (s->total != s->client_received_element_count)
993   {
994     /* more to come */
995     return;
996   }
997   if (NULL == s->cadet)
998   {
999     /* no Alice waiting for this request, wait for Alice */
1000     return;
1001   }
1002   start_intersection (s);
1003 }
1004
1005
1006 /**
1007  * Handler for Bob's a client request message.  Bob is in the response
1008  * role, keep the values + session and waiting for a matching session
1009  * or process a waiting request from Alice.
1010  *
1011  * @param cls closure
1012  * @param client identification of the client
1013  * @param message the actual message
1014  */
1015 static void
1016 GSS_handle_bob_client_message (void *cls,
1017                                struct GNUNET_SERVER_Client *client,
1018                                const struct GNUNET_MessageHeader *message)
1019 {
1020   const struct BobComputationMessage *msg;
1021   struct BobServiceSession *s;
1022   struct CadetIncomingSession *in;
1023   uint32_t contained_count;
1024   uint32_t total_count;
1025   const struct GNUNET_SCALARPRODUCT_Element *elements;
1026   uint32_t i;
1027   struct GNUNET_SET_Element set_elem;
1028   struct GNUNET_SCALARPRODUCT_Element *elem;
1029   uint16_t msize;
1030
1031   s = GNUNET_SERVER_client_get_user_context (client,
1032                                              struct BobServiceSession);
1033   if (NULL != s)
1034   {
1035     /* only one concurrent session per client connection allowed,
1036        simplifies logic a lot... */
1037     GNUNET_break (0);
1038     GNUNET_SERVER_receive_done (client,
1039                                 GNUNET_SYSERR);
1040     return;
1041   }
1042   msize = ntohs (message->size);
1043   if (msize < sizeof (struct BobComputationMessage))
1044   {
1045     GNUNET_break (0);
1046     GNUNET_SERVER_receive_done (client,
1047                                 GNUNET_SYSERR);
1048     return;
1049   }
1050   msg = (const struct BobComputationMessage *) message;
1051   total_count = ntohl (msg->element_count_total);
1052   contained_count = ntohl (msg->element_count_contained);
1053   if ( (0 == total_count) ||
1054        (0 == contained_count) ||
1055        (UINT16_MAX < contained_count) ||
1056        (msize != (sizeof (struct BobComputationMessage) +
1057                   contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element))) )
1058   {
1059     GNUNET_break_op (0);
1060     GNUNET_SERVER_receive_done (client,
1061                                 GNUNET_SYSERR);
1062     return;
1063   }
1064   if (NULL != find_matching_client_session (&msg->session_key))
1065   {
1066     GNUNET_break (0);
1067     GNUNET_SERVER_receive_done (client,
1068                                 GNUNET_SYSERR);
1069     return;
1070   }
1071
1072   s = GNUNET_new (struct BobServiceSession);
1073   s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE;
1074   s->client = client;
1075   s->client_mq = GNUNET_MQ_queue_for_server_client (client);
1076   s->total = total_count;
1077   s->client_received_element_count = contained_count;
1078   s->session_id = msg->session_key;
1079   GNUNET_break (GNUNET_YES ==
1080                 GNUNET_CONTAINER_multihashmap_put (client_sessions,
1081                                                    &s->session_id,
1082                                                    s,
1083                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1084   elements = (const struct GNUNET_SCALARPRODUCT_Element *) &msg[1];
1085   s->intersected_elements = GNUNET_CONTAINER_multihashmap_create (s->total,
1086                                                                   GNUNET_YES);
1087   s->intersection_set = GNUNET_SET_create (cfg,
1088                                            GNUNET_SET_OPERATION_INTERSECTION);
1089   for (i = 0; i < contained_count; i++)
1090   {
1091     if (0 == GNUNET_ntohll (elements[i].value))
1092       continue;
1093     elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1094     GNUNET_memcpy (elem,
1095             &elements[i],
1096             sizeof (struct GNUNET_SCALARPRODUCT_Element));
1097     if (GNUNET_SYSERR ==
1098         GNUNET_CONTAINER_multihashmap_put (s->intersected_elements,
1099                                            &elem->key,
1100                                            elem,
1101                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1102     {
1103       GNUNET_break (0);
1104       GNUNET_free (elem);
1105       continue;
1106     }
1107     set_elem.data = &elem->key;
1108     set_elem.size = sizeof (elem->key);
1109     set_elem.element_type = 0;
1110     GNUNET_SET_add_element (s->intersection_set,
1111                             &set_elem,
1112                             NULL, NULL);
1113     s->used_element_count++;
1114   }
1115   GNUNET_SERVER_client_set_user_context (client,
1116                                          s);
1117   GNUNET_SERVER_receive_done (client,
1118                               GNUNET_YES);
1119   if (s->total != s->client_received_element_count)
1120   {
1121     /* multipart msg */
1122     return;
1123   }
1124   in = find_matching_cadet_session (&s->session_id);
1125   if (NULL == in)
1126   {
1127     /* nothing yet, wait for Alice */
1128     return;
1129   }
1130   GNUNET_assert (NULL == in->s);
1131   /* pair them up */
1132   in->s = s;
1133   s->cadet = in;
1134   start_intersection (s);
1135 }
1136
1137
1138 /**
1139  * Task run during shutdown.
1140  *
1141  * @param cls unused
1142  */
1143 static void
1144 shutdown_task (void *cls)
1145 {
1146   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147               "Shutting down, initiating cleanup.\n");
1148   // FIXME: we have to cut our connections to CADET first!
1149   if (NULL != my_cadet)
1150   {
1151     GNUNET_CADET_disconnect (my_cadet);
1152     my_cadet = NULL;
1153   }
1154   if (NULL != edc)
1155   {
1156     GNUNET_CRYPTO_ecc_dlog_release (edc);
1157     edc = NULL;
1158   }
1159   GNUNET_CONTAINER_multihashmap_destroy (client_sessions);
1160   client_sessions = NULL;
1161   GNUNET_CONTAINER_multihashmap_destroy (cadet_sessions);
1162   cadet_sessions = NULL;
1163 }
1164
1165
1166 /**
1167  * A client disconnected.
1168  *
1169  * Remove the associated session(s), release data structures
1170  * and cancel pending outgoing transmissions to the client.
1171  *
1172  * @param cls closure, NULL
1173  * @param client identification of the client
1174  */
1175 static void
1176 handle_client_disconnect (void *cls,
1177                           struct GNUNET_SERVER_Client *client)
1178 {
1179   struct BobServiceSession *s;
1180
1181   if (NULL == client)
1182     return;
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184               "Client disconnected from us.\n");
1185   s = GNUNET_SERVER_client_get_user_context (client,
1186                                              struct BobServiceSession);
1187   if (NULL == s)
1188     return;
1189   s->client = NULL;
1190   destroy_service_session (s);
1191 }
1192
1193
1194 /**
1195  * Initialization of the program and message handlers
1196  *
1197  * @param cls closure
1198  * @param server the initialized server
1199  * @param c configuration to use
1200  */
1201 static void
1202 run (void *cls,
1203      struct GNUNET_SERVER_Handle *server,
1204      const struct GNUNET_CONFIGURATION_Handle *c)
1205 {
1206   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1207     { &GSS_handle_bob_client_message, NULL,
1208       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB,
1209       0},
1210     { &GSS_handle_bob_client_message_multipart, NULL,
1211       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB,
1212       0},
1213     { NULL, NULL, 0, 0}
1214   };
1215   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1216     { &handle_alices_computation_request,
1217       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ECC_SESSION_INITIALIZATION,
1218       sizeof (struct EccServiceRequestMessage) },
1219     { &handle_alices_cryptodata_message,
1220       GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ECC_ALICE_CRYPTODATA,
1221       0},
1222     { NULL, 0, 0}
1223   };
1224
1225   cfg = c;
1226   /* We don't really do DLOG, so we can setup with very minimal resources */
1227   edc = GNUNET_CRYPTO_ecc_dlog_prepare (4 /* max value */,
1228                                         2 /* RAM */);
1229
1230   GNUNET_SERVER_add_handlers (server,
1231                               server_handlers);
1232   GNUNET_SERVER_disconnect_notify (server,
1233                                    &handle_client_disconnect,
1234                                    NULL);
1235   client_sessions = GNUNET_CONTAINER_multihashmap_create (128,
1236                                                           GNUNET_YES);
1237   cadet_sessions = GNUNET_CONTAINER_multihashmap_create (128,
1238                                                          GNUNET_YES);
1239   my_cadet = GNUNET_CADET_connect (cfg, NULL,
1240                                    &cb_channel_destruction,
1241                                    cadet_handlers);
1242   if (NULL == my_cadet)
1243   {
1244     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1245                 _("Connect to CADET failed\n"));
1246     GNUNET_SCHEDULER_shutdown ();
1247     return;
1248   }
1249   GNUNET_CADET_open_port (my_cadet,
1250                           GC_u2h (GNUNET_APPLICATION_TYPE_SCALARPRODUCT_ECC),
1251                           &cb_channel_incoming, NULL);
1252   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1253                                  NULL);
1254 }
1255
1256
1257 /**
1258  * The main function for the scalarproduct service.
1259  *
1260  * @param argc number of arguments from the command line
1261  * @param argv command line arguments
1262  * @return 0 ok, 1 on error
1263  */
1264 int
1265 main (int argc,
1266       char *const *argv)
1267 {
1268   return (GNUNET_OK ==
1269           GNUNET_SERVICE_run (argc, argv,
1270                               "scalarproduct-bob",
1271                               GNUNET_SERVICE_OPTION_NONE,
1272                               &run, NULL)) ? 0 : 1;
1273 }
1274
1275 /* end of gnunet-service-scalarproduct-ecc_bob.c */