add timeouts to test
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013, 2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19  */
20 /**
21  * @file scalarproduct/scalarproduct_api.c
22  * @brief API for the scalarproduct
23  * @author Christian Fuchs
24  * @author Gaurav Kukreja
25  * @author Christian Grothoff
26  *
27  * TODO: use MQ
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_scalarproduct_service.h"
33 #include "gnunet_protocols.h"
34 #include "scalarproduct.h"
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
37
38
39 /**
40  * The abstraction function for our internal callback
41  *
42  * @param h computation handle
43  * @param msg response we got, NULL on errors
44  * @param status processing status code
45  */
46 typedef void
47 (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
48                                                 const struct ClientResponseMessage *msg,
49                                                 enum GNUNET_SCALARPRODUCT_ResponseStatus status);
50
51
52 /**
53  * A handle returned for each computation
54  */
55 struct GNUNET_SCALARPRODUCT_ComputationHandle
56 {
57   /**
58    * Our configuration.
59    */
60   const struct GNUNET_CONFIGURATION_Handle *cfg;
61
62   /**
63    * Current connection to the scalarproduct service.
64    */
65   struct GNUNET_CLIENT_Connection *client;
66
67   /**
68    * Current transmit handle.
69    */
70   struct GNUNET_CLIENT_TransmitHandle *th;
71
72   /**
73    * the client's elements which
74    */
75   struct GNUNET_SCALARPRODUCT_Element *elements;
76
77   /**
78    * Message to be sent to the scalarproduct service
79    */
80   struct GNUNET_MessageHeader *msg;
81
82   /**
83    * Function to call after transmission of the request (Bob).
84    */
85   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
86
87   /**
88    * Function to call after transmission of the request (Alice).
89    */
90   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
91
92   /**
93    * Closure for @e cont_status or @e cont_datum.
94    */
95   void *cont_cls;
96
97   /**
98    * API internal callback for results and failures to be forwarded to
99    * the client.
100    */
101   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
102
103   /**
104    * The shared session key identifying this computation
105    */
106   struct GNUNET_HashCode key;
107
108   /**
109    * count of all @e elements we offer for computation
110    */
111   uint32_t element_count_total;
112
113   /**
114    * count of the transfered @e elements we offer for computation
115    */
116   uint32_t element_count_transfered;
117
118   /**
119    * Type to use for the multipart messages.
120    */
121   uint16_t mp_type;
122
123 };
124
125
126 /**
127  * Called when a response is received from the service. After basic
128  * check, the handler in `h->response_proc` is called. This functions
129  * handles the response to the client which used the API.
130  *
131  * @param cls Pointer to the Master Context
132  * @param msg Pointer to the data received in response
133  */
134 static void
135 receive_cb (void *cls,
136             const struct GNUNET_MessageHeader *msg)
137 {
138   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
139   const struct ClientResponseMessage *message;
140   enum GNUNET_SCALARPRODUCT_ResponseStatus status;
141
142   if (NULL == msg)
143   {
144     LOG (GNUNET_ERROR_TYPE_INFO,
145          "Disconnected from SCALARPRODUCT service.\n");
146     h->response_proc (h,
147                       NULL,
148                       GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED);
149     return;
150   }
151   if (ntohs (msg->size) < sizeof (struct ClientResponseMessage))
152   {
153     GNUNET_break (0);
154     h->response_proc (h,
155                       NULL,
156                       GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE);
157     return;
158   }
159   message = (const struct ClientResponseMessage *) msg;
160   if (ntohs (msg->size) !=
161       ntohl (message->product_length) + sizeof (struct ClientResponseMessage))
162   {
163     GNUNET_break (0);
164     h->response_proc (h,
165                       NULL,
166                       GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE);
167     return;
168   }
169   status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status);
170   h->response_proc (h,
171                     message,
172                     status);
173 }
174
175
176 /**
177  * Transmits the request to the SCALARPRODUCT service
178  *
179  * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle`
180  * @param size Size of the buffer @a buf
181  * @param buf Pointer to the buffer
182  * @return Size of the message sent
183  */
184 static size_t
185 do_send_message (void *cls,
186                  size_t size,
187                  void *buf)
188 {
189   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
190   struct ComputationBobCryptodataMultipartMessage *msg;
191   size_t ret;
192   uint32_t nsize;
193   uint32_t todo;
194
195   h->th = NULL;
196   if (NULL == buf)
197   {
198     LOG (GNUNET_ERROR_TYPE_DEBUG,
199          "Failed to transmit request to SCALARPRODUCT.\n");
200     /* notify caller about the error, done here */
201     h->response_proc (h, NULL,
202                       GNUNET_SCALARPRODUCT_STATUS_FAILURE);
203     return 0;
204   }
205   ret = ntohs (h->msg->size);
206   memcpy (buf, h->msg, ret);
207   GNUNET_free (h->msg);
208   h->msg = NULL;
209
210   /* done sending? */
211   if (h->element_count_total == h->element_count_transfered)
212   {
213     GNUNET_CLIENT_receive (h->client,
214                            &receive_cb, h,
215                            GNUNET_TIME_UNIT_FOREVER_REL);
216     return ret;
217   }
218
219   todo = h->element_count_total - h->element_count_transfered;
220   nsize = sizeof (struct ComputationBobCryptodataMultipartMessage)
221     + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
222   if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size)
223   {
224     /* cannot do all of them, limit to what is possible in one message */
225     todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationBobCryptodataMultipartMessage))
226       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
227     nsize = sizeof (struct ComputationBobCryptodataMultipartMessage)
228       + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
229   }
230
231   msg = GNUNET_malloc (nsize);
232   h->msg = &msg->header;
233   msg->header.size = htons (nsize);
234   msg->header.type = htons (h->mp_type);
235   msg->element_count_contained = htonl (todo);
236   memcpy (&msg[1],
237           &h->elements[h->element_count_transfered],
238           todo * sizeof (struct GNUNET_SCALARPRODUCT_Element));
239   h->element_count_transfered += todo;
240   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize,
241                                                GNUNET_TIME_UNIT_FOREVER_REL,
242                                                GNUNET_NO,
243                                                &do_send_message, h);
244   GNUNET_assert (NULL != h->th);
245   return ret;
246 }
247
248
249 /**
250  * Handles the STATUS received from the service for a response, does
251  * not contain a payload.  Called when we participate as "Bob" via
252  * #GNUNET_SCALARPRODUCT_accept_computation().
253  *
254  * @param h our Handle
255  * @param msg the response received
256  * @param status the condition the request was terminated with (eg: disconnect)
257  */
258 static void
259 process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
260                         const struct ClientResponseMessage *msg,
261                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
262 {
263   if (NULL != h->cont_status)
264     h->cont_status (h->cont_cls,
265                     status);
266   GNUNET_SCALARPRODUCT_cancel (h);
267 }
268
269
270 /**
271  * Check if the keys for all given elements are unique.
272  *
273  * @param elements elements to check
274  * @param element_count size of the @a elements array
275  * @return #GNUNET_OK if all keys are unique
276  */
277 static int
278 check_unique (const struct GNUNET_SCALARPRODUCT_Element *elements,
279               uint32_t element_count)
280 {
281   struct GNUNET_CONTAINER_MultiHashMap *map;
282   uint32_t i;
283   int ok;
284
285   ok = GNUNET_OK;
286   map = GNUNET_CONTAINER_multihashmap_create (2 * element_count,
287                                               GNUNET_YES);
288   for (i=0;i<element_count;i++)
289     if (GNUNET_OK !=
290         GNUNET_CONTAINER_multihashmap_put (map,
291                                            &elements[i].key,
292                                            map,
293                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
294     {
295       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
296                   _("Keys given to SCALARPRODUCT not unique!\n"));
297       ok = GNUNET_SYSERR;
298     }
299   GNUNET_CONTAINER_multihashmap_destroy (map);
300   return ok;
301 }
302
303
304 /**
305  * Used by Bob's client to cooperate with Alice,
306  *
307  * @param cfg the gnunet configuration handle
308  * @param key Session key unique to the requesting client
309  * @param elements Array of elements of the vector
310  * @param element_count Number of elements in the @a elements vector
311  * @param cont Callback function
312  * @param cont_cls Closure for @a cont
313  * @return a new handle for this computation
314  */
315 struct GNUNET_SCALARPRODUCT_ComputationHandle *
316 GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
317                                          const struct GNUNET_HashCode *session_key,
318                                          const struct GNUNET_SCALARPRODUCT_Element *elements,
319                                          uint32_t element_count,
320                                          GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
321                                          void *cont_cls)
322 {
323   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
324   struct BobComputationMessage *msg;
325   uint32_t size;
326   uint16_t possible;
327
328   if (GNUNET_SYSERR == check_unique (elements, element_count))
329     return NULL;
330   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
331   h->cont_status = cont;
332   h->cont_cls = cont_cls;
333   h->response_proc = &process_status_message;
334   h->cfg = cfg;
335   h->key = *session_key;
336   h->client = GNUNET_CLIENT_connect ("scalarproduct-bob", cfg);
337   h->element_count_total = element_count;
338   h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB;
339   if (NULL == h->client)
340   {
341     /* scalarproduct configuration error */
342     GNUNET_break (0);
343     GNUNET_free (h);
344     return NULL;
345   }
346   size = sizeof (struct BobComputationMessage)
347     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
348   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
349   {
350     possible = element_count;
351     h->element_count_transfered = element_count;
352   }
353   else
354   {
355     /* create a multipart msg, first we calculate a new msg size for the head msg */
356     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage))
357       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
358     h->element_count_transfered = possible;
359     size = sizeof (struct BobComputationMessage)
360       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
361     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
362     memcpy (h->elements,
363             elements,
364             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
365   }
366
367   msg = GNUNET_malloc (size);
368   h->msg = &msg->header;
369   msg->header.size = htons (size);
370   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
371   msg->element_count_total = htonl (element_count);
372   msg->element_count_contained = htonl (possible);
373   msg->session_key = *session_key;
374   memcpy (&msg[1],
375           elements,
376           possible * sizeof (struct GNUNET_SCALARPRODUCT_Element));
377   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
378                                                GNUNET_TIME_UNIT_FOREVER_REL,
379                                                GNUNET_YES, /* retry is OK in the initial stage */
380                                                &do_send_message, h);
381   GNUNET_assert (NULL != h->th);
382   return h;
383 }
384
385
386 /**
387  * Handles the RESULT received from the service for a request, should
388  * contain a result MPI value.  Called when we participate as "Alice" via
389  * #GNUNET_SCALARPRODUCT_start_computation().
390  *
391  * @param h our Handle
392  * @param msg Pointer to the response received
393  * @param status the condition the request was terminated with (eg: disconnect)
394  */
395 static void
396 process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
397                         const struct ClientResponseMessage *msg,
398                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
399 {
400   uint32_t product_len;
401   gcry_mpi_t result = NULL;
402   gcry_error_t rc;
403   gcry_mpi_t num;
404   size_t rsize;
405
406   if (GNUNET_SCALARPRODUCT_STATUS_SUCCESS == status)
407   {
408     result = gcry_mpi_new (0);
409
410     product_len = ntohl (msg->product_length);
411     if (0 < product_len)
412     {
413       rsize = 0;
414       if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD,
415                                     &msg[1],
416                                     product_len,
417                                     &rsize)))
418       {
419         LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
420                   "gcry_mpi_scan",
421                   rc);
422         gcry_mpi_release (result);
423         result = NULL;
424         status = GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE;
425       }
426       else
427       {
428         if (0 < (int32_t) ntohl (msg->range))
429           gcry_mpi_add (result, result, num);
430         else
431           gcry_mpi_sub (result, result, num);
432         gcry_mpi_release (num);
433       }
434     }
435   }
436   if (NULL != h->cont_datum)
437     h->cont_datum (h->cont_cls,
438                    status,
439                    result);
440   if (NULL != result)
441     gcry_mpi_release (result);
442   GNUNET_SCALARPRODUCT_cancel (h);
443 }
444
445
446 /**
447  * Request by Alice's client for computing a scalar product
448  *
449  * @param cfg the gnunet configuration handle
450  * @param session_key Session key should be unique to the requesting client
451  * @param peer PeerID of the other peer
452  * @param elements Array of elements of the vector
453  * @param element_count Number of elements in the @a elements vector
454  * @param cont Callback function
455  * @param cont_cls Closure for @a cont
456  * @return a new handle for this computation
457  */
458 struct GNUNET_SCALARPRODUCT_ComputationHandle *
459 GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
460                                         const struct GNUNET_HashCode *session_key,
461                                         const struct GNUNET_PeerIdentity *peer,
462                                         const struct GNUNET_SCALARPRODUCT_Element *elements,
463                                         uint32_t element_count,
464                                         GNUNET_SCALARPRODUCT_DatumProcessor cont,
465                                         void *cont_cls)
466 {
467   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
468   struct AliceComputationMessage *msg;
469   uint32_t size;
470   uint32_t possible;
471
472   if (GNUNET_SYSERR == check_unique (elements, element_count))
473     return NULL;
474   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
475   h->client = GNUNET_CLIENT_connect ("scalarproduct-alice", cfg);
476   if (NULL == h->client)
477   {
478     /* missconfigured scalarproduct service */
479     GNUNET_break (0);
480     GNUNET_free (h);
481     return NULL;
482   }
483   h->element_count_total = element_count;
484   h->cont_datum = cont;
485   h->cont_cls = cont_cls;
486   h->response_proc = &process_result_message;
487   h->cfg = cfg;
488   h->key = *session_key;
489   h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE;
490   size = sizeof (struct AliceComputationMessage)
491     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
492   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
493   {
494     possible = element_count;
495     h->element_count_transfered = element_count;
496   }
497   else
498   {
499     /* create a multipart msg, first we calculate a new msg size for the head msg */
500     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage))
501       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
502     h->element_count_transfered = possible;
503     size = sizeof (struct AliceComputationMessage)
504       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
505     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
506     memcpy (h->elements,
507             elements,
508             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
509   }
510
511   msg = GNUNET_malloc (size);
512   h->msg = &msg->header;
513   msg->header.size = htons (size);
514   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
515   msg->element_count_total = htonl (element_count);
516   msg->element_count_contained = htonl (possible);
517   msg->reserved = htonl (0);
518   msg->peer = *peer;
519   msg->session_key = *session_key;
520   memcpy (&msg[1],
521           elements,
522           sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible);
523   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
524                                                GNUNET_TIME_UNIT_FOREVER_REL,
525                                                GNUNET_YES, /* retry is OK in the initial stage */
526                                                &do_send_message, h);
527   GNUNET_assert (NULL != h->th);
528   return h;
529 }
530
531
532 /**
533  * Cancel an ongoing computation or revoke our collaboration offer.
534  * Closes the connection to the service
535  *
536  * @param h computation handle to terminate
537  */
538 void
539 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h)
540 {
541   if (NULL != h->th)
542   {
543     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
544     h->th = NULL;
545   }
546   GNUNET_free_non_null (h->elements);
547   GNUNET_free_non_null (h->msg);
548   if (NULL != h->client)
549   {
550     GNUNET_CLIENT_disconnect (h->client);
551     h->client = NULL;
552   }
553   GNUNET_free (h);
554 }
555
556
557 /* end of scalarproduct_api.c */