made the service more resilient against out of order and simply incorrect messages
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  *
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36
37 /**************************************************************
38  ***  Datatype Declarations                          **********
39  **************************************************************/
40
41 /**
42  * the abstraction function for our internal callback
43  */
44 typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls,
45                                                              const struct GNUNET_MessageHeader *msg,
46                                                              enum GNUNET_SCALARPRODUCT_ResponseStatus status);
47
48 /**
49  * Entry in the request queue per client
50  */
51 struct GNUNET_SCALARPRODUCT_ComputationHandle
52 {
53   /**
54    * This is a linked list.
55    */
56   struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
57
58   /**
59    * This is a linked list.
60    */
61   struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
62
63   /**
64    * Our configuration.
65    */
66   const struct GNUNET_CONFIGURATION_Handle *cfg;
67
68   /**
69    * Current connection to the scalarproduct service.
70    */
71   struct GNUNET_CLIENT_Connection *client;
72
73   /**
74    * Handle for statistics.
75    */
76   struct GNUNET_STATISTICS_Handle *stats;
77
78   /**
79    * The shared session key identifying this computation
80    */
81   struct GNUNET_HashCode key;
82
83   /**
84    * Current transmit handle.
85    */
86   struct GNUNET_CLIENT_TransmitHandle *th;
87
88   /**
89    * Size of the message
90    */
91   uint16_t message_size;
92
93   /**
94    * Message to be sent to the scalarproduct service
95    */
96   struct GNUNET_SCALARPRODUCT_client_request * msg;
97
98   union
99   {
100   /**
101    * Function to call after transmission of the request.
102    */
103   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
104
105   /**
106    * Function to call after transmission of the request.
107    */
108   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
109   };
110
111   /**
112    * Closure for 'cont'.
113    */
114   void *cont_cls;
115
116   /**
117    * Response Processor for response from the service. This function calls the
118    * continuation function provided by the client.
119    */
120   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
121 };
122
123 /**************************************************************
124  ***  Global Variables                               **********
125  **************************************************************/
126 /**
127  * Head of the active sessions queue
128  */
129 static struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
130 /**
131  * Tail of the active sessions queue
132  */
133 static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
134
135 /**************************************************************
136  ***  Function Declarations                          **********
137  **************************************************************/
138
139 void
140 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h);
141
142 /**************************************************************
143  ***  Static Function Declarations                   **********
144  **************************************************************/
145
146
147 /**
148  * Handles the RESULT received in reply of prepare_response from the
149  * service
150  *
151  * @param cls Handle to the Master Context
152  * @param msg Pointer to the response received
153  */
154 static void
155 process_status_message (void *cls,
156                         const struct GNUNET_MessageHeader *msg,
157                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
158 {
159   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
160
161   qe->cont_status (qe->cont_cls, status);
162 }
163
164
165 /**
166  * Handles the RESULT received in reply of prepare_response from the
167  * service
168  *
169  * @param cls Handle to the Master Context
170  * @param msg Pointer to the response received
171  */
172 static void
173 process_result_message (void *cls,
174                         const struct GNUNET_MessageHeader *msg,
175                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
176 {
177   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
178   const struct GNUNET_SCALARPRODUCT_client_response *message =
179           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
180   gcry_mpi_t result = NULL;
181   gcry_error_t rc;
182
183   if (GNUNET_SCALARPRODUCT_Status_Success == status)
184     {
185       size_t product_len = ntohl (message->product_length);
186       result = gcry_mpi_new (0);
187
188       if (0 < product_len)
189         {
190           gcry_mpi_t num;
191           size_t read = 0;
192
193           if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, &message[1], product_len, &read)))
194             {
195               LOG_GCRY(GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc);
196               gcry_mpi_release (result);
197               result = NULL;
198               status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
199             }
200           else
201             {
202               if (0 < message->range)
203                 gcry_mpi_add (result, result, num);
204               else if (0 > message->range)
205                 gcry_mpi_sub (result, result, num);
206               gcry_mpi_release (num);
207             }
208         }
209     }
210   qe->cont_datum (qe->cont_cls, status, result);
211 }
212
213
214 /**
215  * Called when a response is received from the service. After basic check
216  * handler in qe->response_proc is called. This functions handles the response
217  * to the client which used the API.
218  *
219  * @param cls Pointer to the Master Context
220  * @param msg Pointer to the data received in response
221  */
222 static void
223 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
224 {
225   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
226   const struct GNUNET_SCALARPRODUCT_client_response *message =
227           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
228   enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
229
230   if (NULL == msg)
231     {
232       LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
233       status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
234     }
235   else if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type))
236     {
237       LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n");
238     }
239   else if (0 < ntohl (message->product_length) || (0 == message->range))
240     {
241       // response for the responder client, successful
242       GNUNET_STATISTICS_update (qe->stats,
243                                 gettext_noop ("# SUC responder result messages received"), 1,
244                                 GNUNET_NO);
245
246       status = GNUNET_SCALARPRODUCT_Status_Success;
247     }
248
249   if (qe->cont_datum != NULL)
250     qe->response_proc (qe, msg, status);
251
252   GNUNET_CONTAINER_DLL_remove (head, tail, qe);
253   GNUNET_free (qe);
254 }
255
256
257 /**
258  * Transmits the request to the VectorProduct Sevice
259  *
260  * @param cls Closure
261  * @param size Size of the buffer
262  * @param buf Pointer to the buffer
263  *
264  * @return Size of the message sent
265  */
266 static size_t
267 transmit_request (void *cls, size_t size,
268                   void *buf)
269 {
270   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
271
272   if (NULL == buf)
273     {
274       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
275       GNUNET_STATISTICS_update (qe->stats,
276                                 gettext_noop ("# transmission request failures"),
277                                 1, GNUNET_NO);
278
279       // notify caller about the error, done here.
280       if (qe->cont_datum != NULL)
281         qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
282
283       GNUNET_SCALARPRODUCT_cancel (cls);
284       return 0;
285     }
286   memcpy (buf, qe->msg, size);
287
288   GNUNET_free (qe->msg);
289   qe->msg = NULL;
290   qe->th = NULL;
291
292   GNUNET_CLIENT_receive (qe->client, &receive_cb, qe,
293                          GNUNET_TIME_UNIT_FOREVER_REL);
294
295 #if INSANE_STATISTICS
296   GNUNET_STATISTICS_update (qe->stats,
297                             gettext_noop ("# bytes sent to scalarproduct"), 1,
298                             GNUNET_NO);
299 #endif
300   return size;
301 }
302
303
304 /**************************************************************
305  ***  API                                            **********
306  **************************************************************/
307
308
309 /**
310  * Used by Bob's client to cooperate with Alice,
311  *
312  * @param h handle to the master context
313  * @param key Session key - unique to the requesting client
314  * @param elements Array of elements of the vector
315  * @param element_count Number of elements in the vector
316  * @param cont Callback function
317  * @param cont_cls Closure for the callback function
318  */
319 struct GNUNET_SCALARPRODUCT_ComputationHandle *
320 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
321                                const struct GNUNET_HashCode * key,
322                                const int32_t * elements,
323                                uint32_t element_count,
324                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
325                                void *cont_cls)
326 {
327   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
328   struct GNUNET_SCALARPRODUCT_client_request *msg;
329   int32_t * vector;
330   uint16_t size;
331   uint64_t i;
332
333   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
334                  + element_count * sizeof (int32_t));
335   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
336   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
337   if (!h->client)
338     {
339       LOG (GNUNET_ERROR_TYPE_ERROR,
340            _ ("Failed to connect to the scalarproduct service\n"));
341       GNUNET_free (h);
342       return NULL;
343     }
344   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
345   if (!h->stats)
346     {
347       LOG (GNUNET_ERROR_TYPE_ERROR,
348            _ ("Failed to send a message to the statistics service\n"));
349       GNUNET_CLIENT_disconnect (h->client);
350       GNUNET_free (h);
351       return NULL;
352     }
353
354   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
355
356   h->cont_status = cont;
357   h->cont_cls = cont_cls;
358   h->response_proc = &process_status_message;
359   h->cfg = cfg;
360   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
361
362   msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size);
363   h->msg = msg;
364   msg->header.size = htons (size);
365   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
366   msg->element_count = htonl (element_count);
367
368   vector = (int32_t*) & msg[1];
369   // copy each element over to the message
370   for (i = 0; i < element_count; i++)
371     vector[i] = htonl (elements[i]);
372
373   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
374
375   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
376                                                GNUNET_TIME_UNIT_FOREVER_REL,
377                                                GNUNET_YES, // retry is OK in the initial stage
378                                                &transmit_request, h);
379   if (!h->th)
380     {
381       LOG (GNUNET_ERROR_TYPE_ERROR,
382            _ ("Failed to send a message to the scalarproduct service\n"));
383       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
384       GNUNET_CLIENT_disconnect (h->client);
385       GNUNET_free (h->msg);
386       GNUNET_free (h);
387       return NULL;
388     }
389   GNUNET_CONTAINER_DLL_insert (head, tail, h);
390   return h;
391 }
392
393
394 /**
395  * Request by Alice's client for computing a scalar product
396  *
397  * @param h handle to the master context
398  * @param key Session key - unique to the requesting client
399  * @param peer PeerID of the other peer
400  * @param elements Array of elements of the vector
401  * @param element_count Number of elements in the vector
402  * @param mask Array of the mask
403  * @param mask_bytes number of bytes in the mask
404  * @param cont Callback function
405  * @param cont_cls Closure for the callback function
406  */
407 struct GNUNET_SCALARPRODUCT_ComputationHandle *
408 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
409                               const struct GNUNET_HashCode * key,
410                               const struct GNUNET_PeerIdentity *peer,
411                               const int32_t * elements,
412                               uint32_t element_count,
413                               const unsigned char * mask,
414                               uint32_t mask_bytes,
415                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
416                               void *cont_cls)
417 {
418   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
419   struct GNUNET_SCALARPRODUCT_client_request *msg;
420   int32_t * vector;
421   uint16_t size;
422   uint64_t i;
423
424   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
425                  +element_count * sizeof (int32_t)
426                  + mask_bytes);
427
428   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
429   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
430   if (!h->client)
431     {
432       LOG (GNUNET_ERROR_TYPE_ERROR,
433            _ ("Failed to connect to the scalarproduct service\n"));
434       GNUNET_free (h);
435       return NULL;
436     }
437   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
438   if (!h->stats)
439     {
440       LOG (GNUNET_ERROR_TYPE_ERROR,
441            _ ("Failed to send a message to the statistics service\n"));
442       GNUNET_CLIENT_disconnect (h->client);
443       GNUNET_free (h);
444       return NULL;
445     }
446
447   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes;
448
449   h->cont_datum = cont;
450   h->cont_cls = cont_cls;
451   h->response_proc = &process_result_message;
452   h->cfg = cfg;
453   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
454
455   msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size);
456   h->msg = msg;
457   msg->header.size = htons (size);
458   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
459   msg->element_count = htonl (element_count);
460   msg->mask_length = htonl (mask_bytes);
461
462   vector = (int32_t*) & msg[1];
463   // copy each element over to the message
464   for (i = 0; i < element_count; i++)
465     vector[i] = htonl (elements[i]);
466
467   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
468   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
469   memcpy (&vector[element_count], mask, mask_bytes);
470
471   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
472                                                GNUNET_TIME_UNIT_FOREVER_REL,
473                                                GNUNET_YES, // retry is OK in the initial stage
474                                                &transmit_request, h);
475   if (!h->th)
476     {
477       LOG (GNUNET_ERROR_TYPE_ERROR,
478            _ ("Failed to send a message to the scalarproduct service\n"));
479       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
480       GNUNET_CLIENT_disconnect (h->client);
481       GNUNET_free (h->msg);
482       GNUNET_free (h);
483       return NULL;
484     }
485   GNUNET_CONTAINER_DLL_insert (head, tail, h);
486   return h;
487 }
488
489
490 /**
491  * Disconnect from the scalarproduct service.
492  *
493  * @param h a computation handle to cancel
494  */
495 void
496 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
497 {
498   struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
499
500   for (qe = head; head != NULL; qe = head)
501     {
502       if (qe == h)
503         {
504           GNUNET_CONTAINER_DLL_remove (head, tail, qe);
505           if (NULL != qe->th)
506             GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
507           GNUNET_CLIENT_disconnect (qe->client);
508           GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES);
509           GNUNET_free_non_null (qe->msg);
510           GNUNET_free (qe);
511           break;
512         }
513     }
514 }
515 /**
516  * Cancel ALL our ongoing scalar product computations and collaboration offers.
517  * Closes ALL connections to the service
518  */
519 void
520 GNUNET_SCALARPRODUCT_disconnect ()
521 {
522     struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
523
524     LOG (GNUNET_ERROR_TYPE_INFO, "Disconnecting from VectorProduct\n");
525     for (qe = head; head != NULL; qe = head)
526     {
527         GNUNET_CONTAINER_DLL_remove (head, tail, qe);
528         if (NULL != qe->th)
529             GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
530         GNUNET_CLIENT_disconnect (qe->client);
531         GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES);
532         GNUNET_free_non_null (qe->msg);
533         GNUNET_free (qe);
534     }
535 }
536
537 /* end of ext_api.c */