rewrote API minus cancel function
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  * 
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36
37 /**************************************************************
38  ***  Datatype Declarations                          **********
39  **************************************************************/
40
41 /**
42  * Entry in the request queue per client
43  */
44 struct GNUNET_SCALARPRODUCT_ComputationHandle
45 {
46   /**
47    * This is a linked list.
48    */
49   struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
50
51   /**
52    * This is a linked list.
53    */
54   struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
55   
56   /**
57    * Our configuration.
58    */
59   const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61   /**
62    * Current connection to the scalarproduct service.
63    */
64   struct GNUNET_CLIENT_Connection *client;
65
66   /**
67    * Handle for statistics.
68    */
69   struct GNUNET_STATISTICS_Handle *stats;
70
71   /**
72    * The shared session key identifying this computation
73    */
74   struct GNUNET_HashCode * key;
75     
76   /**
77    * Current transmit handle.
78    */
79   struct GNUNET_CLIENT_TransmitHandle *th;
80
81   /**
82    * Size of the message
83    */
84   uint16_t message_size;
85
86   /**
87    * Message to be sent to the scalarproduct service
88    */
89   struct GNUNET_SCALARPRODUCT_client_request * msg;
90
91   union
92   {
93     /**
94      * Function to call after transmission of the request.
95      */
96     GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
97
98     /**
99      * Function to call after transmission of the request.
100      */
101     GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
102   };
103
104   /**
105    * Closure for 'cont'.
106    */
107   void *cont_cls;
108
109   /**
110    * Response Processor for response from the service. This function calls the
111    * continuation function provided by the client.
112    */
113   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
114 };
115
116 /**************************************************************
117  ***  Global Variables                               **********
118  **************************************************************/
119 /**
120  * Head of the active sessions queue
121  */
122 static struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
123 /**
124  * Tail of the active sessions queue
125  */
126 static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
127
128 /**************************************************************
129  ***  Function Declarations                          **********
130  **************************************************************/
131
132 /**
133  * Called when a response is received from the service. After basic check
134  * handler in qe->response_proc is called. This functions handles the response
135  * to the client which used the API.
136  * 
137  * @param cls Pointer to the Master Context
138  * @param msg Pointer to the data received in response
139  */
140 static void
141 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
142
143 /**
144  * Transmits the request to the VectorProduct Sevice
145  * 
146  * @param cls Closure
147  * @param size Size of the buffer
148  * @param buf Pointer to the buffer
149  * 
150  * @return Size of the message sent
151  */
152 static size_t transmit_request (void *cls, size_t size,
153                                 void *buf);
154
155 /**************************************************************
156  ***  Static Function Declarations                   **********
157  **************************************************************/
158
159 /**
160  * Handles the RESULT received in reply of prepare_response from the 
161  * service
162  * 
163  * @param cls Handle to the Master Context
164  * @param msg Pointer to the response received
165  */
166 static void
167 process_status_message (void *cls,
168                         const struct GNUNET_MessageHeader *msg,
169                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
170 {
171   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
172
173   qe->cont_status (qe->cont_cls, status);
174 }
175
176
177 /**
178  * Handles the RESULT received in reply of prepare_response from the 
179  * service
180  * 
181  * @param cls Handle to the Master Context
182  * @param msg Pointer to the response received
183  */
184 static void
185 process_result_message (void *cls,
186                         const struct GNUNET_MessageHeader *msg,
187                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
188 {
189   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
190   const struct GNUNET_SCALARPRODUCT_client_response *message =
191           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
192   gcry_mpi_t result = NULL;
193
194   if (GNUNET_SCALARPRODUCT_Status_Success == status
195       && qe->cont_datum != NULL)
196     {
197       size_t product_len = ntohl(message->product_length);
198       result = gcry_mpi_new(0);
199       
200       if (0 < product_len)
201         {
202           gcry_mpi_t num;
203           size_t read = 0;
204
205           if (0 != gcry_mpi_scan (&num, GCRYMPI_FMT_USG, &msg[1], product_len, &read)){
206               LOG (GNUNET_ERROR_TYPE_ERROR, "Could not convert to mpi to value!\n");
207               gcry_mpi_release(result);
208               result = NULL;
209               status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
210             }
211           else
212             {
213               if (message->range > 0)
214                 gcry_mpi_add(result, result, num);
215               else
216                 gcry_mpi_sub(result, result, num);
217               gcry_mpi_release(num);
218             }
219         }
220     }
221     qe->cont_datum (qe->cont_cls, status, result);
222 }
223
224
225 /**
226  * Called when a response is received from the service. After basic check
227  * handler in qe->response_proc is called. This functions handles the response
228  * to the client which used the API.
229  * 
230  * @param cls Pointer to the Master Context
231  * @param msg Pointer to the data received in response
232  */
233 static void
234 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
235 {
236   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
237   const struct GNUNET_SCALARPRODUCT_client_response *message =
238           (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
239   enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
240
241   if (NULL == msg)
242     {
243       LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
244       status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
245     }
246   else if ( GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type))
247     {
248       LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n");
249     }
250   else if (0 < ntohl (message->product_length) || (0 == message->range))
251     {
252       // response for the responder client, successful
253       GNUNET_STATISTICS_update (qe->stats,
254                                 gettext_noop ("# SUC responder result messages received"), 1,
255                                 GNUNET_NO);
256
257       status = GNUNET_SCALARPRODUCT_Status_Success;
258     }
259   
260   if (qe->cont_datum != NULL)
261     qe->response_proc (qe, msg, status);
262
263   GNUNET_free (qe);
264 }
265
266
267 /**
268  * Transmits the request to the VectorProduct Sevice
269  * 
270  * @param cls Closure
271  * @param size Size of the buffer
272  * @param buf Pointer to the buffer
273  * 
274  * @return Size of the message sent
275  */
276 static size_t
277 transmit_request (void *cls, size_t size,
278                   void *buf)
279 {
280   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
281   
282   if (NULL == buf)
283     {
284       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
285       GNUNET_STATISTICS_update (qe->stats,
286                                 gettext_noop ("# transmission request failures"),
287                                 1, GNUNET_NO);
288       
289       // notify caller about the error, done here.
290       if (qe->cont_datum != NULL)
291         qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
292       GNUNET_SCALARPRODUCT_cancel(cls);
293       return 0;
294     }
295   memcpy (buf, qe->msg, size);
296   
297   GNUNET_free (qe->msg);
298   qe->msg = NULL;
299   qe->th = NULL;
300
301   GNUNET_CLIENT_receive (qe->client, &receive_cb, qe,
302                          GNUNET_TIME_UNIT_FOREVER_REL);
303
304 #if INSANE_STATISTICS
305   GNUNET_STATISTICS_update (qe->stats,
306                             gettext_noop ("# bytes sent to scalarproduct"), 1,
307                             GNUNET_NO);
308 #endif
309   return size;
310 }
311
312
313 /**************************************************************
314  ***  API                                            **********
315  **************************************************************/
316
317
318 /**
319  * Used by Bob's client to cooperate with Alice, 
320  * 
321  * @param h handle to the master context
322  * @param key Session key - unique to the requesting client
323  * @param elements Array of elements of the vector
324  * @param element_count Number of elements in the vector
325  * @param cont Callback function
326  * @param cont_cls Closure for the callback function
327  */
328 struct GNUNET_SCALARPRODUCT_ComputationHandle *
329 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
330                                const struct GNUNET_HashCode * key,
331                                const int32_t * elements,
332                                uint32_t element_count,
333                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
334                                void *cont_cls)
335 {
336   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
337   struct GNUNET_SCALARPRODUCT_client_request *msg;
338   int32_t * vector;
339   uint16_t size;
340   uint64_t i;
341   
342   GNUNET_assert(key);
343   GNUNET_assert(elements);
344   GNUNET_assert(cont);
345   GNUNET_assert(element_count > 1);
346   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
347                                                    + element_count * sizeof (int32_t));
348   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
349   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
350   if (!h->client)
351     {
352       LOG (GNUNET_ERROR_TYPE_ERROR,
353            _ ("Failed to connect to the scalarproduct service\n"));
354       GNUNET_free(h);
355       return NULL;
356     }
357   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
358   if (!h->th){
359       LOG (GNUNET_ERROR_TYPE_ERROR,
360            _("Failed to send a message to the statistics service\n"));
361       GNUNET_CLIENT_disconnect(h->client);
362       GNUNET_free(h);
363       return NULL;
364   }
365   
366   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
367   
368   h->cont_status = cont;
369   h->cont_cls = cont_cls;
370   h->response_proc = &process_result_message;
371   h->cfg = cfg;
372   h->msg = GNUNET_malloc (size);
373   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
374   
375   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
376   msg->header.size = htons (size);
377   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
378   msg->element_count = htonl (element_count);
379   
380   vector = (int32_t*) &msg[1];
381   // copy each element over to the message
382   for (i = 0; i < element_count; i++)
383     vector[i] = htonl(elements[i]);
384
385   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
386   
387   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
388                                                GNUNET_TIME_UNIT_FOREVER_REL,
389                                                GNUNET_YES, // retry is OK in the initial stage
390                                                &transmit_request, h);
391   if (!h->th)
392     {
393       LOG (GNUNET_ERROR_TYPE_ERROR,
394            _ ("Failed to send a message to the scalarproduct service\n"));
395       GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES);
396       GNUNET_CLIENT_disconnect(h->client);
397       GNUNET_free(h->msg);
398       GNUNET_free(h);
399       return NULL;
400     }
401   GNUNET_CONTAINER_DLL_insert (head, tail, h);
402   return h;
403 }
404
405
406 /**
407  * Request by Alice's client for computing a scalar product
408  * 
409  * @param h handle to the master context
410  * @param key Session key - unique to the requesting client
411  * @param peer PeerID of the other peer
412  * @param elements Array of elements of the vector
413  * @param element_count Number of elements in the vector
414  * @param mask Array of the mask
415  * @param mask_bytes number of bytes in the mask
416  * @param cont Callback function
417  * @param cont_cls Closure for the callback function
418  */
419 struct GNUNET_SCALARPRODUCT_ComputationHandle *
420 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
421                               const struct GNUNET_HashCode * key,
422                               const struct GNUNET_PeerIdentity *peer,
423                               const int32_t * elements,
424                               uint32_t element_count,
425                               const unsigned char * mask,
426                               uint32_t mask_bytes,
427                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
428                               void *cont_cls)
429 {
430   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
431   struct GNUNET_SCALARPRODUCT_client_request *msg;
432   int32_t * vector;
433   uint16_t size;
434   uint64_t i;
435   
436   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
437                                                    + element_count * sizeof (int32_t)
438                                                    + mask_bytes);
439   
440   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
441   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
442   if (!h->client)
443     {
444       LOG (GNUNET_ERROR_TYPE_ERROR,
445            _ ("Failed to connect to the scalarproduct service\n"));
446       GNUNET_free(h);
447       return NULL;
448     }
449   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
450   if (!h->th){
451       LOG (GNUNET_ERROR_TYPE_ERROR,
452            _("Failed to send a message to the statistics service\n"));
453       GNUNET_CLIENT_disconnect(h->client);
454       GNUNET_free(h);
455       return NULL;
456   }
457   
458   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes;
459   
460   h->cont_datum = cont;
461   h->cont_cls = cont_cls;
462   h->response_proc = &process_status_message;
463   h->cfg = cfg;
464   h->msg = GNUNET_malloc (size);
465   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
466   
467   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
468   msg->header.size = htons (size);
469   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
470   msg->element_count = htons (element_count);
471   msg->mask_length = htons (mask_bytes);
472   
473   vector = (int32_t*) &msg[1];
474   // copy each element over to the message
475   for (i = 0; i < element_count; i++)
476     vector[i] = htonl(elements[i]);
477
478   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
479   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
480   memcpy (&vector[element_count], mask, mask_bytes);
481   
482   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
483                                                GNUNET_TIME_UNIT_FOREVER_REL,
484                                                GNUNET_YES, // retry is OK in the initial stage
485                                                &transmit_request, h);
486   if (!h->th)
487     {
488       LOG (GNUNET_ERROR_TYPE_ERROR,
489            _ ("Failed to send a message to the scalarproduct service\n"));
490       GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES);
491       GNUNET_CLIENT_disconnect(h->client);
492       GNUNET_free(h->msg);
493       GNUNET_free(h);
494       return NULL;
495     }
496   GNUNET_CONTAINER_DLL_insert (head, tail, h);
497   return h;
498 }
499
500 /**
501  * Disconnect from the scalarproduct service.
502  * 
503  * @param h a computation handle to cancel
504  */
505 void
506 GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
507 {
508   struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
509
510   for (qe = head; head != NULL; qe = head)
511     {
512       if (qe == h)
513         {
514           GNUNET_CONTAINER_DLL_remove (head, tail, qe);
515           LOG (GNUNET_ERROR_TYPE_INFO,
516                "Disconnecting from VectorProduct\n");
517           if (NULL == qe->th)
518             GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
519           GNUNET_CLIENT_disconnect (h->client);
520           GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
521           GNUNET_free (qe->msg);
522           GNUNET_free (qe);
523           break;
524         }
525     }
526 }
527
528 /* end of ext_api.c */