-simplify logic
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file scalarproduct/scalarproduct_api.c
22  * @brief API for the scalarproduct
23  * @author Christian Fuchs
24  * @author Gaurav Kukreja
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_statistics_service.h"
30 #include "gnunet_scalarproduct_service.h"
31 #include "gnunet_protocols.h"
32 #include "scalarproduct.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
35
36
37 /**
38  * The abstraction function for our internal callback
39  *
40  * @param h computation handle
41  * @param msg response we got, NULL on errors
42  * @param status processing status code
43  */
44 typedef void
45 (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
46                                                 const struct ClientResponseMessage *msg,
47                                                 enum GNUNET_SCALARPRODUCT_ResponseStatus status);
48
49
50 /**
51  * A handle returned for each computation
52  */
53 struct GNUNET_SCALARPRODUCT_ComputationHandle
54 {
55   /**
56    * Our configuration.
57    */
58   const struct GNUNET_CONFIGURATION_Handle *cfg;
59
60   /**
61    * Current connection to the scalarproduct service.
62    */
63   struct GNUNET_CLIENT_Connection *client;
64
65   /**
66    * Current transmit handle.
67    */
68   struct GNUNET_CLIENT_TransmitHandle *th;
69
70   /**
71    * the client's elements which
72    */
73   struct GNUNET_SCALARPRODUCT_Element *elements;
74
75   /**
76    * Message to be sent to the scalarproduct service
77    */
78   struct GNUNET_MessageHeader *msg;
79
80   /**
81    * Function to call after transmission of the request (Bob).
82    */
83   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
84
85   /**
86    * Function to call after transmission of the request (Alice).
87    */
88   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
89
90   /**
91    * Closure for @e cont_status or @e cont_datum.
92    */
93   void *cont_cls;
94
95   /**
96    * API internal callback for results and failures to be forwarded to
97    * the client.
98    */
99   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
100
101   /**
102    * The shared session key identifying this computation
103    */
104   struct GNUNET_HashCode key;
105
106   /**
107    * count of all @e elements we offer for computation
108    */
109   uint32_t element_count_total;
110
111   /**
112    * count of the transfered @e elements we offer for computation
113    */
114   uint32_t element_count_transfered;
115
116 };
117
118
119 /**
120  * Handles the STATUS received from the service for a response, does
121  * not contain a payload.  Called when we participate as "Bob" via
122  * #GNUNET_SCALARPRODUCT_accept_computation().
123  *
124  * @param h our Handle
125  * @param msg the response received
126  * @param status the condition the request was terminated with (eg: disconnect)
127  */
128 static void
129 process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
130                         const struct ClientResponseMessage *msg,
131                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
132 {
133   if (NULL != h->cont_status)
134     h->cont_status (h->cont_cls,
135                     status);
136   GNUNET_SCALARPRODUCT_cancel (h);
137 }
138
139
140 /**
141  * Handles the RESULT received from the service for a request, should
142  * contain a result MPI value.  Called when we participate as "Alice" via
143  * #GNUNET_SCALARPRODUCT_start_computation().
144  *
145  * @param h our Handle
146  * @param msg Pointer to the response received
147  * @param status the condition the request was terminated with (eg: disconnect)
148  */
149 static void
150 process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
151                         const struct ClientResponseMessage *msg,
152                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
153 {
154   size_t product_len;
155   gcry_mpi_t result = NULL;
156   gcry_error_t rc;
157   gcry_mpi_t num;
158   size_t rsize;
159
160   if ( (GNUNET_SCALARPRODUCT_Status_Success == status) &&
161        ( (NULL == msg) ||
162          ( (ntohs (msg->header.size) - sizeof (struct ClientResponseMessage)
163             != (product_len = ntohl (msg->product_length))) ) ) )
164   {
165     GNUNET_break (0);
166     status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
167   }
168   if (GNUNET_SCALARPRODUCT_Status_Success == status)
169   {
170     result = gcry_mpi_new (0);
171
172     if (0 < product_len)
173     {
174       rsize = 0;
175       if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD,
176                                     &msg[1],
177                                     product_len,
178                                     &rsize)))
179       {
180         LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
181                   "gcry_mpi_scan",
182                   rc);
183         gcry_mpi_release (result);
184         result = NULL;
185         status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
186       }
187       else
188       {
189         if (0 < ntohl (msg->range))
190           gcry_mpi_add (result, result, num);
191         else if (0 > ntohl (msg->range))
192           gcry_mpi_sub (result, result, num);
193         gcry_mpi_release (num);
194       }
195     }
196   }
197   h->cont_datum (h->cont_cls,
198                  status,
199                  result);
200   if (NULL != result)
201     gcry_mpi_release (result);
202   GNUNET_SCALARPRODUCT_cancel (h);
203 }
204
205
206 /**
207  * Called when a response is received from the service. After basic check, the
208  * handler in qe->response_proc is called. This functions handles the response
209  * to the client which used the API.
210  *
211  * @param cls Pointer to the Master Context
212  * @param msg Pointer to the data received in response
213  */
214 static void
215 receive_cb (void *cls,
216             const struct GNUNET_MessageHeader *msg)
217 {
218   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
219   const struct ClientResponseMessage *message;
220
221   if (NULL == msg)
222   {
223     LOG (GNUNET_ERROR_TYPE_INFO,
224          "Disconnected from SCALARPRODUCT service.\n");
225     h->response_proc (h,
226                       NULL,
227                       GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
228     return;
229   }
230   if (ntohs (msg->size) != sizeof (struct ClientResponseMessage))
231   {
232     GNUNET_break (0);
233     h->response_proc (h,
234                       NULL,
235                       GNUNET_SCALARPRODUCT_Status_InvalidResponse);
236     return;
237   }
238   message = (const struct ClientResponseMessage *) msg;
239   if (GNUNET_SYSERR == ntohl (message->status))
240   {
241     h->response_proc (h,
242                       NULL,
243                       GNUNET_SCALARPRODUCT_Status_Failure);
244     return;
245   }
246   h->response_proc (h,
247                     message,
248                     GNUNET_SCALARPRODUCT_Status_Success);
249 }
250
251
252 /**
253  * Transmits the request to the SCALARPRODUCT service
254  *
255  * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle`
256  * @param size Size of the buffer @a buf
257  * @param buf Pointer to the buffer
258  * @return Size of the message sent
259  */
260 static size_t
261 do_send_message (void *cls,
262                  size_t size,
263                  void *buf)
264 {
265   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
266   struct ComputationMultipartMessage *msg;
267   size_t ret;
268   uint32_t nsize;
269   uint32_t todo;
270
271   h->th = NULL;
272   if (NULL == buf)
273   {
274     LOG (GNUNET_ERROR_TYPE_DEBUG,
275          "Failed to transmit request to SCALARPRODUCT.\n");
276     /* notify caller about the error, done here */
277     h->response_proc (h, NULL,
278                       GNUNET_SCALARPRODUCT_Status_Failure);
279     return 0;
280   }
281   ret = ntohs (h->msg->size);
282   memcpy (buf, h->msg, ret);
283   GNUNET_free (h->msg);
284   h->msg = NULL;
285
286   /* done sending? */
287   if (h->element_count_total == h->element_count_transfered)
288   {
289     GNUNET_CLIENT_receive (h->client,
290                            &receive_cb, h,
291                            GNUNET_TIME_UNIT_FOREVER_REL);
292     return ret;
293   }
294
295   todo = h->element_count_total - h->element_count_transfered;
296   nsize = sizeof (struct ComputationMultipartMessage)
297     + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
298   if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size)
299   {
300     /* cannot do all of them, limit to what is possible in one message */
301     todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMultipartMessage))
302       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
303     nsize = sizeof (struct ComputationMultipartMessage)
304       + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
305   }
306
307   msg = GNUNET_malloc (nsize);
308   h->msg = &msg->header;
309   msg->header.size = htons (nsize);
310   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART);
311   msg->element_count_contained = htonl (todo);
312   memcpy (&msg[1],
313           &h->elements[h->element_count_transfered],
314           todo * sizeof (struct GNUNET_SCALARPRODUCT_Element));
315   h->element_count_transfered += todo;
316   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize,
317                                                GNUNET_TIME_UNIT_FOREVER_REL,
318                                                GNUNET_NO,
319                                                &do_send_message, h);
320   GNUNET_assert (NULL != h->th);
321   return ret;
322 }
323
324
325 /**
326  * Used by Bob's client to cooperate with Alice,
327  *
328  * @param cfg the gnunet configuration handle
329  * @param key Session key unique to the requesting client
330  * @param elements Array of elements of the vector
331  * @param element_count Number of elements in the @a elements vector
332  * @param cont Callback function
333  * @param cont_cls Closure for @a cont
334  * @return a new handle for this computation
335  */
336 struct GNUNET_SCALARPRODUCT_ComputationHandle *
337 GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
338                                          const struct GNUNET_HashCode *session_key,
339                                          const struct GNUNET_SCALARPRODUCT_Element *elements,
340                                          uint32_t element_count,
341                                          GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
342                                          void *cont_cls)
343 {
344   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
345   struct ComputationMessage *msg;
346   uint32_t size;
347   uint16_t possible;
348
349   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
350   h->cont_status = cont;
351   h->cont_cls = cont_cls;
352   h->response_proc = &process_status_message;
353   h->cfg = cfg;
354   h->key = *session_key;
355   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
356   h->element_count_total = element_count;
357   if (NULL == h->client)
358   {
359     /* scalarproduct configuration error */
360     GNUNET_break (0);
361     GNUNET_free (h);
362     return NULL;
363   }
364   size = sizeof (struct ComputationMessage)
365     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
366   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
367   {
368     possible = element_count;
369     h->element_count_transfered = element_count;
370   }
371   else
372   {
373     /* create a multipart msg, first we calculate a new msg size for the head msg */
374     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMessage))
375       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
376     h->element_count_transfered = possible;
377     size = sizeof (struct ComputationMessage)
378       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
379     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
380     memcpy (h->elements,
381             elements,
382             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
383   }
384
385   msg = GNUNET_malloc (size);
386   h->msg = &msg->header;
387   msg->header.size = htons (size);
388   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
389   msg->element_count_total = htonl (element_count);
390   msg->element_count_contained = htonl (possible);
391   msg->session_key = *session_key;
392   memcpy (&msg[1],
393           elements,
394           possible * sizeof (struct GNUNET_SCALARPRODUCT_Element));
395   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
396                                                GNUNET_TIME_UNIT_FOREVER_REL,
397                                                GNUNET_YES, /* retry is OK in the initial stage */
398                                                &do_send_message, h);
399   GNUNET_assert (NULL != h->th);
400   return h;
401 }
402
403
404 /**
405  * Request by Alice's client for computing a scalar product
406  *
407  * @param cfg the gnunet configuration handle
408  * @param session_key Session key should be unique to the requesting client
409  * @param peer PeerID of the other peer
410  * @param elements Array of elements of the vector
411  * @param element_count Number of elements in the @a elements vector
412  * @param cont Callback function
413  * @param cont_cls Closure for @a cont
414  * @return a new handle for this computation
415  */
416 struct GNUNET_SCALARPRODUCT_ComputationHandle *
417 GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
418                                         const struct GNUNET_HashCode *session_key,
419                                         const struct GNUNET_PeerIdentity *peer,
420                                         const struct GNUNET_SCALARPRODUCT_Element *elements,
421                                         uint32_t element_count,
422                                         GNUNET_SCALARPRODUCT_DatumProcessor cont,
423                                         void *cont_cls)
424 {
425   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
426   struct ComputationMessage *msg;
427   uint32_t size;
428   uint32_t possible;
429
430   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
431   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
432   if (NULL == h->client)
433   {
434     /* missconfigured scalarproduct service */
435     GNUNET_break (0);
436     GNUNET_free (h);
437     return NULL;
438   }
439   h->element_count_total = element_count;
440   h->cont_datum = cont;
441   h->cont_cls = cont_cls;
442   h->response_proc = &process_result_message;
443   h->cfg = cfg;
444   h->key = *session_key;
445   size = sizeof (struct ComputationMessage)
446     + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
447   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
448   {
449     possible = element_count;
450     h->element_count_transfered = element_count;
451   }
452   else
453   {
454     /* create a multipart msg, first we calculate a new msg size for the head msg */
455     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMessage))
456       / sizeof (struct GNUNET_SCALARPRODUCT_Element);
457     h->element_count_transfered = possible;
458     size = sizeof (struct ComputationMessage)
459       + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
460     h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
461     memcpy (h->elements,
462             elements,
463             sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
464   }
465
466   msg = GNUNET_malloc (size);
467   h->msg = &msg->header;
468   msg->header.size = htons (size);
469   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
470   msg->element_count_total = htonl (element_count);
471   msg->element_count_contained = htonl (possible);
472   msg->reserved = htonl (0);
473   msg->peer = *peer;
474   msg->session_key = *session_key;
475   memcpy (&msg[1],
476           elements,
477           sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible);
478   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
479                                                GNUNET_TIME_UNIT_FOREVER_REL,
480                                                GNUNET_YES, /* retry is OK in the initial stage */
481                                                &do_send_message, h);
482   GNUNET_assert (NULL != h->th);
483   return h;
484 }
485
486
487 /**
488  * Cancel an ongoing computation or revoke our collaboration offer.
489  * Closes the connection to the service
490  *
491  * @param h computation handle to terminate
492  */
493 void
494 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h)
495 {
496   if (NULL != h->th)
497   {
498     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
499     h->th = NULL;
500   }
501   GNUNET_free_non_null (h->elements);
502   GNUNET_free_non_null (h->msg);
503   if (NULL != h->client)
504   {
505     GNUNET_CLIENT_disconnect (h->client);
506     h->client = NULL;
507   }
508   GNUNET_free (h);
509 }
510
511
512 /* end of scalarproduct_api.c */