-style, actually call GNUNET_CADET_receive_done to receive more messages
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file scalarproduct/scalarproduct_api.c
22  * @brief API for the scalarproduct
23  * @author Christian Fuchs
24  * @author Gaurav Kukreja
25  * @author Christian Grothoff
26  *
27  * TODO: use MQ
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_scalarproduct_service.h"
33 #include "gnunet_protocols.h"
34 #include "scalarproduct.h"
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
37
38
39 /**
40  * The abstraction function for our internal callback
41  *
42  * @param h computation handle
43  * @param msg response we got, NULL on errors
44  * @param status processing status code
45  */
46 typedef void
47 (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
48                                                 const struct ClientResponseMessage *msg,
49                                                 enum GNUNET_SCALARPRODUCT_ResponseStatus status);
50
51
52 /**
53  * A handle returned for each computation
54  */
55 struct GNUNET_SCALARPRODUCT_ComputationHandle
56 {
57   /**
58    * Our configuration.
59    */
60   const struct GNUNET_CONFIGURATION_Handle *cfg;
61
62   /**
63    * Current connection to the scalarproduct service.
64    */
65   struct GNUNET_CLIENT_Connection *client;
66
67   /**
68    * Current transmit handle.
69    */
70   struct GNUNET_CLIENT_TransmitHandle *th;
71
72   /**
73    * the client's elements which
74    */
75   struct GNUNET_SCALARPRODUCT_Element *elements;
76
77   /**
78    * Message to be sent to the scalarproduct service
79    */
80   struct GNUNET_MessageHeader *msg;
81
82   /**
83    * Function to call after transmission of the request (Bob).
84    */
85   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
86
87   /**
88    * Function to call after transmission of the request (Alice).
89    */
90   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
91
92   /**
93    * Closure for @e cont_status or @e cont_datum.
94    */
95   void *cont_cls;
96
97   /**
98    * API internal callback for results and failures to be forwarded to
99    * the client.
100    */
101   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
102
103   /**
104    * The shared session key identifying this computation
105    */
106   struct GNUNET_HashCode key;
107
108   /**
109    * count of all @e elements we offer for computation
110    */
111   uint32_t element_count_total;
112
113   /**
114    * count of the transfered @e elements we offer for computation
115    */
116   uint32_t element_count_transfered;
117
118   /**
119    * Type to use for the multipart messages.
120    */
121   uint16_t mp_type;
122
123 };
124
125
126 /**
127  * Called when a response is received from the service. After basic
128  * check, the handler in `h->response_proc` is called. This functions
129  * handles the response to the client which used the API.
130  *
131  * @param cls Pointer to the Master Context
132  * @param msg Pointer to the data received in response
133  */
134 static void
135 receive_cb (void *cls,
136             const struct GNUNET_MessageHeader *msg)
137 {
138   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
139   const struct ClientResponseMessage *message;
140   enum GNUNET_SCALARPRODUCT_ResponseStatus status;
141
142   if (NULL == msg)
143   {
144     LOG (GNUNET_ERROR_TYPE_INFO,
145          "Disconnected from SCALARPRODUCT service.\n");
146     h->response_proc (h,
147                       NULL,
148                       GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED);
149     return;
150   }
151   if (ntohs (msg->size) < sizeof (struct ClientResponseMessage))
152   {
153     GNUNET_break (0);
154     h->response_proc (h,
155                       NULL,
156                       GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE);
157     return;
158   }
159   message = (const struct ClientResponseMessage *) msg;
160   if (ntohs (msg->size) !=
161       ntohl (message->product_length) + sizeof (struct ClientResponseMessage))
162   {
163     GNUNET_break (0);
164     h->response_proc (h,
165                       NULL,
166                       GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE);
167     return;
168   }
169   status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status);
170   h->response_proc (h,
171                     message,
172                     status);
173 }
174
175
176 /**
177  * Transmits the request to the SCALARPRODUCT service
178  *
179  * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle`
180  * @param size Size of the buffer @a buf
181  * @param buf Pointer to the buffer
182  * @return Size of the message sent
183  */
184 static size_t
185 do_send_message (void *cls,
186                  size_t size,
187                  void *buf)
188 {
189   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
190   struct ComputationBobCryptodataMultipartMessage *msg;
191   size_t ret;
192   uint32_t nsize;
193   uint32_t todo;
194
195   h->th = NULL;
196   if (NULL == buf)
197   {
198     LOG (GNUNET_ERROR_TYPE_DEBUG,
199          "Failed to transmit request to SCALARPRODUCT.\n");
200     /* notify caller about the error, done here */
201     h->response_proc (h, NULL,
202                       GNUNET_SCALARPRODUCT_STATUS_FAILURE);
203     return 0;
204   }
205   ret = ntohs (h->msg->size);
206   memcpy (buf, h->msg, ret);
207   GNUNET_free (h->msg);
208   h->msg = NULL;
209
210   /* done sending? */
211   if (h->element_count_total == h->element_count_transfered)
212   {
213     GNUNET_CLIENT_receive (h->client,
214                            &receive_cb, h,
215                            GNUNET_TIME_UNIT_FOREVER_REL);
216     return ret;
217   }
218
219   todo = h->element_count_total - h->element_count_transfered;
220   nsize = sizeof (struct ComputationBobCryptodataMultipartMessage)
221     + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
222   if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size)
223   {
224     /* cannot do all of them, limit to what is possible in one message */
225     todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationBobCryptodataMultipartMessage))
226       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
227     nsize = sizeof (struct ComputationBobCryptodataMultipartMessage)
228       + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
229   }
230
231   msg = GNUNET_malloc (nsize);
232   h->msg = &msg->header;
233   msg->header.size = htons (nsize);
234   msg->header.type = htons (h->mp_type);
235   msg->element_count_contained = htonl (todo);
236   memcpy (&msg[1],
237           &h->elements[h->element_count_transfered],
238           todo * sizeof (struct GNUNET_SCALARPRODUCT_Element));
239   h->element_count_transfered += todo;
240   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize,
241                                                GNUNET_TIME_UNIT_FOREVER_REL,
242                                                GNUNET_NO,
243                                                &do_send_message, h);
244   GNUNET_assert (NULL != h->th);
245   return ret;
246 }
247
248
249 /**
250  * Handles the STATUS received from the service for a response, does
251  * not contain a payload.  Called when we participate as "Bob" via
252  * #GNUNET_SCALARPRODUCT_accept_computation().
253  *
254  * @param h our Handle
255  * @param msg the response received
256  * @param status the condition the request was terminated with (eg: disconnect)
257  */
258 static void
259 process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
260                         const struct ClientResponseMessage *msg,
261                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
262 {
263   if (NULL != h->cont_status)
264     h->cont_status (h->cont_cls,
265                     status);
266   GNUNET_SCALARPRODUCT_cancel (h);
267 }
268
269
270 /**
271  * Used by Bob's client to cooperate with Alice,
272  *
273  * @param cfg the gnunet configuration handle
274  * @param key Session key unique to the requesting client
275  * @param elements Array of elements of the vector
276  * @param element_count Number of elements in the @a elements vector
277  * @param cont Callback function
278  * @param cont_cls Closure for @a cont
279  * @return a new handle for this computation
280  */
281 struct GNUNET_SCALARPRODUCT_ComputationHandle *
282 GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
283                                          const struct GNUNET_HashCode *session_key,
284                                          const struct GNUNET_SCALARPRODUCT_Element *elements,
285                                          uint32_t element_count,
286                                          GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
287                                          void *cont_cls)
288 {
289   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
290   struct BobComputationMessage *msg;
291   uint32_t size;
292   uint16_t possible;
293
294   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
295   h->cont_status = cont;
296   h->cont_cls = cont_cls;
297   h->response_proc = &process_status_message;
298   h->cfg = cfg;
299   h->key = *session_key;
300   h->client = GNUNET_CLIENT_connect ("scalarproduct-bob", cfg);
301   h->element_count_total = element_count;
302   h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB;
303   if (NULL == h->client)
304   {
305     /* scalarproduct configuration error */
306     GNUNET_break (0);
307     GNUNET_free (h);
308     return NULL;
309   }
310   size = sizeof (struct BobComputationMessage)
311     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
312   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
313   {
314     possible = element_count;
315     h->element_count_transfered = element_count;
316   }
317   else
318   {
319     /* create a multipart msg, first we calculate a new msg size for the head msg */
320     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage))
321       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
322     h->element_count_transfered = possible;
323     size = sizeof (struct BobComputationMessage)
324       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
325     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
326     memcpy (h->elements,
327             elements,
328             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
329   }
330
331   msg = GNUNET_malloc (size);
332   h->msg = &msg->header;
333   msg->header.size = htons (size);
334   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
335   msg->element_count_total = htonl (element_count);
336   msg->element_count_contained = htonl (possible);
337   msg->session_key = *session_key;
338   memcpy (&msg[1],
339           elements,
340           possible * sizeof (struct GNUNET_SCALARPRODUCT_Element));
341   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
342                                                GNUNET_TIME_UNIT_FOREVER_REL,
343                                                GNUNET_YES, /* retry is OK in the initial stage */
344                                                &do_send_message, h);
345   GNUNET_assert (NULL != h->th);
346   return h;
347 }
348
349
350 /**
351  * Handles the RESULT received from the service for a request, should
352  * contain a result MPI value.  Called when we participate as "Alice" via
353  * #GNUNET_SCALARPRODUCT_start_computation().
354  *
355  * @param h our Handle
356  * @param msg Pointer to the response received
357  * @param status the condition the request was terminated with (eg: disconnect)
358  */
359 static void
360 process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
361                         const struct ClientResponseMessage *msg,
362                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
363 {
364   uint32_t product_len;
365   gcry_mpi_t result = NULL;
366   gcry_error_t rc;
367   gcry_mpi_t num;
368   size_t rsize;
369
370   if (GNUNET_SCALARPRODUCT_STATUS_SUCCESS == status)
371   {
372     result = gcry_mpi_new (0);
373
374     product_len = ntohl (msg->product_length);
375     if (0 < product_len)
376     {
377       rsize = 0;
378       if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD,
379                                     &msg[1],
380                                     product_len,
381                                     &rsize)))
382       {
383         LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
384                   "gcry_mpi_scan",
385                   rc);
386         gcry_mpi_release (result);
387         result = NULL;
388         status = GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE;
389       }
390       else
391       {
392         if (0 < ntohl (msg->range))
393           gcry_mpi_add (result, result, num);
394         else if (0 > ntohl (msg->range))
395           gcry_mpi_sub (result, result, num);
396         gcry_mpi_release (num);
397       }
398     }
399   }
400   if (NULL != h->cont_datum)
401     h->cont_datum (h->cont_cls,
402                    status,
403                    result);
404   if (NULL != result)
405     gcry_mpi_release (result);
406   GNUNET_SCALARPRODUCT_cancel (h);
407 }
408
409
410 /**
411  * Request by Alice's client for computing a scalar product
412  *
413  * @param cfg the gnunet configuration handle
414  * @param session_key Session key should be unique to the requesting client
415  * @param peer PeerID of the other peer
416  * @param elements Array of elements of the vector
417  * @param element_count Number of elements in the @a elements vector
418  * @param cont Callback function
419  * @param cont_cls Closure for @a cont
420  * @return a new handle for this computation
421  */
422 struct GNUNET_SCALARPRODUCT_ComputationHandle *
423 GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
424                                         const struct GNUNET_HashCode *session_key,
425                                         const struct GNUNET_PeerIdentity *peer,
426                                         const struct GNUNET_SCALARPRODUCT_Element *elements,
427                                         uint32_t element_count,
428                                         GNUNET_SCALARPRODUCT_DatumProcessor cont,
429                                         void *cont_cls)
430 {
431   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
432   struct AliceComputationMessage *msg;
433   uint32_t size;
434   uint32_t possible;
435
436   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
437   h->client = GNUNET_CLIENT_connect ("scalarproduct-alice", cfg);
438   if (NULL == h->client)
439   {
440     /* missconfigured scalarproduct service */
441     GNUNET_break (0);
442     GNUNET_free (h);
443     return NULL;
444   }
445   h->element_count_total = element_count;
446   h->cont_datum = cont;
447   h->cont_cls = cont_cls;
448   h->response_proc = &process_result_message;
449   h->cfg = cfg;
450   h->key = *session_key;
451   h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE;
452   size = sizeof (struct AliceComputationMessage)
453     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
454   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
455   {
456     possible = element_count;
457     h->element_count_transfered = element_count;
458   }
459   else
460   {
461     /* create a multipart msg, first we calculate a new msg size for the head msg */
462     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage))
463       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
464     h->element_count_transfered = possible;
465     size = sizeof (struct AliceComputationMessage)
466       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
467     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
468     memcpy (h->elements,
469             elements,
470             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
471   }
472
473   msg = GNUNET_malloc (size);
474   h->msg = &msg->header;
475   msg->header.size = htons (size);
476   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
477   msg->element_count_total = htonl (element_count);
478   msg->element_count_contained = htonl (possible);
479   msg->reserved = htonl (0);
480   msg->peer = *peer;
481   msg->session_key = *session_key;
482   memcpy (&msg[1],
483           elements,
484           sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible);
485   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
486                                                GNUNET_TIME_UNIT_FOREVER_REL,
487                                                GNUNET_YES, /* retry is OK in the initial stage */
488                                                &do_send_message, h);
489   GNUNET_assert (NULL != h->th);
490   return h;
491 }
492
493
494 /**
495  * Cancel an ongoing computation or revoke our collaboration offer.
496  * Closes the connection to the service
497  *
498  * @param h computation handle to terminate
499  */
500 void
501 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h)
502 {
503   if (NULL != h->th)
504   {
505     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
506     h->th = NULL;
507   }
508   GNUNET_free_non_null (h->elements);
509   GNUNET_free_non_null (h->msg);
510   if (NULL != h->client)
511   {
512     GNUNET_CLIENT_disconnect (h->client);
513     h->client = NULL;
514   }
515   GNUNET_free (h);
516 }
517
518
519 /* end of scalarproduct_api.c */