2 This file is part of GNUnet.
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file scalarproduct/scalarproduct_api.c
23 * @brief API for the scalarproduct
24 * @author Christian Fuchs
25 * @author Gaurav Kukreja
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
37 /**************************************************************
38 *** Datatype Declarations **********
39 **************************************************************/
42 * Entry in the request queue per client
44 struct GNUNET_SCALARPRODUCT_QueueEntry
47 * This is a linked list.
49 struct GNUNET_SCALARPRODUCT_QueueEntry *next;
52 * This is a linked list.
54 struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
57 * Handle to the master context.
59 struct GNUNET_SCALARPRODUCT_Handle *h;
64 uint16_t message_size;
67 * Message to be sent to the scalarproduct service
69 struct GNUNET_SCALARPRODUCT_client_request* msg;
74 * Function to call after transmission of the request.
76 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
79 * Function to call after transmission of the request.
81 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
90 * Has this message been transmitted to the service?
91 * Only ever GNUNET_YES for the head of the queue.
92 * Note that the overall struct should end at a
93 * multiple of 64 bits.
95 int16_t was_transmitted;
98 * Response Processor for response from the service. This function calls the
99 * continuation function provided by the client.
101 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
104 /**************************************************************
105 *** Function Declarations **********
106 **************************************************************/
109 * Creates a new entry at the tail of the DLL
111 * @param h handle to the master context
113 * @return pointer to the entry
115 static struct GNUNET_SCALARPRODUCT_QueueEntry *
116 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
119 * Removes the head entry from the queue
121 * @param h Handle to the master context
123 static struct GNUNET_SCALARPRODUCT_QueueEntry *
124 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
127 * Triggered when timeout occurs for a request in queue
129 * @param cls The pointer to the QueueEntry
130 * @param tc Task Context
133 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
136 * Called when a response is received from the service. After basic check
137 * handler in qe->response_proc is called. This functions handles the response
138 * to the client which used the API.
140 * @param cls Pointer to the Master Context
141 * @param msg Pointer to the data received in response
144 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
147 * Transmits the request to the VectorProduct Sevice
150 * @param size Size of the buffer
151 * @param buf Pointer to the buffer
153 * @return Size of the message sent
155 static size_t transmit_request (void *cls, size_t size,
159 * Issues transmit request for the new entries in the queue
161 * @param h handle to the master context
164 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
166 /**************************************************************
167 *** Static Function Declarations **********
168 **************************************************************/
172 * Creates a new entry at the tail of the DLL
174 * @param h handle to the master context
176 * @return pointer to the entry
178 static struct GNUNET_SCALARPRODUCT_QueueEntry *
179 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
181 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
183 qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
186 if (NULL == h->queue_head && NULL == h->queue_tail)
195 qe->prev = h->queue_tail;
196 h->queue_tail->next = qe;
205 * Removes the head entry from the queue
207 * @param h Handle to the master context
209 static struct GNUNET_SCALARPRODUCT_QueueEntry *
210 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
212 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
214 GNUNET_assert (NULL != h);
215 if (NULL == h->queue_head && NULL == h->queue_tail)
217 // The queue is empty. Just return.
219 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
221 else if (h->queue_head == h->queue_tail) //only one entry
226 h->queue_head = NULL;
227 h->queue_tail = NULL;
232 h->queue_head = h->queue_head->next;
233 h->queue_head->prev = NULL;
242 * Triggered when timeout occurs for a request in queue
244 * @param cls The pointer to the QueueEntry
245 * @param tc Task Context
248 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
250 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
253 GNUNET_STATISTICS_update (qe->h->stats,
254 gettext_noop ("# queue entry timeouts"), 1,
257 // Clear the timeout_task
258 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
260 // transmit_request is supposed to cancel timeout task.
261 // If message was not transmitted, there is definitely an error.
262 GNUNET_assert (GNUNET_NO == qe->was_transmitted);
264 LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
266 // remove the queue_entry for the queue
267 GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
268 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
273 * Handles the RESULT received in reply of prepare_response from the
276 * @param cls Handle to the Master Context
277 * @param msg Pointer to the response received
280 process_status_message (void *cls,
281 const struct GNUNET_MessageHeader *msg,
282 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
284 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
286 GNUNET_assert (qe != NULL);
288 if (qe->cont_status != NULL)
289 qe->cont_status (qe->cont_cls, &qe->msg->key, status);
294 * Handles the RESULT received in reply of prepare_response from the
297 * @param cls Handle to the Master Context
298 * @param msg Pointer to the response received
301 process_result_message (void *cls,
302 const struct GNUNET_MessageHeader *msg,
303 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
305 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
307 GNUNET_assert (qe != NULL);
309 if (msg == NULL && qe->cont_datum != NULL)
311 LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n");
313 if (qe->cont_datum != NULL)
315 qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg);
321 * Called when a response is received from the service. After basic check
322 * handler in qe->response_proc is called. This functions handles the response
323 * to the client which used the API.
325 * @param cls Pointer to the Master Context
326 * @param msg Pointer to the data received in response
329 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
331 struct GNUNET_SCALARPRODUCT_Handle *h = cls;
332 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
333 int16_t was_transmitted;
334 struct GNUNET_SCALARPRODUCT_client_response *message =
335 (struct GNUNET_SCALARPRODUCT_client_response *) msg;
337 h->in_receive = GNUNET_NO;
338 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
340 if (NULL == (qe = free_queue_head_entry (h)))
343 * The queue head will be NULL if the client disconnected,
344 * * In case of Alice, client disconnected after sending request, before receiving response
345 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
351 if (h->client == NULL)
353 // GKUKREJA : handle this correctly
355 * The queue head will be NULL if the client disconnected,
356 * * In case of Alice, client disconnected after sending request, before receiving response
357 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
363 was_transmitted = qe->was_transmitted;
364 // Control will only come here, when the request was transmitted to service,
365 // and service responded.
366 GNUNET_assert (was_transmitted == GNUNET_YES);
370 LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
371 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
373 else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
375 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
376 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
378 else if (ntohl (message->product_length) == 0)
380 // response for the responder client, successful
381 GNUNET_STATISTICS_update (h->stats,
382 gettext_noop ("# SUC responder result messages received"), 1,
385 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n");
386 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
388 else if (ntohl (message->product_length) > 0)
390 // response for the requester client, successful
391 GNUNET_STATISTICS_update (h->stats,
392 gettext_noop ("# SUC requester result messages received"), 1,
395 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
396 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
405 * Transmits the request to the VectorProduct Sevice
408 * @param size Size of the buffer
409 * @param buf Pointer to the buffer
411 * @return Size of the message sent
414 transmit_request (void *cls, size_t size,
417 struct GNUNET_SCALARPRODUCT_Handle *h = cls;
418 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
421 if (NULL == (qe = h->queue_head))
423 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
427 GNUNET_SCHEDULER_cancel (qe->timeout_task);
428 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
431 if (NULL == (qe = h->queue_head))
432 return 0; /* no entry in queue */
435 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
436 GNUNET_STATISTICS_update (h->stats,
437 gettext_noop ("# transmission request failures"),
439 GNUNET_SCALARPRODUCT_disconnect (h);
442 if (size < (msize = qe->message_size))
447 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
450 memcpy (buf, qe->msg, size);
451 GNUNET_free (qe->msg);
452 qe->was_transmitted = GNUNET_YES;
454 GNUNET_assert (GNUNET_NO == h->in_receive);
455 h->in_receive = GNUNET_YES;
457 GNUNET_CLIENT_receive (h->client, &receive_cb, h,
458 GNUNET_TIME_UNIT_FOREVER_REL);
460 #if INSANE_STATISTICS
461 GNUNET_STATISTICS_update (h->stats,
462 gettext_noop ("# bytes sent to scalarproduct"), 1,
470 * Issues transmit request for the new entries in the queue
472 * @param h handle to the master context
475 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
477 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
479 if (NULL == (qe = h->queue_head))
481 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
482 return; /* no entry in queue */
484 if (qe->was_transmitted == GNUNET_YES)
486 LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
487 return; /* waiting for replies */
491 LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
492 return; /* request pending */
494 if (h->client == NULL)
496 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
497 return; /* waiting for reconnect */
499 if (GNUNET_YES == h->in_receive)
501 /* wait for response to previous query */
506 GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
507 GNUNET_TIME_UNIT_FOREVER_REL,
509 &transmit_request, h);
513 LOG (GNUNET_ERROR_TYPE_ERROR,
514 _ ("Failed to send a message to the scalarproduct service\n"));
518 GNUNET_assert (GNUNET_NO == h->in_receive);
519 GNUNET_break (NULL != h->th);
524 /**************************************************************
526 **************************************************************/
530 * Used by Bob's client to cooperate with Alice,
532 * @param h handle to the master context
533 * @param key Session key - unique to the requesting client
534 * @param elements Array of elements of the vector
535 * @param element_count Number of elements in the vector
536 * @param cont Callback function
537 * @param cont_cls Closure for the callback function
539 struct GNUNET_SCALARPRODUCT_Handle *
540 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
541 const struct GNUNET_HashCode * key,
542 const int32_t * elements,
543 uint32_t element_count,
544 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
547 struct GNUNET_SCALARPRODUCT_Handle *h;
548 struct GNUNET_SCALARPRODUCT_client_request *msg;
554 GNUNET_assert(elements);
556 GNUNET_assert(element_count > 1);
557 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
558 + element_count * sizeof (int32_t));
559 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
560 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
563 LOG (GNUNET_ERROR_TYPE_ERROR,
564 _ ("Failed to connect to the scalarproduct service\n"));
568 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
570 LOG (GNUNET_ERROR_TYPE_ERROR,
571 _("Failed to send a message to the statistics service\n"));
572 GNUNET_CLIENT_disconnect(h->client);
577 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
579 h->cont_datum = cont;
580 h->cont_cls = cont_cls;
581 h->response_proc = &process_result_message;
583 h->msg = GNUNET_malloc (size);
584 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
586 msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
587 msg->header.size = htons (size);
588 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
589 msg->element_count = htonl (element_count);
591 vector = (int32_t*) &msg[1];
592 // copy each element over to the message
593 for (i = 0; i < element_count; i++)
594 vector[i] = htonl(elements[i]);
596 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
599 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
600 GNUNET_TIME_UNIT_FOREVER_REL,
601 GNUNET_YES, // retry is OK in the initial stage
602 &transmit_request, h);
605 LOG (GNUNET_ERROR_TYPE_ERROR,
606 _ ("Failed to send a message to the scalarproduct service\n"));
607 GNUNET_STATISTICS_destroy(h->GNUNET_YES);
608 GNUNET_CLIENT_disconnect(h->client);
618 * Request by Alice's client for computing a scalar product
620 * @param h handle to the master context
621 * @param key Session key - unique to the requesting client
622 * @param peer PeerID of the other peer
623 * @param elements Array of elements of the vector
624 * @param element_count Number of elements in the vector
625 * @param mask Array of the mask
626 * @param mask_bytes number of bytes in the mask
627 * @param cont Callback function
628 * @param cont_cls Closure for the callback function
630 struct GNUNET_SCALARPRODUCT_Handle *
631 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
632 const struct GNUNET_HashCode * key,
633 const struct GNUNET_PeerIdentity *peer,
634 const int32_t * elements,
635 uint32_t element_count,
636 const unsigned char * mask,
638 GNUNET_SCALARPRODUCT_DatumProcessor cont,
641 struct GNUNET_CLIENT_Connection *client;
642 struct GNUNET_SCALARPRODUCT_Handle *h;
643 struct GNUNET_SCALARPRODUCT_client_request *msg;
650 GNUNET_assert(elements);
653 GNUNET_assert(element_count > 1);
654 GNUNET_assert(mask_bytes != 0);
655 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
656 + element_count * sizeof (int32_t)
658 client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
662 LOG (GNUNET_ERROR_TYPE_ERROR,
663 _ ("Failed to connect to the scalarproduct service\n"));
666 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
668 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
669 h->cont_datum = cont;
670 h->cont_cls = cont_cls;
671 h->response_proc = &process_status_message;
674 h->msg = GNUNET_malloc (size);
675 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
677 msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
678 msg->header.size = htons (size);
679 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
680 msg->element_count = htons (element_count);
681 msg->mask_length = htons (mask_length);
683 vector = (int32_t*) &msg[1];
684 // copy each element over to the message
685 for (i = 0; i < element_count; i++)
686 vector[i] = htonl(elements[i]);
688 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
689 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
690 memcpy (&vector[element_count], mask, mask_length);
692 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
693 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
694 GNUNET_TIME_UNIT_FOREVER_REL,
695 GNUNET_YES, // retry is OK in the initial stage
696 &transmit_request, h);
699 LOG (GNUNET_ERROR_TYPE_ERROR,
700 _ ("Failed to send a message to the scalarproduct service\n"));
707 * Disconnect from the scalarproduct service.
709 * @param h handle to the scalarproduct
712 GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
714 struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
716 LOG (GNUNET_ERROR_TYPE_INFO,
717 "Disconnecting from VectorProduct\n");
719 while (NULL != h->queue_head)
721 GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
722 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
725 if (h->client != NULL)
727 GNUNET_CLIENT_disconnect (h->client);
731 GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
735 /* end of ext_api.c */