2 This file is part of GNUnet.
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file scalarproduct/scalarproduct_api.c
23 * @brief API for the scalarproduct
24 * @author Christian Fuchs
25 * @author Gaurav Kukreja
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
/* Logging shorthand for this file: forwards to GNUNET_log_from with the fixed
 * component name "scalarproduct-api". */
34 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36 /**************************************************************
37 *** Datatype Declarations **********
38 **************************************************************/
41 * Entry in the request queue per client
/* One pending request towards the service. Entries are chained through
 * next/prev and point back to their owning handle through h.
 * NOTE(review): the embedded numbering jumps (e.g. 80 -> 89), so not every
 * member is shown here; code further down also reads qe->cont_cls and
 * qe->timeout_task. */
43 struct GNUNET_SCALARPRODUCT_QueueEntry
46 * This is a linked list.
48 struct GNUNET_SCALARPRODUCT_QueueEntry *next;
51 * This is a linked list.
53 struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
56 * Handle to the master context.
58 struct GNUNET_SCALARPRODUCT_Handle *h;
/* Byte count requested from GNUNET_CLIENT_notify_transmit_ready by
 * process_queue and compared against the buffer size in transmit_request. */
63 uint16_t message_size;
66 * Message to be sent to the scalarproduct service
68 struct GNUNET_SCALARPRODUCT_client_request* msg;
73 * Function to call after transmission of the request.
/* Invoked by process_status_message with (cont_cls, session key, status). */
75 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
78 * Function to call after transmission of the request.
/* Invoked by process_result_message with
 * (cont_cls, session key, peer, status, response message). */
80 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
89 * Has this message been transmitted to the service?
90 * Only ever GNUNET_YES for the head of the queue.
91 * Note that the overall struct should end at a
92 * multiple of 64 bits.
/* Set to GNUNET_YES by transmit_request; asserted by receive_cb and
 * timeout_queue_entry. */
94 int16_t was_transmitted;
97 * Response Processor for response from the service. This function calls the
98 * continuation function provided by the client.
/* Called with (queue entry, message or NULL, status) from receive_cb,
 * timeout_queue_entry and GNUNET_SCALARPRODUCT_disconnect. */
100 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
103 /**************************************************************
104 *** Function Declarations **********
105 **************************************************************/
108 * Creates a new entry at the tail of the DLL
110 * @param h handle to the master context
112 * @return pointer to the entry
/* Forward declarations of the file-local queue helpers; their definitions
 * follow further down in this file. */
114 static struct GNUNET_SCALARPRODUCT_QueueEntry *
115 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
118 * Removes the head entry from the queue
120 * @param h Handle to the master context
122 static struct GNUNET_SCALARPRODUCT_QueueEntry *
123 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
126 * Triggered when timeout occurs for a request in queue
128 * @param cls The pointer to the QueueEntry
129 * @param tc Task Context
132 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
135 * Called when a response is received from the service. After basic checks, the
136 * handler in qe->response_proc is called. This function hands the response
137 * to the client which used the API.
139 * @param cls Pointer to the Master Context
140 * @param msg Pointer to the data received in response
/* Forward declaration: transmit_request passes receive_cb to
 * GNUNET_CLIENT_receive as the reply handler. */
143 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
146 * Transmits the request to the VectorProduct Service
149 * @param size Size of the buffer
150 * @param buf Pointer to the buffer
152 * @return Size of the message sent
/* Forward declarations: transmit_request is the callback given to
 * GNUNET_CLIENT_notify_transmit_ready, and process_queue is the function
 * that registers it for the head of the queue. */
