removed much of the excell logics in the scalar product API
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  * 
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36
37 /**************************************************************
38  ***  Datatype Declarations                          **********
39  **************************************************************/
40
41 /**
42  * Entry in the request queue per client
43  */
44 struct GNUNET_SCALARPRODUCT_ComputationHandle
45 {
46   /**
47    * This is a linked list.
48    */
49   struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
50
51   /**
52    * This is a linked list.
53    */
54   struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
55   
56   /**
57    * Our configuration.
58    */
59   const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61   /**
62    * Current connection to the scalarproduct service.
63    */
64   struct GNUNET_CLIENT_Connection *client;
65
66   /**
67    * Handle for statistics.
68    */
69   struct GNUNET_STATISTICS_Handle *stats;
70
71   /**
72    * The shared session key identifying this computation
73    */
74   struct GNUNET_HashCode * key;
75     
76   /**
77    * Current transmit handle.
78    */
79   struct GNUNET_CLIENT_TransmitHandle *th;
80
81   /**
82    * Size of the message
83    */
84   uint16_t message_size;
85
86   /**
87    * Message to be sent to the scalarproduct service
88    */
89   struct GNUNET_SCALARPRODUCT_client_request * msg;
90
91   union
92   {
93     /**
94      * Function to call after transmission of the request.
95      */
96     GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
97
98     /**
99      * Function to call after transmission of the request.
100      */
101     GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
102   };
103
104   /**
105    * Closure for 'cont'.
106    */
107   void *cont_cls;
108
109   /**
110    * Response Processor for response from the service. This function calls the
111    * continuation function provided by the client.
112    */
113   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
114 };
115
116 /**************************************************************
117  ***  Global Variables                               **********
118  **************************************************************/
119 /**
120  * Head of the active sessions queue
121  */
122 struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
123 /**
124  * Tail of the active sessions queue
125  */
126 struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
127
128 /**************************************************************
129  ***  Function Declarations                          **********
130  **************************************************************/
131
132 /**
133  * Called when a response is received from the service. After basic check
134  * handler in qe->response_proc is called. This functions handles the response
135  * to the client which used the API.
136  * 
137  * @param cls Pointer to the Master Context
138  * @param msg Pointer to the data received in response
139  */
140 static void
141 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
142
143 /**
144  * Transmits the request to the VectorProduct Sevice
145  * 
146  * @param cls Closure
147  * @param size Size of the buffer
148  * @param buf Pointer to the buffer
149  * 
150  * @return Size of the message sent
151  */
152 static size_t transmit_request (void *cls, size_t size,
153                                 void *buf);
154
155 /**************************************************************
156  ***  Static Function Declarations                   **********
157  **************************************************************/
158
159 /**
160  * Handles the RESULT received in reply of prepare_response from the 
161  * service
162  * 
163  * @param cls Handle to the Master Context
164  * @param msg Pointer to the response received
165  */
166 static void
167 process_status_message (void *cls,
168                         const struct GNUNET_MessageHeader *msg,
169                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
170 {
171   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
172
173   GNUNET_assert (qe != NULL);
174
175   if (qe->cont_status != NULL)
176     qe->cont_status (qe->cont_cls, &qe->msg->key, status);
177 }
178
179
180 /**
181  * Handles the RESULT received in reply of prepare_response from the 
182  * service
183  * 
184  * @param cls Handle to the Master Context
185  * @param msg Pointer to the response received
186  */
187 static void
188 process_result_message (void *cls,
189                         const struct GNUNET_MessageHeader *msg,
190                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
191 {
192   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
193
194   GNUNET_assert (qe != NULL);
195
196   if (msg == NULL && qe->cont_datum != NULL)
197     {
198       LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n");
199     }
200   if (qe->cont_datum != NULL)
201     {
202       qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg);
203     }
204 }
205
206
207 /**
208  * Called when a response is received from the service. After basic check
209  * handler in qe->response_proc is called. This functions handles the response
210  * to the client which used the API.
211  * 
212  * @param cls Pointer to the Master Context
213  * @param msg Pointer to the data received in response
214  */
215 static void
216 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
217 {
218   struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
219   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe;
220   int16_t was_transmitted;
221   struct GNUNET_SCALARPRODUCT_client_response *message =
222           (struct GNUNET_SCALARPRODUCT_client_response *) msg;
223
224   h->in_receive = GNUNET_NO;
225   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
226
227   if (NULL == (qe = free_queue_head_entry (h)))
228     {
229       /**
230        * The queue head will be NULL if the client disconnected,
231        * * In case of Alice, client disconnected after sending request, before receiving response
232        * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
233        */
234       process_queue (h);
235       return;
236     }
237
238   if (h->client == NULL)
239     {
240       // GKUKREJA : handle this correctly
241       /**
242        * The queue head will be NULL if the client disconnected,
243        * * In case of Alice, client disconnected after sending request, before receiving response
244        * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
245        */
246       process_queue (h);
247       return;
248     }
249
250   was_transmitted = qe->was_transmitted;
251   // Control will only come here, when the request was transmitted to service,
252   // and service responded.
253   GNUNET_assert (was_transmitted == GNUNET_YES);
254
255   if (msg == NULL)
256     {
257       LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
258       qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
259     }
260   else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
261     {
262       LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
263       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
264     }
265   else if (ntohl (message->product_length) == 0)
266     {
267       // response for the responder client, successful
268       GNUNET_STATISTICS_update (h->stats,
269                                 gettext_noop ("# SUC responder result messages received"), 1,
270                                 GNUNET_NO);
271
272       LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n");
273       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
274     }
275   else if (ntohl (message->product_length) > 0)
276     {
277       // response for the requester client, successful
278       GNUNET_STATISTICS_update (h->stats,
279                                 gettext_noop ("# SUC requester result messages received"), 1,
280                                 GNUNET_NO);
281
282       LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
283       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
284     }
285
286   GNUNET_free (qe);
287   process_queue (h);
288 }
289
290
291 /**
292  * Transmits the request to the VectorProduct Sevice
293  * 
294  * @param cls Closure
295  * @param size Size of the buffer
296  * @param buf Pointer to the buffer
297  * 
298  * @return Size of the message sent
299  */
300 static size_t
301 transmit_request (void *cls, size_t size,
302                   void *buf)
303 {
304   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
305   size_t msize;
306   
307   if (buf == NULL)
308     {
309       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
310       GNUNET_STATISTICS_update (qe->stats,
311                                 gettext_noop ("# transmission request failures"),
312                                 1, GNUNET_NO);
313       GNUNET_SCALARPRODUCT_disconnect (qe);
314       return 0;
315     }
316   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
317        msize);
318
319   memcpy (buf, qe->msg, size);
320   GNUNET_free (qe->msg);
321   qe->was_transmitted = GNUNET_YES;
322
323   qe->th = NULL;
324
325   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
326                          GNUNET_TIME_UNIT_FOREVER_REL);
327
328 #if INSANE_STATISTICS
329   GNUNET_STATISTICS_update (h->stats,
330                             gettext_noop ("# bytes sent to scalarproduct"), 1,
331                             GNUNET_NO);
332 #endif
333   return size;
334 }
335
336
337 /**************************************************************
338  ***  API                                            **********
339  **************************************************************/
340
341
342 /**
343  * Used by Bob's client to cooperate with Alice, 
344  * 
345  * @param h handle to the master context
346  * @param key Session key - unique to the requesting client
347  * @param elements Array of elements of the vector
348  * @param element_count Number of elements in the vector
349  * @param cont Callback function
350  * @param cont_cls Closure for the callback function
351  */
352 struct GNUNET_SCALARPRODUCT_ComputationHandle *
353 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
354                                const struct GNUNET_HashCode * key,
355                                const int32_t * elements,
356                                uint32_t element_count,
357                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
358                                void *cont_cls)
359 {
360   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
361   struct GNUNET_SCALARPRODUCT_client_request *msg;
362   int32_t * vector;
363   uint16_t size;
364   uint64_t i;
365   
366   GNUNET_assert(key);
367   GNUNET_assert(elements);
368   GNUNET_assert(cont);
369   GNUNET_assert(element_count > 1);
370   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
371                                                    + element_count * sizeof (int32_t));
372   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
373   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
374   if (!h->client)
375     {
376       LOG (GNUNET_ERROR_TYPE_ERROR,
377            _ ("Failed to connect to the scalarproduct service\n"));
378       GNUNET_free(h);
379       return NULL;
380     }
381   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
382   if (!h->th){
383       LOG (GNUNET_ERROR_TYPE_ERROR,
384            _("Failed to send a message to the statistics service\n"));
385       GNUNET_CLIENT_disconnect(h->client);
386       GNUNET_free(h);
387       return NULL;
388   }
389   
390   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
391   
392   h->cont_datum = cont;
393   h->cont_cls = cont_cls;
394   h->response_proc = &process_result_message;
395   h->cfg = cfg;
396   h->msg = GNUNET_malloc (size);
397   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
398   
399   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
400   msg->header.size = htons (size);
401   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
402   msg->element_count = htonl (element_count);
403   
404   vector = (int32_t*) &msg[1];
405   // copy each element over to the message
406   for (i = 0; i < element_count; i++)
407     vector[i] = htonl(elements[i]);
408
409   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
410   
411   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
412                                                GNUNET_TIME_UNIT_FOREVER_REL,
413                                                GNUNET_YES, // retry is OK in the initial stage
414                                                &transmit_request, h);
415   if (!h->th)
416     {
417       LOG (GNUNET_ERROR_TYPE_ERROR,
418            _ ("Failed to send a message to the scalarproduct service\n"));
419       GNUNET_STATISTICS_destroy(h->GNUNET_YES);
420       GNUNET_CLIENT_disconnect(h->client);
421       GNUNET_free(h->msg);
422       GNUNET_free(h);
423       return NULL;
424     }
425   GNUNET_CONTAINER_DLL_insert (head, tail, h);
426   return h;
427 }
428
429
430 /**
431  * Request by Alice's client for computing a scalar product
432  * 
433  * @param h handle to the master context
434  * @param key Session key - unique to the requesting client
435  * @param peer PeerID of the other peer
436  * @param elements Array of elements of the vector
437  * @param element_count Number of elements in the vector
438  * @param mask Array of the mask
439  * @param mask_bytes number of bytes in the mask
440  * @param cont Callback function
441  * @param cont_cls Closure for the callback function
442  */
443 struct GNUNET_SCALARPRODUCT_ComputationHandle *
444 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
445                               const struct GNUNET_HashCode * key,
446                               const struct GNUNET_PeerIdentity *peer,
447                               const int32_t * elements,
448                               uint32_t element_count,
449                               const unsigned char * mask,
450                               uint32_t mask_bytes,
451                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
452                               void *cont_cls)
453 {
454   struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
455   struct GNUNET_SCALARPRODUCT_client_request *msg;
456   int32_t * vector;
457   uint16_t size;
458   uint64_t i;
459   
460   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
461                                                    + element_count * sizeof (int32_t)
462                                                    + mask_length);
463   
464   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
465   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
466   if (!h->client)
467     {
468       LOG (GNUNET_ERROR_TYPE_ERROR,
469            _ ("Failed to connect to the scalarproduct service\n"));
470       GNUNET_free(h);
471       return NULL;
472     }
473   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
474   if (!h->th){
475       LOG (GNUNET_ERROR_TYPE_ERROR,
476            _("Failed to send a message to the statistics service\n"));
477       GNUNET_CLIENT_disconnect(h->client);
478       GNUNET_free(h);
479       return NULL;
480   }
481   
482   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
483   
484   h->cont_datum = cont;
485   h->cont_cls = cont_cls;
486   h->response_proc = &process_status_message;
487   h->cfg = cfg;
488   h->msg = GNUNET_malloc (size);
489   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
490   
491   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
492   msg->header.size = htons (size);
493   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
494   msg->element_count = htons (element_count);
495   msg->mask_length = htons (mask_length);
496   
497   vector = (int32_t*) &msg[1];
498   // copy each element over to the message
499   for (i = 0; i < element_count; i++)
500     vector[i] = htonl(elements[i]);
501
502   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
503   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
504   memcpy (&vector[element_count], mask, mask_length);
505   
506   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
507                                                GNUNET_TIME_UNIT_FOREVER_REL,
508                                                GNUNET_YES, // retry is OK in the initial stage
509                                                &transmit_request, h);
510   if (!h->th)
511     {
512       LOG (GNUNET_ERROR_TYPE_ERROR,
513            _ ("Failed to send a message to the scalarproduct service\n"));
514       GNUNET_STATISTICS_destroy(h->GNUNET_YES);
515       GNUNET_CLIENT_disconnect(h->client);
516       GNUNET_free(h->msg);
517       GNUNET_free(h);
518       return NULL;
519     }
520   GNUNET_CONTAINER_DLL_insert (head, tail, h);
521   return h;
522 }
523
524 /**
525  * Disconnect from the scalarproduct service.
526  * 
527  * @param h handle to the scalarproduct
528  */
529 void
530 GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
531 {
532   struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
533
534   LOG (GNUNET_ERROR_TYPE_INFO,
535        "Disconnecting from VectorProduct\n");
536
537   for (qe = head; head != NULL; qe = head)
538     {
539       GNUNET_CONTAINER_DLL_remove (head, tail, qe);
540       if (NULL == qe->th)
541         GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th);
542       GNUNET_CLIENT_disconnect (h->client);
543       GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
544       qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
545       GNUNET_free(qe->msg);
546       GNUNET_free(qe);
547     }
548 }
549
550 /* end of ext_api.c */