9c497129a71d820c8a845088241dcfba12afbc32
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  *
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36
37 /**************************************************************
38  ***  Datatype Declarations                          **********
39  **************************************************************/
40
41 /**
42  * the abstraction function for our internal callback
43  */
44 typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls,
45                                                              const struct GNUNET_MessageHeader *msg,
46                                                              enum GNUNET_SCALARPRODUCT_ResponseStatus status);
47
48 /**
49  * A handle returned for each computation
50  */
51 struct GNUNET_SCALARPRODUCT_ComputationHandle
52 {
53   /**
54    * Our configuration.
55    */
56   const struct GNUNET_CONFIGURATION_Handle *cfg;
57
58   /**
59    * Current connection to the scalarproduct service.
60    */
61   struct GNUNET_CLIENT_Connection *client;
62
63   /**
64    * Handle for statistics.
65    */
66   struct GNUNET_STATISTICS_Handle *stats;
67
68   /**
69    * The shared session key identifying this computation
70    */
71   struct GNUNET_HashCode key;
72
73   /**
74    * Current transmit handle.
75    */
76   struct GNUNET_CLIENT_TransmitHandle *th;
77
78   /**
79    * count of all elements we offer for computation
80    */
81   uint32_t element_count_total;
82
83   /**
84    * count of the transfered elements we offer for computation
85    */
86   uint32_t element_count_transfered;
87   
88   /**
89    * the client's elements which 
90    */
91   struct GNUNET_SCALARPRODUCT_Element * elements;
92   
93   /**
94    * Message to be sent to the scalarproduct service
95    */
96   void * msg;
97
98   /**
99    * The client's msg handler callback
100    */
101   union
102   {
103   /**
104    * Function to call after transmission of the request (Bob).
105    */
106   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
107
108   /**
109    * Function to call after transmission of the request (Alice).
110    */
111   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
112   };
113
114   /**
115    * Closure for 'cont'.
116    */
117   void *cont_cls;
118
119   /**
120    * API internal callback for results and failures to be forwarded to the client
121    */
122   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
123   
124   /**
125    * 
126    */
127   GNUNET_SCHEDULER_TaskIdentifier cont_multipart;
128 };
129
130 /**************************************************************
131  ***  Forward Function Declarations                          **********
132  **************************************************************/
133
134 void
135 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h);
136
137 static size_t do_send_message (void *cls, size_t size, void *buf);
138 /**************************************************************
139  ***  Static Function Declarations                   **********
140  **************************************************************/
141
142
143 /**
144  * Handles the STATUS received from the service for a response, does not contain a payload
145  *
146  * @param cls our Handle
147  * @param msg Pointer to the response received
148  * @param status the condition the request was terminated with (eg: disconnect)
149  */
150 static void
151 process_status_message (void *cls,
152                         const struct GNUNET_MessageHeader *msg,
153                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
154 {
155   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
156
157   qe->cont_status (qe->cont_cls, status);
158 }
159
160
161 /**
162  * Handles the RESULT received from the service for a request, should contain a result MPI value
163  *
164  * @param cls our Handle
165  * @param msg Pointer to the response received
166  * @param status the condition the request was terminated with (eg: disconnect)
167  */
168 static void
169 process_result_message (void *cls,
170                         const struct GNUNET_MessageHeader *msg,
171                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
172 {
173   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
174   const struct GNUNET_SCALARPRODUCT_client_response *message =
175           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
176   gcry_mpi_t result = NULL;
177   gcry_error_t rc;
178
179   if (GNUNET_SCALARPRODUCT_Status_Success == status)
180     {
181       size_t product_len = ntohl (message->product_length);
182       result = gcry_mpi_new (0);
183
184       if (0 < product_len)
185         {
186           gcry_mpi_t num;
187           size_t read = 0;
188
189           if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, &message[1], product_len, &read)))
190             {
191               LOG_GCRY(GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc);
192               gcry_mpi_release (result);
193               result = NULL;
194               status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
195             }
196           else
197             {
198               if (0 < message->range)
199                 gcry_mpi_add (result, result, num);
200               else if (0 > message->range)
201                 gcry_mpi_sub (result, result, num);
202               gcry_mpi_release (num);
203             }
204         }
205     }
206   qe->cont_datum (qe->cont_cls, status, result);
207 }
208
209
210 /**
211  * Called when a response is received from the service. After basic check, the
212  * handler in qe->response_proc is called. This functions handles the response
213  * to the client which used the API.
214  *
215  * @param cls Pointer to the Master Context
216  * @param msg Pointer to the data received in response
217  */
218 static void
219 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
220 {
221   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
222   const struct GNUNET_SCALARPRODUCT_client_response *message =
223           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
224   enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
225
226   if (NULL == msg)
227     {
228       LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
229       status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
230     }
231   else if ((GNUNET_SYSERR != message->status) && (0 < message->product_length ))
232     {
233       // response for the responder client, successful
234       GNUNET_STATISTICS_update (h->stats,
235                                 gettext_noop ("# SUC responder result messages received"), 1,
236                                 GNUNET_NO);
237
238       status = GNUNET_SCALARPRODUCT_Status_Success;
239     }
240   else if (message->status == GNUNET_SYSERR){
241       // service signaled an error
242       status = GNUNET_SCALARPRODUCT_Status_Failure;
243   }
244   
245   if (h->cont_status != NULL)
246     h->response_proc (h, msg, status);
247
248   GNUNET_free (h);
249 }
250
251
252 static void
253 send_multipart (void * cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
254 {
255   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = (struct GNUNET_SCALARPRODUCT_ComputationHandle *) cls;
256   struct GNUNET_SCALARPRODUCT_computation_message_multipart *msg;
257   uint32_t size;
258   uint32_t todo;
259
260   h->cont_multipart = GNUNET_SCHEDULER_NO_TASK;
261
262   todo = h->element_count_total - h->element_count_transfered;
263   size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
264   if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) {
265     //create a multipart msg, first we calculate a new msg size for the head msg
266     todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
267     size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
268   }
269
270   msg = (struct GNUNET_SCALARPRODUCT_computation_message_multipart*) GNUNET_malloc (size);
271   h->msg = msg;
272   msg->header.size = htons (size);
273   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART);
274   msg->element_count_contained = htonl (todo);
275
276   memcpy (&msg[1], &h->elements[h->element_count_transfered], todo);
277   h->element_count_transfered += todo;
278
279   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
280                                                GNUNET_TIME_UNIT_FOREVER_REL,
281                                                GNUNET_YES, // retry is OK in the initial stage
282                                                &do_send_message, h);
283
284   if (!h->th) {
285     LOG (GNUNET_ERROR_TYPE_ERROR,
286          _ ("Failed to send a multipart message to the scalarproduct service\n"));
287     GNUNET_STATISTICS_update (h->stats,
288                               gettext_noop ("# transmission request failures"),
289                               1, GNUNET_NO);
290     GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
291     GNUNET_CLIENT_disconnect (h->client);
292     GNUNET_free (h->msg);
293     h->msg = NULL;
294     if (h->cont_status != NULL)
295       h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
296
297     GNUNET_SCALARPRODUCT_cancel (cls);
298   }
299 }
300
301 /**
302  * Transmits the request to the VectorProduct Service
303  *
304  * @param cls Closure
305  * @param size Size of the buffer
306  * @param buf Pointer to the buffer
307  *
308  * @return Size of the message sent
309  */
310 static size_t
311 do_send_message (void *cls, size_t size,
312                  void *buf)
313 {
314   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
315
316   if (NULL == buf) {
317     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
318     GNUNET_STATISTICS_update (h->stats,
319                               gettext_noop ("# transmission request failures"),
320                               1, GNUNET_NO);
321
322     // notify caller about the error, done here.
323     if (h->cont_status != NULL)
324       h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
325
326     GNUNET_SCALARPRODUCT_cancel (cls);
327     return 0;
328   }
329   memcpy (buf, h->msg, size);
330
331   GNUNET_free (h->msg);
332   h->msg = NULL;
333   h->th = NULL;
334
335 #if INSANE_STATISTICS
336   GNUNET_STATISTICS_update (h->stats,
337                             gettext_noop ("# bytes sent to scalarproduct"), 1,
338                             GNUNET_NO);
339 #endif
340
341   /* done sending */
342   if (h->element_count_total == h->element_count_transfered) {
343     GNUNET_CLIENT_receive (h->client, &receive_cb, h,
344                            GNUNET_TIME_UNIT_FOREVER_REL);
345     return size;
346   }
347   
348   h->cont_multipart = GNUNET_SCHEDULER_add_now (&send_multipart, h);
349   
350   return size;
351 }
352
353
354 /**************************************************************
355  ***  API                                            **********
356  **************************************************************/
357
358
359 /**
360  * Used by Bob's client to cooperate with Alice,
361  *
362  * @param cfg the gnunet configuration handle
363  * @param key Session key unique to the requesting client
364  * @param elements Array of elements of the vector
365  * @param element_count Number of elements in the vector
366  * @param cont Callback function
367  * @param cont_cls Closure for the callback function
368  *
369  * @return a new handle for this computation
370  */
371 struct GNUNET_SCALARPRODUCT_ComputationHandle *
372 GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle * cfg,
373                                const struct GNUNET_HashCode * session_key,
374                                const struct GNUNET_SCALARPRODUCT_Element * elements,
375                                uint32_t element_count,
376                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
377                                void * cont_cls)
378 {
379   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
380   struct GNUNET_SCALARPRODUCT_computation_message *msg;
381   uint32_t size;
382   uint16_t possible;
383
384   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_computation_message)
385                  + element_count * sizeof (int32_t));
386   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
387   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
388   if (!h->client)
389     {
390       LOG (GNUNET_ERROR_TYPE_ERROR,
391            _ ("Failed to connect to the scalarproduct service\n"));
392       GNUNET_free (h);
393       return NULL;
394     }
395   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
396   if (!h->stats)
397     {
398       LOG (GNUNET_ERROR_TYPE_ERROR,
399            _ ("Failed to send a message to the statistics service\n"));
400       GNUNET_CLIENT_disconnect (h->client);
401       GNUNET_free (h);
402       return NULL;
403     }
404
405   h->element_count_total = element_count;
406   size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
407   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) {
408     possible = element_count;
409     h->element_count_transfered = element_count;
410   }
411   else {
412     //create a multipart msg, first we calculate a new msg size for the head msg
413     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
414     h->element_count_transfered = possible;
415     size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element);
416     h->elements = (struct GNUNET_SCALARPRODUCT_Element*) 
417             GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
418     memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element)*element_count);
419   }
420
421   h->cont_status = cont;
422   h->cont_cls = cont_cls;
423   h->response_proc = &process_status_message;
424   h->cfg = cfg;
425   memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode));
426
427   msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size);
428   h->msg = msg;
429   msg->header.size = htons (size);
430   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
431   msg->element_count_total = htonl (element_count);
432   msg->element_count_contained = htonl (possible);
433
434   memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode));
435   memcpy (&msg[1], elements, possible);
436
437   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
438                                                GNUNET_TIME_UNIT_FOREVER_REL,
439                                                GNUNET_YES, // retry is OK in the initial stage
440                                                &do_send_message, h);
441   if (!h->th)
442     {
443       LOG (GNUNET_ERROR_TYPE_ERROR,
444            _ ("Failed to send a message to the scalarproduct service\n"));
445       GNUNET_STATISTICS_update (h->stats,
446                               gettext_noop ("# transmission request failures"),
447                               1, GNUNET_NO);
448       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
449       GNUNET_CLIENT_disconnect (h->client);
450       GNUNET_free (h->msg);
451       GNUNET_free_non_null (h->elements);
452       GNUNET_free (h);
453       return NULL;
454     }
455   return h;
456 }
457
458
459 /**
460  * Request by Alice's client for computing a scalar product
461  *
462  * @param cfg the gnunet configuration handle
463  * @param session_key Session key should be unique to the requesting client
464  * @param peer PeerID of the other peer
465  * @param elements Array of elements of the vector
466  * @param element_count Number of elements in the vector
467  * @param cont Callback function
468  * @param cont_cls Closure for the callback function
469  *
470  * @return a new handle for this computation
471  */
472 struct GNUNET_SCALARPRODUCT_ComputationHandle *
473 GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle * cfg,
474                               const struct GNUNET_HashCode * session_key,
475                               const struct GNUNET_PeerIdentity *peer,
476                               const struct GNUNET_SCALARPRODUCT_Element * elements,
477                               uint32_t element_count,
478                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
479                               void * cont_cls)
480 {
481   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
482   struct GNUNET_SCALARPRODUCT_computation_message *msg;
483   uint32_t size;
484   uint16_t possible;
485
486   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
487   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
488   if (!h->client)
489     {
490       LOG (GNUNET_ERROR_TYPE_ERROR,
491            _ ("Failed to connect to the scalarproduct service\n"));
492       GNUNET_free (h);
493       return NULL;
494     }
495   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
496   if (!h->stats)
497     {
498       LOG (GNUNET_ERROR_TYPE_ERROR,
499            _ ("Failed to send a message to the statistics service\n"));
500       GNUNET_CLIENT_disconnect (h->client);
501       GNUNET_free (h);
502       return NULL;
503     }
504
505   h->element_count_total = element_count;
506   size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
507   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) {
508     possible = element_count;
509     h->element_count_transfered = element_count;
510   }
511   else {
512     //create a multipart msg, first we calculate a new msg size for the head msg
513     possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
514     h->element_count_transfered = possible;
515     size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element);
516     h->elements = (struct GNUNET_SCALARPRODUCT_Element*) 
517             GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
518     memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element)*element_count);
519   }
520   
521   h->cont_datum = cont;
522   h->cont_cls = cont_cls;
523   h->response_proc = &process_result_message;
524   h->cfg = cfg;
525   memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode));
526
527   msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size);
528   h->msg = msg;
529   msg->header.size = htons (size);
530   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
531   msg->element_count_total = htonl (element_count);
532   msg->element_count_contained = htonl (possible);
533
534   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
535   memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode));
536   memcpy (&msg[1], elements, possible);
537
538   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
539                                                GNUNET_TIME_UNIT_FOREVER_REL,
540                                                GNUNET_YES, // retry is OK in the initial stage
541                                                &do_send_message, h);
542   if (!h->th)
543     {
544       LOG (GNUNET_ERROR_TYPE_ERROR,
545            _ ("Failed to send a message to the scalarproduct service\n"));
546       GNUNET_STATISTICS_update (h->stats,
547                               gettext_noop ("# transmission request failures"),
548                               1, GNUNET_NO);
549       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
550       GNUNET_CLIENT_disconnect (h->client);
551       GNUNET_free (h->msg);
552       GNUNET_free_non_null (h->elements);
553       GNUNET_free (h);
554       return NULL;
555     }
556   return h;
557 }
558
559 /**
560  * Cancel an ongoing computation or revoke our collaboration offer.
561  * Closes the connection to the service
562  *
563  * @param h computation handle to terminate
564  */
565 void
566 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
567 {
568   if (NULL != h->th)
569     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
570   if (GNUNET_SCHEDULER_NO_TASK != h->cont_multipart)
571     GNUNET_SCHEDULER_cancel (h->cont_multipart);
572   GNUNET_free_non_null (h->elements);
573   GNUNET_free_non_null (h->msg);
574   GNUNET_CLIENT_disconnect (h->client);
575   GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
576   GNUNET_free (h);
577 }
578
579
580 /* end of scalarproduct_api.c */