updated test-config template for scalarproduct
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  *
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36
37 /**************************************************************
38  ***  Datatype Declarations                          **********
39  **************************************************************/
40
41 /**
42  * the abstraction function for our internal callback
43  */
44 typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls,
45                                                              const struct GNUNET_MessageHeader *msg,
46                                                              enum GNUNET_SCALARPRODUCT_ResponseStatus status);
47
48 /**
49  * Entry in the request queue per client
50  */
51 struct GNUNET_SCALARPRODUCT_ComputationHandle
52 {
53   /**
54    * This is a linked list.
55    */
56   struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
57
58   /**
59    * This is a linked list.
60    */
61   struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
62
63   /**
64    * Our configuration.
65    */
66   const struct GNUNET_CONFIGURATION_Handle *cfg;
67
68   /**
69    * Current connection to the scalarproduct service.
70    */
71   struct GNUNET_CLIENT_Connection *client;
72
73   /**
74    * Handle for statistics.
75    */
76   struct GNUNET_STATISTICS_Handle *stats;
77
78   /**
79    * The shared session key identifying this computation
80    */
81   struct GNUNET_HashCode key;
82
83   /**
84    * Current transmit handle.
85    */
86   struct GNUNET_CLIENT_TransmitHandle *th;
87
88   /**
89    * Size of the message
90    */
91   uint16_t message_size;
92
93   /**
94    * Message to be sent to the scalarproduct service
95    */
96   struct GNUNET_SCALARPRODUCT_client_request * msg;
97
98   /**
99    * The msg handler callback
100    */
101   union
102   {
103   /**
104    * Function to call after transmission of the request.
105    */
106   GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
107
108   /**
109    * Function to call after transmission of the request.
110    */
111   GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
112   };
113
114   /**
115    * Closure for 'cont'.
116    */
117   void *cont_cls;
118
119   /**
120    * Response Processor for response from the service. This function calls the
121    * continuation function provided by the client.
122    */
123   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
124 };
125
126 /**************************************************************
127  ***  Global Variables                               **********
128  **************************************************************/
129 /**
130  * Head of the active sessions queue
131  */
132 static struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
133 /**
134  * Tail of the active sessions queue
135  */
136 static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
137
138 /**************************************************************
139  ***  Function Declarations                          **********
140  **************************************************************/
141
142 void
143 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h);
144
145 /**************************************************************
146  ***  Static Function Declarations                   **********
147  **************************************************************/
148
149
150 /**
151  * Handles the STATUS received from the service for a response, does not contain a payload
152  *
153  * @param cls our Handle
154  * @param msg Pointer to the response received
155  * @param status the condition the request was terminated with (eg: disconnect)
156  */
157 static void
158 process_status_message (void *cls,
159                         const struct GNUNET_MessageHeader *msg,
160                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
161 {
162   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
163
164   qe->cont_status (qe->cont_cls, status);
165 }
166
167
168 /**
169  * Handles the RESULT received from the service for a request, should contain a result MPI value
170  *
171  * @param cls our Handle
172  * @param msg Pointer to the response received
173  * @param status the condition the request was terminated with (eg: disconnect)
174  */
175 static void
176 process_result_message (void *cls,
177                         const struct GNUNET_MessageHeader *msg,
178                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
179 {
180   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
181   const struct GNUNET_SCALARPRODUCT_client_response *message =
182           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
183   gcry_mpi_t result = NULL;
184   gcry_error_t rc;
185
186   if (GNUNET_SCALARPRODUCT_Status_Success == status)
187     {
188       size_t product_len = ntohl (message->product_length);
189       result = gcry_mpi_new (0);
190
191       if (0 < product_len)
192         {
193           gcry_mpi_t num;
194           size_t read = 0;
195
196           if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, &message[1], product_len, &read)))
197             {
198               LOG_GCRY(GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc);
199               gcry_mpi_release (result);
200               result = NULL;
201               status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
202             }
203           else
204             {
205               if (0 < message->range)
206                 gcry_mpi_add (result, result, num);
207               else if (0 > message->range)
208                 gcry_mpi_sub (result, result, num);
209               gcry_mpi_release (num);
210             }
211         }
212     }
213   qe->cont_datum (qe->cont_cls, status, result);
214 }
215
216
217 /**
218  * Called when a response is received from the service. After basic check, the
219  * handler in qe->response_proc is called. This functions handles the response
220  * to the client which used the API.
221  *
222  * @param cls Pointer to the Master Context
223  * @param msg Pointer to the data received in response
224  */
225 static void
226 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
227 {
228   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
229   const struct GNUNET_SCALARPRODUCT_client_response *message =
230           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
231   enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
232
233   if (NULL == msg)
234     {
235       LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
236       status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
237     }
238   else if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type))
239     {
240       LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n");
241     }
242   else if (0 < ntohl (message->product_length) || (0 == message->range))
243     {
244       // response for the responder client, successful
245       GNUNET_STATISTICS_update (qe->stats,
246                                 gettext_noop ("# SUC responder result messages received"), 1,
247                                 GNUNET_NO);
248
249       status = GNUNET_SCALARPRODUCT_Status_Success;
250     }
251
252   if (qe->cont_datum != NULL)
253     qe->response_proc (qe, msg, status);
254
255   GNUNET_CONTAINER_DLL_remove (head, tail, qe);
256   GNUNET_free (qe);
257 }
258
259
260 /**
261  * Transmits the request to the VectorProduct Service
262  *
263  * @param cls Closure
264  * @param size Size of the buffer
265  * @param buf Pointer to the buffer
266  *
267  * @return Size of the message sent
268  */
269 static size_t
270 transmit_request (void *cls, size_t size,
271                   void *buf)
272 {
273   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
274
275   if (NULL == buf)
276     {
277       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
278       GNUNET_STATISTICS_update (qe->stats,
279                                 gettext_noop ("# transmission request failures"),
280                                 1, GNUNET_NO);
281
282       // notify caller about the error, done here.
283       if (qe->cont_datum != NULL)
284         qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
285
286       GNUNET_SCALARPRODUCT_cancel (cls);
287       return 0;
288     }
289   memcpy (buf, qe->msg, size);
290
291   GNUNET_free (qe->msg);
292   qe->msg = NULL;
293   qe->th = NULL;
294
295   GNUNET_CLIENT_receive (qe->client, &receive_cb, qe,
296                          GNUNET_TIME_UNIT_FOREVER_REL);
297
298 #if INSANE_STATISTICS
299   GNUNET_STATISTICS_update (qe->stats,
300                             gettext_noop ("# bytes sent to scalarproduct"), 1,
301                             GNUNET_NO);
302 #endif
303   return size;
304 }
305
306
307 /**************************************************************
308  ***  API                                            **********
309  **************************************************************/
310
311
312 /**
313  * Used by Bob's client to cooperate with Alice,
314  *
315  * @param cfg the gnunet configuration handle
316  * @param key Session key unique to the requesting client
317  * @param elements Array of elements of the vector
318  * @param element_count Number of elements in the vector
319  * @param cont Callback function
320  * @param cont_cls Closure for the callback function
321  * 
322  * @return a new handle for this computation
323  */
324 struct GNUNET_SCALARPRODUCT_ComputationHandle *
325 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle * cfg,
326                                const struct GNUNET_HashCode * key,
327                                const int32_t * elements,
328                                uint32_t element_count,
329                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
330                                void * cont_cls)
331 {
332   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
333   struct GNUNET_SCALARPRODUCT_client_request *msg;
334   int32_t * vector;
335   uint16_t size;
336   uint64_t i;
337
338   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
339                  + element_count * sizeof (int32_t));
340   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
341   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
342   if (!h->client)
343     {
344       LOG (GNUNET_ERROR_TYPE_ERROR,
345            _ ("Failed to connect to the scalarproduct service\n"));
346       GNUNET_free (h);
347       return NULL;
348     }
349   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
350   if (!h->stats)
351     {
352       LOG (GNUNET_ERROR_TYPE_ERROR,
353            _ ("Failed to send a message to the statistics service\n"));
354       GNUNET_CLIENT_disconnect (h->client);
355       GNUNET_free (h);
356       return NULL;
357     }
358
359   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
360
361   h->cont_status = cont;
362   h->cont_cls = cont_cls;
363   h->response_proc = &process_status_message;
364   h->cfg = cfg;
365   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
366
367   msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size);
368   h->msg = msg;
369   msg->header.size = htons (size);
370   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
371   msg->element_count = htonl (element_count);
372
373   vector = (int32_t*) & msg[1];
374   // copy each element over to the message
375   for (i = 0; i < element_count; i++)
376     vector[i] = htonl (elements[i]);
377
378   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
379
380   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
381                                                GNUNET_TIME_UNIT_FOREVER_REL,
382                                                GNUNET_YES, // retry is OK in the initial stage
383                                                &transmit_request, h);
384   if (!h->th)
385     {
386       LOG (GNUNET_ERROR_TYPE_ERROR,
387            _ ("Failed to send a message to the scalarproduct service\n"));
388       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
389       GNUNET_CLIENT_disconnect (h->client);
390       GNUNET_free (h->msg);
391       GNUNET_free (h);
392       return NULL;
393     }
394   GNUNET_CONTAINER_DLL_insert (head, tail, h);
395   return h;
396 }
397
398
399 /**
400  * Request by Alice's client for computing a scalar product
401  *
402  * @param cfg the gnunet configuration handle
403  * @param key Session key should be unique to the requesting client
404  * @param peer PeerID of the other peer
405  * @param elements Array of elements of the vector
406  * @param element_count Number of elements in the vector
407  * @param mask Array of the mask
408  * @param mask_bytes number of bytes in the mask
409  * @param cont Callback function
410  * @param cont_cls Closure for the callback function
411  * 
412  * @return a new handle for this computation
413  */
414 struct GNUNET_SCALARPRODUCT_ComputationHandle *
415 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle * cfg,
416                               const struct GNUNET_HashCode * key,
417                               const struct GNUNET_PeerIdentity *peer,
418                               const int32_t * elements,
419                               uint32_t element_count,
420                               const unsigned char * mask,
421                               uint32_t mask_bytes,
422                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
423                               void * cont_cls)
424 {
425   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
426   struct GNUNET_SCALARPRODUCT_client_request *msg;
427   int32_t * vector;
428   uint16_t size;
429   uint64_t i;
430
431   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
432                  +element_count * sizeof (int32_t)
433                  + mask_bytes);
434
435   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
436   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
437   if (!h->client)
438     {
439       LOG (GNUNET_ERROR_TYPE_ERROR,
440            _ ("Failed to connect to the scalarproduct service\n"));
441       GNUNET_free (h);
442       return NULL;
443     }
444   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
445   if (!h->stats)
446     {
447       LOG (GNUNET_ERROR_TYPE_ERROR,
448            _ ("Failed to send a message to the statistics service\n"));
449       GNUNET_CLIENT_disconnect (h->client);
450       GNUNET_free (h);
451       return NULL;
452     }
453
454   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes;
455
456   h->cont_datum = cont;
457   h->cont_cls = cont_cls;
458   h->response_proc = &process_result_message;
459   h->cfg = cfg;
460   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
461
462   msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size);
463   h->msg = msg;
464   msg->header.size = htons (size);
465   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
466   msg->element_count = htonl (element_count);
467   msg->mask_length = htonl (mask_bytes);
468
469   vector = (int32_t*) & msg[1];
470   // copy each element over to the message
471   for (i = 0; i < element_count; i++)
472     vector[i] = htonl (elements[i]);
473
474   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
475   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
476   memcpy (&vector[element_count], mask, mask_bytes);
477
478   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
479                                                GNUNET_TIME_UNIT_FOREVER_REL,
480                                                GNUNET_YES, // retry is OK in the initial stage
481                                                &transmit_request, h);
482   if (!h->th)
483     {
484       LOG (GNUNET_ERROR_TYPE_ERROR,
485            _ ("Failed to send a message to the scalarproduct service\n"));
486       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
487       GNUNET_CLIENT_disconnect (h->client);
488       GNUNET_free (h->msg);
489       GNUNET_free (h);
490       return NULL;
491     }
492   GNUNET_CONTAINER_DLL_insert (head, tail, h);
493   return h;
494 }
495
496
497 /**
498  * Cancel an ongoing computation or revoke our collaboration offer.
499  * Closes the connection to the service
500  *
501  * @param h computation handle to terminate
502  */
503 void
504 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
505 {
506   struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
507
508   for (qe = head; head != NULL; qe = head)
509     {
510       if (qe == h)
511         {
512           GNUNET_CONTAINER_DLL_remove (head, tail, qe);
513           if (NULL != qe->th)
514             GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
515           GNUNET_CLIENT_disconnect (qe->client);
516           GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES);
517           GNUNET_free_non_null (qe->msg);
518           GNUNET_free (qe);
519           break;
520         }
521     }
522 }
523 /**
524  * Cancel ALL ongoing computation or revoke our collaboration offer.
525  * Closes ALL connections to the service
526  */
527 void
528 GNUNET_SCALARPRODUCT_disconnect ()
529 {
530     struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
531
532     LOG (GNUNET_ERROR_TYPE_INFO, "Disconnecting from VectorProduct\n");
533     for (qe = head; head != NULL; qe = head)
534     {
535         GNUNET_CONTAINER_DLL_remove (head, tail, qe);
536         if (NULL != qe->th)
537             GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
538         GNUNET_CLIENT_disconnect (qe->client);
539         GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES);
540         GNUNET_free_non_null (qe->msg);
541         GNUNET_free (qe);
542     }
543 }
544
545 /* end of scalarproduct_api.c */