2 This file is part of GNUnet.
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file scalarproduct/scalarproduct_api.c
23 * @brief API for the scalarproduct
24 * @author Christian Fuchs
25 * @author Gaurav Kukreja
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36 /**************************************************************
37 *** Datatype Declarations **********
38 **************************************************************/
41 * Entry in the request queue per client
43 struct GNUNET_SCALARPRODUCT_QueueEntry
46 * This is a linked list.
48 struct GNUNET_SCALARPRODUCT_QueueEntry *next;
51 * This is a linked list.
53 struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
56 * Handle to the master context.
58 struct GNUNET_SCALARPRODUCT_Handle *h;
63 uint16_t message_size;
66 * Message to be sent to the scalarproduct service
68 struct GNUNET_SCALARPRODUCT_client_request* msg;
73 * Function to call after transmission of the request.
75 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
78 * Function to call after transmission of the request.
80 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
89 * Has this message been transmitted to the service?
90 * Only ever GNUNET_YES for the head of the queue.
91 * Note that the overall struct should end at a
92 * multiple of 64 bits.
94 int16_t was_transmitted;
97 * Timeout for the current operation.
99 struct GNUNET_TIME_Absolute timeout;
102 * Task for timeout signaling.
104 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
107 * Response Processor for response from the service. This function calls the
108 * continuation function provided by the client.
110 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
113 /**************************************************************
114 *** Function Declarations **********
115 **************************************************************/
118 * Creates a new entry at the tail of the DLL
120 * @param h handle to the master context
122 * @return pointer to the entry
124 static struct GNUNET_SCALARPRODUCT_QueueEntry *
125 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
128 * Removes the head entry from the queue
130 * @param h Handle to the master context
132 static struct GNUNET_SCALARPRODUCT_QueueEntry *
133 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
136 * Triggered when timeout occurs for a request in queue
138 * @param cls The pointer to the QueueEntry
139 * @param tc Task Context
142 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
145 * Called when a response is received from the service. After basic check
146 * handler in qe->response_proc is called. This functions handles the response
147 * to the client which used the API.
149 * @param cls Pointer to the Master Context
150 * @param msg Pointer to the data received in response
153 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
156 * Transmits the request to the VectorProduct Sevice
159 * @param size Size of the buffer
160 * @param buf Pointer to the buffer
162 * @return Size of the message sent
164 static size_t transmit_request (void *cls, size_t size,
168 * Issues transmit request for the new entries in the queue
170 * @param h handle to the master context
173 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
175 /**************************************************************
176 *** Static Function Declarations **********
177 **************************************************************/
181 * Creates a new entry at the tail of the DLL
183 * @param h handle to the master context
185 * @return pointer to the entry
187 static struct GNUNET_SCALARPRODUCT_QueueEntry *
188 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
190 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
192 qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
195 if (NULL == h->queue_head && NULL == h->queue_tail)
204 qe->prev = h->queue_tail;
205 h->queue_tail->next = qe;
214 * Removes the head entry from the queue
216 * @param h Handle to the master context
218 static struct GNUNET_SCALARPRODUCT_QueueEntry *
219 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
221 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
223 GNUNET_assert (NULL != h);
224 if (NULL == h->queue_head && NULL == h->queue_tail)
226 // The queue is empty. Just return.
227 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
229 else if (h->queue_head == h->queue_tail) //only one entry
234 h->queue_head = NULL;
235 h->queue_tail = NULL;
240 h->queue_head = h->queue_head->next;
241 h->queue_head->prev = NULL;
250 * Triggered when timeout occurs for a request in queue
252 * @param cls The pointer to the QueueEntry
253 * @param tc Task Context
256 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
258 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
261 GNUNET_STATISTICS_update (qe->h->stats,
262 gettext_noop ("# queue entry timeouts"), 1,
265 // Clear the timeout_task
266 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
268 // transmit_request is supposed to cancel timeout task.
269 // If message was not transmitted, there is definitely an error.
270 GNUNET_assert (GNUNET_NO == qe->was_transmitted);
272 LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
274 // remove the queue_entry for the queue
275 GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
276 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
281 * Handles the RESULT received in reply of prepare_response from the
284 * @param cls Handle to the Master Context
285 * @param msg Pointer to the response received
288 process_status_message (void *cls,
289 const struct GNUNET_MessageHeader *msg,
290 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
292 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
294 GNUNET_assert (qe != NULL);
296 if (qe->cont_status != NULL)
297 qe->cont_status (qe->cont_cls, &qe->msg->key, status);
302 * Handles the RESULT received in reply of prepare_response from the
305 * @param cls Handle to the Master Context
306 * @param msg Pointer to the response received
309 process_result_message (void *cls,
310 const struct GNUNET_MessageHeader *msg,
311 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
313 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
315 GNUNET_assert (qe != NULL);
317 if (msg == NULL && qe->cont_datum != NULL)
319 LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n");
321 if (qe->cont_datum != NULL)
323 qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg);
329 * Called when a response is received from the service. After basic check
330 * handler in qe->response_proc is called. This functions handles the response
331 * to the client which used the API.
333 * @param cls Pointer to the Master Context
334 * @param msg Pointer to the data received in response
337 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
339 struct GNUNET_SCALARPRODUCT_Handle *h = cls;
340 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
341 int16_t was_transmitted;
342 struct GNUNET_SCALARPRODUCT_client_response *message =
343 (struct GNUNET_SCALARPRODUCT_client_response *) msg;
345 h->in_receive = GNUNET_NO;
346 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
348 if (NULL == (qe = free_queue_head_entry (h)))
351 * The queue head will be NULL if the client disconnected,
352 * * In case of Alice, client disconnected after sending request, before receiving response
353 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
359 if (h->client == NULL)
361 // GKUKREJA : handle this correctly
363 * The queue head will be NULL if the client disconnected,
364 * * In case of Alice, client disconnected after sending request, before receiving response
365 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
371 was_transmitted = qe->was_transmitted;
372 // Control will only come here, when the request was transmitted to service,
373 // and service responded.
374 GNUNET_assert (was_transmitted == GNUNET_YES);
378 LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
379 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
381 else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
383 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
384 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
386 else if (ntohl (message->product_length) == 0)
388 // response for the responder client, successful
389 GNUNET_STATISTICS_update (h->stats,
390 gettext_noop ("# SUC responder result messages received"), 1,
393 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n");
394 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
396 else if (ntohl (message->product_length) > 0)
398 // response for the requester client, successful
399 GNUNET_STATISTICS_update (h->stats,
400 gettext_noop ("# SUC requester result messages received"), 1,
403 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
404 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
413 * Transmits the request to the VectorProduct Sevice
416 * @param size Size of the buffer
417 * @param buf Pointer to the buffer
419 * @return Size of the message sent
422 transmit_request (void *cls, size_t size,
425 struct GNUNET_SCALARPRODUCT_Handle *h = cls;
426 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
429 if (NULL == (qe = h->queue_head))
431 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
435 GNUNET_SCHEDULER_cancel (qe->timeout_task);
436 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
439 if (NULL == (qe = h->queue_head))
440 return 0; /* no entry in queue */
443 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
444 GNUNET_STATISTICS_update (h->stats,
445 gettext_noop ("# transmission request failures"),
447 GNUNET_SCALARPRODUCT_disconnect (h);
450 if (size < (msize = qe->message_size))
455 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
458 memcpy (buf, qe->msg, size);
459 GNUNET_free (qe->msg);
460 qe->was_transmitted = GNUNET_YES;
462 GNUNET_assert (GNUNET_NO == h->in_receive);
463 h->in_receive = GNUNET_YES;
465 GNUNET_CLIENT_receive (h->client, &receive_cb, h,
466 GNUNET_TIME_UNIT_FOREVER_REL);
468 #if INSANE_STATISTICS
469 GNUNET_STATISTICS_update (h->stats,
470 gettext_noop ("# bytes sent to scalarproduct"), 1,
478 * Issues transmit request for the new entries in the queue
480 * @param h handle to the master context
483 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
485 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
487 if (NULL == (qe = h->queue_head))
489 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
490 return; /* no entry in queue */
492 if (qe->was_transmitted == GNUNET_YES)
494 LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
495 return; /* waiting for replies */
499 LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
500 return; /* request pending */
502 if (h->client == NULL)
504 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
505 return; /* waiting for reconnect */
507 if (GNUNET_YES == h->in_receive)
509 /* wait for response to previous query */
514 GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
515 GNUNET_TIME_UNIT_FOREVER_REL,
517 &transmit_request, h);
521 LOG (GNUNET_ERROR_TYPE_ERROR,
522 _ ("Failed to send a message to the scalarproduct service\n"));
526 GNUNET_assert (GNUNET_NO == h->in_receive);
527 GNUNET_break (NULL != h->th);
532 /**************************************************************
534 **************************************************************/
538 * Called by the responder client to prepare response
540 * @param h handle to the master context
541 * @param key Session key - unique to the requesting client
542 * @param element_count Number of elements in the vector
543 * @param mask_length number of bytes in the mask
544 * @param elements Array of elements of the vector
545 * @param mask Array of the mask
546 * @param timeout Relative timeout for the operation
547 * @param cont Callback function
548 * @param cont_cls Closure for the callback function
550 struct GNUNET_SCALARPRODUCT_QueueEntry *
551 GNUNET_SCALARPRODUCT_prepare_response (struct GNUNET_SCALARPRODUCT_Handle *h,
552 const struct GNUNET_HashCode * key,
553 uint16_t element_count,
555 struct GNUNET_TIME_Relative timeout,
556 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
559 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h);
564 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
565 +element_count * sizeof (int32_t));
566 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t);
568 qe->message_size = size;
569 qe->msg = GNUNET_malloc (size);
570 qe->msg->header.size = htons (size);
571 qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
572 qe->msg->element_count = htons (element_count);
573 qe->msg->mask_length = htons (0);
574 memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode));
575 qe->cont_status = cont;
576 qe->cont_cls = cont_cls;
577 qe->was_transmitted = GNUNET_NO;
578 qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe);
579 qe->response_proc = &process_status_message;
580 qe->timeout = GNUNET_TIME_relative_to_absolute (timeout);
582 vector = (int32_t *) & qe->msg[1];
583 // copy each element over to the message
584 for (i = 0; i < element_count; i++)
585 vector[i] = htonl (elements[i]);
593 * Request the Scalar Product Evaluation
595 * @param h handle to the master context
596 * @param key Session key - unique to the requesting client
597 * @param peer PeerID of the other peer
598 * @param element_count Number of elements in the vector
599 * @param mask_length number of bytes in the mask
600 * @param elements Array of elements of the vector
601 * @param mask Array of the mask
602 * @param timeout Relative timeout for the operation
603 * @param cont Callback function
604 * @param cont_cls Closure for the callback function
606 struct GNUNET_SCALARPRODUCT_QueueEntry *
607 GNUNET_SCALARPRODUCT_request (struct GNUNET_SCALARPRODUCT_Handle *h,
608 const struct GNUNET_HashCode * key,
609 const struct GNUNET_PeerIdentity * peer,
610 uint16_t element_count,
611 uint16_t mask_length,
613 const unsigned char * mask,
614 struct GNUNET_TIME_Relative timeout,
615 GNUNET_SCALARPRODUCT_DatumProcessor cont,
618 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h);
623 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
624 +element_count * sizeof (int32_t)
626 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t) + mask_length;
628 qe->message_size = size;
629 qe->msg = GNUNET_malloc (size);
630 qe->msg->header.size = htons (size);
631 qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
632 memcpy (&qe->msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
633 qe->msg->element_count = htons (element_count);
634 qe->msg->mask_length = htons (mask_length);
635 memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode));
636 qe->cont_datum = cont;
637 qe->cont_cls = cont_cls;
638 qe->was_transmitted = GNUNET_NO;
639 qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe);
640 qe->response_proc = &process_result_message;
641 qe->timeout = GNUNET_TIME_relative_to_absolute (timeout);
643 vector = (int32_t*) & qe->msg[1];
644 // copy each element over to the message
645 for (i = 0; i < element_count; i++)
646 vector[i] = htonl (elements[i]);
649 memcpy (&vector[element_count], mask, mask_length);
657 * Connect to the scalarproduct service.
659 * @param cfg configuration to use
660 * @return handle to use to access the service
662 struct GNUNET_SCALARPRODUCT_Handle *
663 GNUNET_SCALARPRODUCT_connect (const struct GNUNET_CONFIGURATION_Handle * cfg)
665 struct GNUNET_CLIENT_Connection *client;
666 struct GNUNET_SCALARPRODUCT_Handle *h;
668 client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
672 LOG (GNUNET_ERROR_TYPE_ERROR,
673 _ ("Failed to connect to the scalarproduct service\n"));
677 h = GNUNET_malloc (sizeof (struct GNUNET_SCALARPRODUCT_Handle) +
678 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
681 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
687 * Disconnect from the scalarproduct service.
689 * @param h handle to the scalarproduct
692 GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
694 struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
696 LOG (GNUNET_ERROR_TYPE_INFO,
697 "Disconnecting from VectorProduct\n");
699 while (NULL != h->queue_head)
701 GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
702 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
705 if (h->client != NULL)
707 GNUNET_CLIENT_disconnect (h->client);
711 GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
715 /* end of ext_api.c */