massive rework of scalarproduct service, splitting into Alice and Bob
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file scalarproduct/scalarproduct_api.c
22  * @brief API for the scalarproduct
23  * @author Christian Fuchs
24  * @author Gaurav Kukreja
25  * @author Christian Grothoff
26  *
27  * TODO: use MQ
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_scalarproduct_service.h"
33 #include "gnunet_protocols.h"
34 #include "scalarproduct.h"
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
37
38
39 /**
40  * The abstraction function for our internal callback
41  *
42  * @param h computation handle
43  * @param msg response we got, NULL on errors
44  * @param status processing status code
45  */
46 typedef void
47 (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
48                                                 const struct ClientResponseMessage *msg,
49                                                 enum GNUNET_SCALARPRODUCT_ResponseStatus status);
50
51
52 /**
53  * A handle returned for each computation
54  */
55 struct GNUNET_SCALARPRODUCT_ComputationHandle
56 {
57   /**
58    * Our configuration.
59    */
60   const struct GNUNET_CONFIGURATION_Handle *cfg;
61
62   /**
63    * Current connection to the scalarproduct service.
64    */
65   struct GNUNET_CLIENT_Connection *client;
66
67   /**
68    * Current transmit handle.
69    */
70   struct GNUNET_CLIENT_TransmitHandle *th;
71
72   /**
73    * the client's elements which
74    */
75   struct GNUNET_SCALARPRODUCT_Element *elements;
76
77   /**
78    * Message to be sent to the scalarproduct service
79    */
80   struct GNUNET_MessageHeader *msg;
81
82   /**
83    * Function to call after transmission of the request (Bob).
84    */
85   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
86
87   /**
88    * Function to call after transmission of the request (Alice).
89    */
90   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
91
92   /**
93    * Closure for @e cont_status or @e cont_datum.
94    */
95   void *cont_cls;
96
97   /**
98    * API internal callback for results and failures to be forwarded to
99    * the client.
100    */
101   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
102
103   /**
104    * The shared session key identifying this computation
105    */
106   struct GNUNET_HashCode key;
107
108   /**
109    * count of all @e elements we offer for computation
110    */
111   uint32_t element_count_total;
112
113   /**
114    * count of the transfered @e elements we offer for computation
115    */
116   uint32_t element_count_transfered;
117
118   /**
119    * Type to use for the multipart messages.
120    */
121   uint16_t mp_type;
122
123 };
124
125
126 /**
127  * Handles the STATUS received from the service for a response, does
128  * not contain a payload.  Called when we participate as "Bob" via
129  * #GNUNET_SCALARPRODUCT_accept_computation().
130  *
131  * @param h our Handle
132  * @param msg the response received
133  * @param status the condition the request was terminated with (eg: disconnect)
134  */
135 static void
136 process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
137                         const struct ClientResponseMessage *msg,
138                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
139 {
140   if (NULL != h->cont_status)
141     h->cont_status (h->cont_cls,
142                     status);
143   GNUNET_SCALARPRODUCT_cancel (h);
144 }
145
146
147 /**
148  * Handles the RESULT received from the service for a request, should
149  * contain a result MPI value.  Called when we participate as "Alice" via
150  * #GNUNET_SCALARPRODUCT_start_computation().
151  *
152  * @param h our Handle
153  * @param msg Pointer to the response received
154  * @param status the condition the request was terminated with (eg: disconnect)
155  */
156 static void
157 process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
158                         const struct ClientResponseMessage *msg,
159                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
160 {
161   size_t product_len;
162   gcry_mpi_t result = NULL;
163   gcry_error_t rc;
164   gcry_mpi_t num;
165   size_t rsize;
166
167   if ( (GNUNET_SCALARPRODUCT_Status_Success == status) &&
168        ( (NULL == msg) ||
169          ( (ntohs (msg->header.size) - sizeof (struct ClientResponseMessage)
170             != (product_len = ntohl (msg->product_length))) ) ) )
171   {
172     GNUNET_break (0);
173     status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
174   }
175   if (GNUNET_SCALARPRODUCT_Status_Success == status)
176   {
177     result = gcry_mpi_new (0);
178
179     if (0 < product_len)
180     {
181       rsize = 0;
182       if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD,
183                                     &msg[1],
184                                     product_len,
185                                     &rsize)))
186       {
187         LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
188                   "gcry_mpi_scan",
189                   rc);
190         gcry_mpi_release (result);
191         result = NULL;
192         status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
193       }
194       else
195       {
196         if (0 < ntohl (msg->range))
197           gcry_mpi_add (result, result, num);
198         else if (0 > ntohl (msg->range))
199           gcry_mpi_sub (result, result, num);
200         gcry_mpi_release (num);
201       }
202     }
203   }
204   h->cont_datum (h->cont_cls,
205                  status,
206                  result);
207   if (NULL != result)
208     gcry_mpi_release (result);
209   GNUNET_SCALARPRODUCT_cancel (h);
210 }
211
212
213 /**
214  * Called when a response is received from the service. After basic check, the
215  * handler in qe->response_proc is called. This functions handles the response
216  * to the client which used the API.
217  *
218  * @param cls Pointer to the Master Context
219  * @param msg Pointer to the data received in response
220  */
221 static void
222 receive_cb (void *cls,
223             const struct GNUNET_MessageHeader *msg)
224 {
225   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
226   const struct ClientResponseMessage *message;
227
228   if (NULL == msg)
229   {
230     LOG (GNUNET_ERROR_TYPE_INFO,
231          "Disconnected from SCALARPRODUCT service.\n");
232     h->response_proc (h,
233                       NULL,
234                       GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
235     return;
236   }
237   if (ntohs (msg->size) != sizeof (struct ClientResponseMessage))
238   {
239     GNUNET_break (0);
240     h->response_proc (h,
241                       NULL,
242                       GNUNET_SCALARPRODUCT_Status_InvalidResponse);
243     return;
244   }
245   message = (const struct ClientResponseMessage *) msg;
246   if (GNUNET_SYSERR == ntohl (message->status))
247   {
248     h->response_proc (h,
249                       NULL,
250                       GNUNET_SCALARPRODUCT_Status_Failure);
251     return;
252   }
253   h->response_proc (h,
254                     message,
255                     GNUNET_SCALARPRODUCT_Status_Success);
256 }
257
258
259 /**
260  * Transmits the request to the SCALARPRODUCT service
261  *
262  * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle`
263  * @param size Size of the buffer @a buf
264  * @param buf Pointer to the buffer
265  * @return Size of the message sent
266  */
267 static size_t
268 do_send_message (void *cls,
269                  size_t size,
270                  void *buf)
271 {
272   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
273   struct ComputationMultipartMessage *msg;
274   size_t ret;
275   uint32_t nsize;
276   uint32_t todo;
277
278   h->th = NULL;
279   if (NULL == buf)
280   {
281     LOG (GNUNET_ERROR_TYPE_DEBUG,
282          "Failed to transmit request to SCALARPRODUCT.\n");
283     /* notify caller about the error, done here */
284     h->response_proc (h, NULL,
285                       GNUNET_SCALARPRODUCT_Status_Failure);
286     return 0;
287   }
288   ret = ntohs (h->msg->size);
289   memcpy (buf, h->msg, ret);
290   GNUNET_free (h->msg);
291   h->msg = NULL;
292
293   /* done sending? */
294   if (h->element_count_total == h->element_count_transfered)
295   {
296     GNUNET_CLIENT_receive (h->client,
297                            &receive_cb, h,
298                            GNUNET_TIME_UNIT_FOREVER_REL);
299     return ret;
300   }
301
302   todo = h->element_count_total - h->element_count_transfered;
303   nsize = sizeof (struct ComputationMultipartMessage)
304     + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
305   if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size)
306   {
307     /* cannot do all of them, limit to what is possible in one message */
308     todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMultipartMessage))
309       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
310     nsize = sizeof (struct ComputationMultipartMessage)
311       + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
312   }
313
314   msg = GNUNET_malloc (nsize);
315   h->msg = &msg->header;
316   msg->header.size = htons (nsize);
317   msg->header.type = htons (h->mp_type);
318   msg->element_count_contained = htonl (todo);
319   memcpy (&msg[1],
320           &h->elements[h->element_count_transfered],
321           todo * sizeof (struct GNUNET_SCALARPRODUCT_Element));
322   h->element_count_transfered += todo;
323   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize,
324                                                GNUNET_TIME_UNIT_FOREVER_REL,
325                                                GNUNET_NO,
326                                                &do_send_message, h);
327   GNUNET_assert (NULL != h->th);
328   return ret;
329 }
330
331
332 /**
333  * Used by Bob's client to cooperate with Alice,
334  *
335  * @param cfg the gnunet configuration handle
336  * @param key Session key unique to the requesting client
337  * @param elements Array of elements of the vector
338  * @param element_count Number of elements in the @a elements vector
339  * @param cont Callback function
340  * @param cont_cls Closure for @a cont
341  * @return a new handle for this computation
342  */
343 struct GNUNET_SCALARPRODUCT_ComputationHandle *
344 GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
345                                          const struct GNUNET_HashCode *session_key,
346                                          const struct GNUNET_SCALARPRODUCT_Element *elements,
347                                          uint32_t element_count,
348                                          GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
349                                          void *cont_cls)
350 {
351   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
352   struct BobComputationMessage *msg;
353   uint32_t size;
354   uint16_t possible;
355
356   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
357   h->cont_status = cont;
358   h->cont_cls = cont_cls;
359   h->response_proc = &process_status_message;
360   h->cfg = cfg;
361   h->key = *session_key;
362   h->client = GNUNET_CLIENT_connect ("scalarproduct-bob", cfg);
363   h->element_count_total = element_count;
364   h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB;
365   if (NULL == h->client)
366   {
367     /* scalarproduct configuration error */
368     GNUNET_break (0);
369     GNUNET_free (h);
370     return NULL;
371   }
372   size = sizeof (struct BobComputationMessage)
373     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
374   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
375   {
376     possible = element_count;
377     h->element_count_transfered = element_count;
378   }
379   else
380   {
381     /* create a multipart msg, first we calculate a new msg size for the head msg */
382     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage))
383       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
384     h->element_count_transfered = possible;
385     size = sizeof (struct BobComputationMessage)
386       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
387     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
388     memcpy (h->elements,
389             elements,
390             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
391   }
392
393   msg = GNUNET_malloc (size);
394   h->msg = &msg->header;
395   msg->header.size = htons (size);
396   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
397   msg->element_count_total = htonl (element_count);
398   msg->element_count_contained = htonl (possible);
399   msg->session_key = *session_key;
400   memcpy (&msg[1],
401           elements,
402           possible * sizeof (struct GNUNET_SCALARPRODUCT_Element));
403   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
404                                                GNUNET_TIME_UNIT_FOREVER_REL,
405                                                GNUNET_YES, /* retry is OK in the initial stage */
406                                                &do_send_message, h);
407   GNUNET_assert (NULL != h->th);
408   return h;
409 }
410
411
412 /**
413  * Request by Alice's client for computing a scalar product
414  *
415  * @param cfg the gnunet configuration handle
416  * @param session_key Session key should be unique to the requesting client
417  * @param peer PeerID of the other peer
418  * @param elements Array of elements of the vector
419  * @param element_count Number of elements in the @a elements vector
420  * @param cont Callback function
421  * @param cont_cls Closure for @a cont
422  * @return a new handle for this computation
423  */
424 struct GNUNET_SCALARPRODUCT_ComputationHandle *
425 GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
426                                         const struct GNUNET_HashCode *session_key,
427                                         const struct GNUNET_PeerIdentity *peer,
428                                         const struct GNUNET_SCALARPRODUCT_Element *elements,
429                                         uint32_t element_count,
430                                         GNUNET_SCALARPRODUCT_DatumProcessor cont,
431                                         void *cont_cls)
432 {
433   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
434   struct AliceComputationMessage *msg;
435   uint32_t size;
436   uint32_t possible;
437
438   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
439   h->client = GNUNET_CLIENT_connect ("scalarproduct-alice", cfg);
440   if (NULL == h->client)
441   {
442     /* missconfigured scalarproduct service */
443     GNUNET_break (0);
444     GNUNET_free (h);
445     return NULL;
446   }
447   h->element_count_total = element_count;
448   h->cont_datum = cont;
449   h->cont_cls = cont_cls;
450   h->response_proc = &process_result_message;
451   h->cfg = cfg;
452   h->key = *session_key;
453   h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE;
454   size = sizeof (struct AliceComputationMessage)
455     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
456   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
457   {
458     possible = element_count;
459     h->element_count_transfered = element_count;
460   }
461   else
462   {
463     /* create a multipart msg, first we calculate a new msg size for the head msg */
464     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage))
465       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
466     h->element_count_transfered = possible;
467     size = sizeof (struct AliceComputationMessage)
468       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
469     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
470     memcpy (h->elements,
471             elements,
472             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
473   }
474
475   msg = GNUNET_malloc (size);
476   h->msg = &msg->header;
477   msg->header.size = htons (size);
478   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
479   msg->element_count_total = htonl (element_count);
480   msg->element_count_contained = htonl (possible);
481   msg->reserved = htonl (0);
482   msg->peer = *peer;
483   msg->session_key = *session_key;
484   memcpy (&msg[1],
485           elements,
486           sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible);
487   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
488                                                GNUNET_TIME_UNIT_FOREVER_REL,
489                                                GNUNET_YES, /* retry is OK in the initial stage */
490                                                &do_send_message, h);
491   GNUNET_assert (NULL != h->th);
492   return h;
493 }
494
495
496 /**
497  * Cancel an ongoing computation or revoke our collaboration offer.
498  * Closes the connection to the service
499  *
500  * @param h computation handle to terminate
501  */
502 void
503 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h)
504 {
505   if (NULL != h->th)
506   {
507     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
508     h->th = NULL;
509   }
510   GNUNET_free_non_null (h->elements);
511   GNUNET_free_non_null (h->msg);
512   if (NULL != h->client)
513   {
514     GNUNET_CLIENT_disconnect (h->client);
515     h->client = NULL;
516   }
517   GNUNET_free (h);
518 }
519
520
521 /* end of scalarproduct_api.c */