154 static size_t transmit_request (void *cls, size_t size,
158 * Issues transmit request for the new entries in the queue
160 * @param h handle to the master context
163 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
165 /**************************************************************
166 *** Static Function Declarations **********
167 **************************************************************/
171 * Creates a new entry at the tail of the DLL
173 * @param h handle to the master context
175 * @return pointer to the entry
/* Allocates a queue entry with GNUNET_new and links it behind the current
 * tail of h's queue.
 * NOTE(review): the embedded numbering skips 186-193 and 196-199, so the
 * body of the empty-queue branch and the return statement are not visible
 * here; confirm them against the complete file. */
177 static struct GNUNET_SCALARPRODUCT_QueueEntry *
178 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
180 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
182 qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
/* The queue counts as empty only when both ends are NULL. */
185 if (NULL == h->queue_head && NULL == h->queue_tail)
/* Presumably the non-empty case: hook the new entry after the old tail. */
194 qe->prev = h->queue_tail;
195 h->queue_tail->next = qe;
204 * Removes the head entry from the queue
206 * @param h Handle to the master context
/* Unlinks the entry at the head of h's queue. An empty queue is only logged.
 * NOTE(review): despite the name, no call that releases memory is visible in
 * this block; receive_cb and GNUNET_SCALARPRODUCT_disconnect keep using the
 * entry obtained from this function. The lines that assign and return qe
 * are not shown in this listing. */
208 static struct GNUNET_SCALARPRODUCT_QueueEntry *
209 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
211 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
213 GNUNET_assert (NULL != h);
214 if (NULL == h->queue_head && NULL == h->queue_tail)
216 // The queue is empty. Just return.
218 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
220 else if (h->queue_head == h->queue_tail) //only one entry
/* Removing the only entry leaves the queue empty at both ends. */
225 h->queue_head = NULL;
226 h->queue_tail = NULL;
/* Several entries: the successor becomes the new head. */
231 h->queue_head = h->queue_head->next;
232 h->queue_head->prev = NULL;
241 * Triggered when timeout occurs for a request in queue
243 * @param cls The pointer to the QueueEntry
244 * @param tc Task Context
/* Scheduler callback for a request that timed out. cls is the queue entry.
 * It bumps the "# queue entry timeouts" statistic, clears the entry's
 * timeout_task, removes the entry from its handle's queue and reports
 * GNUNET_SCALARPRODUCT_Status_Timeout (with a NULL message) through
 * qe->response_proc. tc is not used in the lines shown.
 * NOTE(review): the log text below says "datastore queue" although this is
 * the scalarproduct API; it looks like a leftover and should be confirmed.
 * NOTE(review): lines after 267 are not shown, so it cannot be told from
 * here whether the entry itself is released. */
247 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
249 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
252 GNUNET_STATISTICS_update (qe->h->stats,
253 gettext_noop ("# queue entry timeouts"), 1,
256 // Clear the timeout_task
257 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
259 // transmit_request is supposed to cancel timeout task.
260 // If message was not transmitted, there is definitely an error.
/* A transmitted entry must never reach this point, hence the assertion. */
261 GNUNET_assert (GNUNET_NO == qe->was_transmitted);
263 LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
265 // remove the queue_entry for the queue
266 GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
267 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
272 * Handles the RESULT received in reply of prepare_response from the
275 * @param cls Pointer to the QueueEntry
276 * @param msg Pointer to the response received
/* Response handler that reports only a status. cls is the queue entry; if
 * the entry has a cont_status callback, it is called with the entry's
 * closure, the session key stored in the queued request and status.
 * msg is not used in the lines shown.
 * NOTE(review): this reads qe->msg->key, while transmit_request calls
 * GNUNET_free (qe->msg) once the request has been copied out; confirm the
 * lifetime of qe->msg. */
279 process_status_message (void *cls,
280 const struct GNUNET_MessageHeader *msg,
281 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
283 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
285 GNUNET_assert (qe != NULL);
287 if (qe->cont_status != NULL)
288 qe->cont_status (qe->cont_cls, &qe->msg->key, status);
293 * Handles the RESULT received in reply of prepare_response from the
296 * @param cls Pointer to the QueueEntry
297 * @param msg Pointer to the response received
/* Response handler that delivers a result. cls is the queue entry; if the
 * entry has a cont_datum callback, it is called with the entry's closure,
 * the session key and peer stored in the queued request, status, and msg
 * cast to a client response. A NULL msg is logged first.
 * NOTE(review): the cast on line 314 drops the const qualifier of msg.
 * NOTE(review): this reads qe->msg->key and qe->msg->peer, while
 * transmit_request calls GNUNET_free (qe->msg) once the request has been
 * copied out; confirm the lifetime of qe->msg. */
300 process_result_message (void *cls,
301 const struct GNUNET_MessageHeader *msg,
302 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
304 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
306 GNUNET_assert (qe != NULL);
308 if (msg == NULL && qe->cont_datum != NULL)
310 LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n");
312 if (qe->cont_datum != NULL)
314 qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg);
320 * Called when a response is received from the service. After basic checks, the
321 * handler in qe->response_proc is called. This function hands the response
322 * to the client which used the API.
324 * @param cls Pointer to the Master Context
325 * @param msg Pointer to the data received in response
/* Reply handler registered by transmit_request. cls is the handle. It clears
 * h->in_receive, takes the head entry off the queue and reports the outcome
 * through that entry's response_proc.
 * NOTE(review): several guarding lines (braces, returns, and apparently the
 * NULL test on msg before line 369) are not shown in this listing; the
 * branch notes below describe only what the visible lines do. */
328 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
330 struct GNUNET_SCALARPRODUCT_Handle *h = cls;
331 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
332 int16_t was_transmitted;
/* Same message viewed as a client response; the cast drops const. */
333 struct GNUNET_SCALARPRODUCT_client_response *message =
334 (struct GNUNET_SCALARPRODUCT_client_response *) msg;
336 h->in_receive = GNUNET_NO;
337 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
/* No entry at the head of the queue. */
339 if (NULL == (qe = free_queue_head_entry (h)))
342 * The queue head will be NULL if the client disconnected,
343 * * In case of Alice, client disconnected after sending request, before receiving response
344 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
350 if (h->client == NULL)
352 // GKUKREJA : handle this correctly
354 * The queue head will be NULL if the client disconnected,
355 * * In case of Alice, client disconnected after sending request, before receiving response
356 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
362 was_transmitted = qe->was_transmitted;
363 // Control will only come here, when the request was transmitted to service,
364 // and service responded.
365 GNUNET_assert (was_transmitted == GNUNET_YES);
/* Presumably the msg == NULL case (its test is not shown): report Failure. */
369 LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
370 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
/* Any message type other than SERVICE_TO_CLIENT is reported as invalid. */
372 else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
374 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
375 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
/* product_length == 0: counted as a responder result, reported as Success. */
377 else if (ntohl (message->product_length) == 0)
379 // response for the responder client, successful
380 GNUNET_STATISTICS_update (h->stats,
381 gettext_noop ("# SUC responder result messages received"), 1,
384 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n");
385 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
/* product_length > 0: counted as a requester result, reported as Success.
 * The value is unsigned, so this test always holds once the == 0 case above
 * has been ruled out. */
387 else if (ntohl (message->product_length) > 0)
389 // response for the requester client, successful
390 GNUNET_STATISTICS_update (h->stats,
391 gettext_noop ("# SUC requester result messages received"), 1,
394 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
395 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
404 * Transmits the request to the VectorProduct Sevice
407 * @param size Size of the buffer
408 * @param buf Pointer to the buffer
410 * @return Size of the message sent
413 transmit_request (void *cls, size_t size,
416 struct GNUNET_SCALARPRODUCT_Handle *h = cls;
417 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
420 if (NULL == (qe = h->queue_head))
422 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
426 GNUNET_SCHEDULER_cancel (qe->timeout_task);
427 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
430 if (NULL == (qe = h->queue_head))
431 return 0; /* no entry in queue */
434 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
435 GNUNET_STATISTICS_update (h->stats,
436 gettext_noop ("# transmission request failures"),
438 GNUNET_SCALARPRODUCT_disconnect (h);
441 if (size < (msize = qe->message_size))
446 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
449 memcpy (buf, qe->msg, size);
450 GNUNET_free (qe->msg);
451 qe->was_transmitted = GNUNET_YES;
453 GNUNET_assert (GNUNET_NO == h->in_receive);
454 h->in_receive = GNUNET_YES;
456 GNUNET_CLIENT_receive (h->client, &receive_cb, h,
457 GNUNET_TIME_UNIT_FOREVER_REL);
459 #if INSANE_STATISTICS
460 GNUNET_STATISTICS_update (h->stats,
461 gettext_noop ("# bytes sent to scalarproduct"), 1,
469 * Issues transmit request for the new entries in the queue
471 * @param h handle to the master context
/* Requests a transmission slot for the entry at the head of h's queue,
 * with transmit_request as the callback. Returns early, after a debug log,
 * when there is nothing to send or a send cannot be started yet.
 * NOTE(review): the test guarding the "Pending transmission request" branch,
 * the assignment target of the call on line 505 and the test guarding the
 * error log on line 512 are not shown in this listing. */
474 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
476 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
478 if (NULL == (qe = h->queue_head))
480 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
481 return; /* no entry in queue */
483 if (qe->was_transmitted == GNUNET_YES)
485 LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
486 return; /* waiting for replies */
490 LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
491 return; /* request pending */
493 if (h->client == NULL)
495 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
496 return; /* waiting for reconnect */
498 if (GNUNET_YES == h->in_receive)
500 /* wait for response to previous query */
/* Ask the client library to call transmit_request once message_size bytes
 * can be sent. */
505 GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
506 GNUNET_TIME_UNIT_FOREVER_REL,
508 &transmit_request, h);
512 LOG (GNUNET_ERROR_TYPE_ERROR,
513 _ ("Failed to send a message to the scalarproduct service\n"));
517 GNUNET_assert (GNUNET_NO == h->in_receive);
518 GNUNET_break (NULL != h->th);
523 /**************************************************************
525 **************************************************************/
529 * Used by Bob's client to cooperate with Alice,
531 * @param h handle to the master context
532 * @param key Session key - unique to the requesting client
533 * @param elements Array of elements of the vector
534 * @param element_count Number of elements in the vector
535 * @param cont Callback function
536 * @param cont_cls Closure for the callback function
538 struct GNUNET_SCALARPRODUCT_Handle *
539 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
540 const struct GNUNET_HashCode * key,
541 const int32_t * elements,
542 uint32_t element_count,
543 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
546 struct GNUNET_SCALARPRODUCT_Handle *h;
547 struct GNUNET_SCALARPRODUCT_client_request *msg;
553 GNUNET_assert(elements);
555 GNUNET_assert(element_count > 1);
556 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
557 + element_count * sizeof (int32_t));
558 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
559 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
562 LOG (GNUNET_ERROR_TYPE_ERROR,
563 _ ("Failed to connect to the scalarproduct service\n"));
567 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
569 LOG (GNUNET_ERROR_TYPE_ERROR,
570 _("Failed to send a message to the statistics service\n"));
571 GNUNET_CLIENT_disconnect(h->client);
576 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
578 h->cont_datum = cont;
579 h->cont_cls = cont_cls;
580 h->response_proc = &process_result_message;
582 h->msg = GNUNET_malloc (size);
583 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
585 msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
586 msg->header.size = htons (size);
587 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
588 msg->element_count = htonl (element_count);
590 vector = (int32_t*) &msg[1];
591 // copy each element over to the message
592 for (i = 0; i < element_count; i++)
593 vector[i] = htonl(elements[i]);
595 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
598 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
599 GNUNET_TIME_UNIT_FOREVER_REL,
600 GNUNET_YES, // retry is OK in the initial stage
601 &transmit_request, h);
604 LOG (GNUNET_ERROR_TYPE_ERROR,
605 _ ("Failed to send a message to the scalarproduct service\n"));
606 GNUNET_STATISTICS_destroy(h->GNUNET_YES);
607 GNUNET_CLIENT_disconnect(h->client);
617 * Request by Alice's client for computing a scalar product
619 * @param cfg configuration used to connect to the scalarproduct service
620 * @param key Session key - unique to the requesting client
621 * @param peer PeerID of the other peer
622 * @param elements Array of elements of the vector
623 * @param element_count Number of elements in the vector
624 * @param mask Array of the mask
625 * @param mask_bytes number of bytes in the mask
626 * @param cont Callback function
627 * @param cont_cls Closure for the callback function
/* Alice's entry point: connects to the scalarproduct service, builds a
 * CLIENT_TO_ALICE message holding the session key, the peer identity, the
 * vector (converted with htonl) and the mask bytes, and asks for it to be
 * transmitted through transmit_request.
 * NOTE(review): this listing omits short lines (one parameter, several
 * local declarations, braces and returns). The following points need
 * confirming against the complete file:
 *  - mask_length is used for the size, the header field and the copy, while
 *    the assertion on line 653 tests mask_bytes; no declaration of
 *    mask_length is visible.
 *  - element_count and mask_length are converted with htons here, whereas
 *    GNUNET_SCALARPRODUCT_response converts element_count with htonl.
 *  - the connection is stored in the local `client', but the transmission
 *    on line 692 uses h->client; no assignment to h->client is visible.
 *  - cont is stored in cont_datum, but response_proc is set to
 *    process_status_message, which only calls cont_status.
 *  - the size assertion uses ">=" although the size is later stored with
 *    htons, i.e. in 16 bits. */
629 struct GNUNET_SCALARPRODUCT_Handle *
630 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
631 const struct GNUNET_HashCode * key,
632 const struct GNUNET_PeerIdentity *peer,
633 const int32_t * elements,
634 uint32_t element_count,
635 const unsigned char * mask,
637 GNUNET_SCALARPRODUCT_DatumProcessor cont,
640 struct GNUNET_CLIENT_Connection *client;
641 struct GNUNET_SCALARPRODUCT_Handle *h;
642 struct GNUNET_SCALARPRODUCT_client_request *msg;
649 GNUNET_assert(elements);
652 GNUNET_assert(element_count > 1);
653 GNUNET_assert(mask_bytes != 0);
654 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
655 + element_count * sizeof (int32_t)
657 client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
661 LOG (GNUNET_ERROR_TYPE_ERROR,
662 _ ("Failed to connect to the scalarproduct service\n"));
/* Total message size: fixed part, then the vector, then the mask. */
665 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
667 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
668 h->cont_datum = cont;
669 h->cont_cls = cont_cls;
670 h->response_proc = &process_status_message;
673 h->msg = GNUNET_malloc (size);
674 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
676 msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
677 msg->header.size = htons (size);
678 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
679 msg->element_count = htons (element_count);
680 msg->mask_length = htons (mask_length);
/* The vector starts right behind the fixed part of the message. */
682 vector = (int32_t*) &msg[1];
683 // copy each element over to the message
684 for (i = 0; i < element_count; i++)
685 vector[i] = htonl(elements[i]);
687 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
688 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
/* The mask bytes are appended directly after the last vector element. */
689 memcpy (&vector[element_count], mask, mask_length);
691 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
692 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
693 GNUNET_TIME_UNIT_FOREVER_REL,
694 GNUNET_YES, // retry is OK in the initial stage
695 &transmit_request, h);
698 LOG (GNUNET_ERROR_TYPE_ERROR,
699 _ ("Failed to send a message to the scalarproduct service\n"));
706 * Disconnect from the scalarproduct service.
708 * @param h handle to the scalarproduct
/* Tears down a handle: every entry still queued is taken off the queue and
 * its response_proc is called with a NULL message and
 * GNUNET_SCALARPRODUCT_Status_ServiceDisconnected; then the client
 * connection (if any) is closed and the statistics handle is destroyed.
 * NOTE(review): lines 727-729 and everything after 730 are not shown, so it
 * cannot be told from here whether h->client is reset or h itself is freed. */
711 GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
713 struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
715 LOG (GNUNET_ERROR_TYPE_INFO,
716 "Disconnecting from VectorProduct\n");
718 while (NULL != h->queue_head)
/* The assignment inside the assertion is what dequeues the entry. */
720 GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
721 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
724 if (h->client != NULL)
726 GNUNET_CLIENT_disconnect (h->client);
730 GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
734 /* end of scalarproduct_api.c */