X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fscalarproduct%2Fscalarproduct_api.c;h=dcacd85c98152c7ee1f11d21977b4e6738915610;hb=27c12911f4f2aba2d90099270d70de846e83854f;hp=3eae200c3a98947a039eca6fb2ec3158230ca028;hpb=8355426564ccf3afe2e1451cfc6c5b38f6924330;p=oweals%2Fgnunet.git diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 3eae200c3..dcacd85c9 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -23,13 +23,14 @@ * @brief API for the scalarproduct * @author Christian Fuchs * @author Gaurav Kukreja - * + * */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_statistics_service.h" #include "gnunet_scalarproduct_service.h" #include "gnunet_protocols.h" +#include "scalarproduct.h" #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__) @@ -37,25 +38,52 @@ *** Datatype Declarations ********** **************************************************************/ +/** + * the abstraction function for our internal callback + */ +typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls, + const struct GNUNET_MessageHeader *msg, + enum GNUNET_SCALARPRODUCT_ResponseStatus status); + /** * Entry in the request queue per client */ -struct GNUNET_SCALARPRODUCT_QueueEntry +struct GNUNET_SCALARPRODUCT_ComputationHandle { /** * This is a linked list. */ - struct GNUNET_SCALARPRODUCT_QueueEntry *next; + struct GNUNET_SCALARPRODUCT_ComputationHandle *next; /** * This is a linked list. */ - struct GNUNET_SCALARPRODUCT_QueueEntry *prev; + struct GNUNET_SCALARPRODUCT_ComputationHandle *prev; + + /** + * Our configuration. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Current connection to the scalarproduct service. + */ + struct GNUNET_CLIENT_Connection *client; + + /** + * Handle for statistics. + */ + struct GNUNET_STATISTICS_Handle *stats; /** - * Handle to the master context. + * The shared session key identifying this computation */ - struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_HashCode key; + + /** + * Current transmit handle. + */ + struct GNUNET_CLIENT_TransmitHandle *th; /** * Size of the message @@ -65,33 +93,28 @@ struct GNUNET_SCALARPRODUCT_QueueEntry /** * Message to be sent to the scalarproduct service */ - struct GNUNET_SCALARPRODUCT_client_request* msg; + struct GNUNET_SCALARPRODUCT_client_request * msg; + /** + * The msg handler callback + */ union { - /** - * Function to call after transmission of the request. - */ - GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status; - - /** - * Function to call after transmission of the request. - */ - GNUNET_SCALARPRODUCT_DatumProcessor cont_datum; - }; + /** + * Function to call after transmission of the request. + */ + GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status; /** - * Closure for 'cont'. + * Function to call after transmission of the request. */ - void *cont_cls; + GNUNET_SCALARPRODUCT_DatumProcessor cont_datum; + }; /** - * Has this message been transmitted to the service? - * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a - * multiple of 64 bits. + * Closure for 'cont'. */ - int16_t was_transmitted; + void *cont_cls; /** * Response Processor for response from the service. This function calls the @@ -101,66 +124,23 @@ struct GNUNET_SCALARPRODUCT_QueueEntry }; /************************************************************** - *** Function Declarations ********** + *** Global Variables ********** **************************************************************/ - /** - * Creates a new entry at the tail of the DLL - * - * @param h handle to the master context - * - * @return pointer to the entry - */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h); - -/** - * Removes the head entry from the queue - * - * @param h Handle to the master context + * Head of the active sessions queue */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h); - +static struct GNUNET_SCALARPRODUCT_ComputationHandle *head; /** - * Triggered when timeout occurs for a request in queue - * - * @param cls The pointer to the QueueEntry - * @param tc Task Context + * Tail of the active sessions queue */ -static void -timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail; -/** - * Called when a response is received from the service. After basic check - * handler in qe->response_proc is called. This functions handles the response - * to the client which used the API. - * - * @param cls Pointer to the Master Context - * @param msg Pointer to the data received in response - */ -static void -receive_cb (void *cls, const struct GNUNET_MessageHeader *msg); - -/** - * Transmits the request to the VectorProduct Sevice - * - * @param cls Closure - * @param size Size of the buffer - * @param buf Pointer to the buffer - * - * @return Size of the message sent - */ -static size_t transmit_request (void *cls, size_t size, - void *buf); +/************************************************************** + *** Function Declarations ********** + **************************************************************/ -/** - * Issues transmit request for the new entries in the queue - * - * @param h handle to the master context - */ -static void -process_queue (struct GNUNET_SCALARPRODUCT_Handle *h); +void +GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h); /************************************************************** *** Static Function Declarations ********** @@ -168,296 +148,155 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h); /** - * Creates a new entry at the tail of the DLL - * - * @param h handle to the master context - * - * @return pointer to the entry - */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; - - qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry); - - // if queue empty - if (NULL == h->queue_head && NULL == h->queue_tail) - { - qe->next = NULL; - qe->prev = NULL; - h->queue_head = qe; - h->queue_tail = qe; - } - else - { - qe->prev = h->queue_tail; - h->queue_tail->next = qe; - h->queue_tail = qe; - } - - return qe; -} - - -/** - * Removes the head entry from the queue - * - * @param h Handle to the master context - */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL; - - GNUNET_assert (NULL != h); - if (NULL == h->queue_head && NULL == h->queue_tail) - { - // The queue is empty. Just return. - qe = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); - } - else if (h->queue_head == h->queue_tail) //only one entry - { - qe = h->queue_head; - qe->next = NULL; - qe->prev = NULL; - h->queue_head = NULL; - h->queue_tail = NULL; - } - else - { - qe = h->queue_head; - h->queue_head = h->queue_head->next; - h->queue_head->prev = NULL; - qe->next = NULL; - qe->prev = NULL; - } - return qe; -} - - -/** - * Triggered when timeout occurs for a request in queue - * - * @param cls The pointer to the QueueEntry - * @param tc Task Context - */ -static void -timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls; - - // Update Statistics - GNUNET_STATISTICS_update (qe->h->stats, - gettext_noop ("# queue entry timeouts"), 1, - GNUNET_NO); - - // Clear the timeout_task - qe->timeout_task = GNUNET_SCHEDULER_NO_TASK; - - // transmit_request is supposed to cancel timeout task. - // If message was not transmitted, there is definitely an error. - GNUNET_assert (GNUNET_NO == qe->was_transmitted); - - LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n"); - - // remove the queue_entry for the queue - GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout); -} - - -/** - * Handles the RESULT received in reply of prepare_response from the - * service - * - * @param cls Handle to the Master Context + * Handles the STATUS received from the service for a response, does not contain a payload + * + * @param cls our Handle * @param msg Pointer to the response received + * @param status the condition the request was terminated with (eg: disconnect) */ static void process_status_message (void *cls, const struct GNUNET_MessageHeader *msg, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { - struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; - - GNUNET_assert (qe != NULL); + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; - if (qe->cont_status != NULL) - qe->cont_status (qe->cont_cls, &qe->msg->key, status); + qe->cont_status (qe->cont_cls, status); } /** - * Handles the RESULT received in reply of prepare_response from the - * service - * - * @param cls Handle to the Master Context + * Handles the RESULT received from the service for a request, should contain a result MPI value + * + * @param cls our Handle * @param msg Pointer to the response received + * @param status the condition the request was terminated with (eg: disconnect) */ static void process_result_message (void *cls, const struct GNUNET_MessageHeader *msg, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { - struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; + const struct GNUNET_SCALARPRODUCT_client_response *message = + (const struct GNUNET_SCALARPRODUCT_client_response *) msg; + gcry_mpi_t result = NULL; + gcry_error_t rc; - GNUNET_assert (qe != NULL); - - if (msg == NULL && qe->cont_datum != NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n"); - } - if (qe->cont_datum != NULL) + if (GNUNET_SCALARPRODUCT_Status_Success == status) { - qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg); + size_t product_len = ntohl (message->product_length); + result = gcry_mpi_new (0); + + if (0 < product_len) + { + gcry_mpi_t num; + size_t read = 0; + + if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, &message[1], product_len, &read))) + { + LOG_GCRY(GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc); + gcry_mpi_release (result); + result = NULL; + status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; + } + else + { + if (0 < message->range) + gcry_mpi_add (result, result, num); + else if (0 > message->range) + gcry_mpi_sub (result, result, num); + gcry_mpi_release (num); + } + } } + qe->cont_datum (qe->cont_cls, status, result); } /** - * Called when a response is received from the service. After basic check + * Called when a response is received from the service. After basic check, the * handler in qe->response_proc is called. This functions handles the response * to the client which used the API. - * + * * @param cls Pointer to the Master Context * @param msg Pointer to the data received in response */ static void receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_SCALARPRODUCT_Handle *h = cls; - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; - int16_t was_transmitted; - struct GNUNET_SCALARPRODUCT_client_response *message = - (struct GNUNET_SCALARPRODUCT_client_response *) msg; - - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n"); - - if (NULL == (qe = free_queue_head_entry (h))) - { - /** - * The queue head will be NULL if the client disconnected, - * * In case of Alice, client disconnected after sending request, before receiving response - * * In case of Bob, client disconnected after preparing response, before getting request from Alice. - */ - process_queue (h); - return; - } + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; + const struct GNUNET_SCALARPRODUCT_client_response *message = + (const struct GNUNET_SCALARPRODUCT_client_response *) msg; + enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; - if (h->client == NULL) + if (NULL == msg) { - // GKUKREJA : handle this correctly - /** - * The queue head will be NULL if the client disconnected, - * * In case of Alice, client disconnected after sending request, before receiving response - * * In case of Bob, client disconnected after preparing response, before getting request from Alice. - */ - process_queue (h); - return; + LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n"); + status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected; } - - was_transmitted = qe->was_transmitted; - // Control will only come here, when the request was transmitted to service, - // and service responded. - GNUNET_assert (was_transmitted == GNUNET_YES); - - if (msg == NULL) + else if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type)) { - LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n"); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure); + LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n"); } - else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n"); - qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse); - } - else if (ntohl (message->product_length) == 0) + else if (0 < ntohl (message->product_length) || (0 == message->range)) { // response for the responder client, successful - GNUNET_STATISTICS_update (h->stats, + GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# SUC responder result messages received"), 1, GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n"); - qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success); + status = GNUNET_SCALARPRODUCT_Status_Success; } - else if (ntohl (message->product_length) > 0) - { - // response for the requester client, successful - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# SUC requester result messages received"), 1, - GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n"); - qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success); - } + if (qe->cont_datum != NULL) + qe->response_proc (qe, msg, status); + GNUNET_CONTAINER_DLL_remove (head, tail, qe); GNUNET_free (qe); - process_queue (h); } /** - * Transmits the request to the VectorProduct Sevice - * + * Transmits the request to the VectorProduct Service + * * @param cls Closure * @param size Size of the buffer * @param buf Pointer to the buffer - * + * * @return Size of the message sent */ static size_t transmit_request (void *cls, size_t size, void *buf) { - struct GNUNET_SCALARPRODUCT_Handle *h = cls; - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; - size_t msize; - - if (NULL == (qe = h->queue_head)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n"); - return 0; - } - - GNUNET_SCHEDULER_cancel (qe->timeout_task); - qe->timeout_task = GNUNET_SCHEDULER_NO_TASK; + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; - h->th = NULL; - if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ - if (buf == NULL) + if (NULL == buf) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); - GNUNET_STATISTICS_update (h->stats, + GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# transmission request failures"), 1, GNUNET_NO); - GNUNET_SCALARPRODUCT_disconnect (h); - return 0; - } - if (size < (msize = qe->message_size)) - { - process_queue (h); + + // notify caller about the error, done here. + if (qe->cont_datum != NULL) + qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure); + + GNUNET_SCALARPRODUCT_cancel (cls); return 0; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n", - msize); - memcpy (buf, qe->msg, size); - GNUNET_free (qe->msg); - qe->was_transmitted = GNUNET_YES; - GNUNET_assert (GNUNET_NO == h->in_receive); - h->in_receive = GNUNET_YES; + GNUNET_free (qe->msg); + qe->msg = NULL; + qe->th = NULL; - GNUNET_CLIENT_receive (h->client, &receive_cb, h, + GNUNET_CLIENT_receive (qe->client, &receive_cb, qe, GNUNET_TIME_UNIT_FOREVER_REL); #if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, + GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# bytes sent to scalarproduct"), 1, GNUNET_NO); #endif @@ -465,136 +304,79 @@ transmit_request (void *cls, size_t size, } -/** - * Issues transmit request for the new entries in the queue - * - * @param h handle to the master context - */ -static void -process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; - - if (NULL == (qe = h->queue_head)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); - return; /* no entry in queue */ - } - if (qe->was_transmitted == GNUNET_YES) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); - return; /* waiting for replies */ - } - if (h->th != NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); - return; /* request pending */ - } - if (h->client == NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); - return; /* waiting for reconnect */ - } - if (GNUNET_YES == h->in_receive) - { - /* wait for response to previous query */ - return; - } - - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_request, h); - - if (h->th == NULL) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a message to the scalarproduct service\n")); - return; - } - - GNUNET_assert (GNUNET_NO == h->in_receive); - GNUNET_break (NULL != h->th); -} - - - /************************************************************** *** API ********** **************************************************************/ /** - * Used by Bob's client to cooperate with Alice, - * - * @param h handle to the master context - * @param key Session key - unique to the requesting client + * Used by Bob's client to cooperate with Alice, + * + * @param cfg the gnunet configuration handle + * @param key Session key unique to the requesting client * @param elements Array of elements of the vector * @param element_count Number of elements in the vector * @param cont Callback function * @param cont_cls Closure for the callback function + * + * @return a new handle for this computation */ -struct GNUNET_SCALARPRODUCT_Handle * -GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, +struct GNUNET_SCALARPRODUCT_ComputationHandle * +GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle * cfg, const struct GNUNET_HashCode * key, const int32_t * elements, uint32_t element_count, GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, - void *cont_cls) + void * cont_cls) { - struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_SCALARPRODUCT_ComputationHandle *h; struct GNUNET_SCALARPRODUCT_client_request *msg; int32_t * vector; uint16_t size; uint64_t i; - - GNUNET_assert(key); - GNUNET_assert(elements); - GNUNET_assert(cont); - GNUNET_assert(element_count > 1); + GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) - + element_count * sizeof (int32_t)); - h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); + + element_count * sizeof (int32_t)); + h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); if (!h->client) { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to connect to the scalarproduct service\n")); - GNUNET_free(h); + GNUNET_free (h); return NULL; } h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); - if (!h->th){ + if (!h->stats) + { LOG (GNUNET_ERROR_TYPE_ERROR, - _("Failed to send a message to the statistics service\n")); - GNUNET_CLIENT_disconnect(h->client); - GNUNET_free(h); + _ ("Failed to send a message to the statistics service\n")); + GNUNET_CLIENT_disconnect (h->client); + GNUNET_free (h); return NULL; - } - + } + size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t); - - h->cont_datum = cont; + + h->cont_status = cont; h->cont_cls = cont_cls; - h->response_proc = &process_result_message; + h->response_proc = &process_status_message; h->cfg = cfg; - h->msg = GNUNET_malloc (size); memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); - - msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg; + + msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size); + h->msg = msg; msg->header.size = htons (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); msg->element_count = htonl (element_count); - - vector = (int32_t*) &msg[1]; + + vector = (int32_t*) & msg[1]; // copy each element over to the message for (i = 0; i < element_count; i++) - vector[i] = htonl(elements[i]); + vector[i] = htonl (elements[i]); memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); - - + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, // retry is OK in the initial stage @@ -603,21 +385,22 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to send a message to the scalarproduct service\n")); - GNUNET_STATISTICS_destroy(h->GNUNET_YES); - GNUNET_CLIENT_disconnect(h->client); - GNUNET_free(h->msg); - GNUNET_free(h); + GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); + GNUNET_CLIENT_disconnect (h->client); + GNUNET_free (h->msg); + GNUNET_free (h); return NULL; } + GNUNET_CONTAINER_DLL_insert (head, tail, h); return h; } /** * Request by Alice's client for computing a scalar product - * - * @param h handle to the master context - * @param key Session key - unique to the requesting client + * + * @param cfg the gnunet configuration handle + * @param key Session key should be unique to the requesting client * @param peer PeerID of the other peer * @param elements Array of elements of the vector * @param element_count Number of elements in the vector @@ -625,9 +408,11 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param mask_bytes number of bytes in the mask * @param cont Callback function * @param cont_cls Closure for the callback function + * + * @return a new handle for this computation */ -struct GNUNET_SCALARPRODUCT_Handle * -GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, +struct GNUNET_SCALARPRODUCT_ComputationHandle * +GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle * cfg, const struct GNUNET_HashCode * key, const struct GNUNET_PeerIdentity *peer, const int32_t * elements, @@ -635,100 +420,126 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, const unsigned char * mask, uint32_t mask_bytes, GNUNET_SCALARPRODUCT_DatumProcessor cont, - void *cont_cls) + void * cont_cls) { - struct GNUNET_CLIENT_Connection *client; - struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_SCALARPRODUCT_ComputationHandle *h; struct GNUNET_SCALARPRODUCT_client_request *msg; int32_t * vector; uint16_t size; uint64_t i; - - GNUNET_assert(key); - GNUNET_assert(peer); - GNUNET_assert(elements); - GNUNET_assert(mask); - GNUNET_assert(cont); - GNUNET_assert(element_count > 1); - GNUNET_assert(mask_bytes != 0); + GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) - + element_count * sizeof (int32_t) - + mask_length); - client = GNUNET_CLIENT_connect ("scalarproduct", cfg); + +element_count * sizeof (int32_t) + + mask_bytes); - if (!client) + h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); + h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); + if (!h->client) { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to connect to the scalarproduct service\n")); + GNUNET_free (h); return NULL; } - size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; - - h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); + h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); + if (!h->stats) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _ ("Failed to send a message to the statistics service\n")); + GNUNET_CLIENT_disconnect (h->client); + GNUNET_free (h); + return NULL; + } + + size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes; + h->cont_datum = cont; h->cont_cls = cont_cls; - h->response_proc = &process_status_message; - h->client = client; + h->response_proc = &process_result_message; h->cfg = cfg; - h->msg = GNUNET_malloc (size); memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); - - msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg; + + msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size); + h->msg = msg; msg->header.size = htons (size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); - msg->element_count = htons (element_count); - msg->mask_length = htons (mask_length); - - vector = (int32_t*) &msg[1]; + msg->element_count = htonl (element_count); + msg->mask_length = htonl (mask_bytes); + + vector = (int32_t*) & msg[1]; // copy each element over to the message for (i = 0; i < element_count; i++) - vector[i] = htonl(elements[i]); + vector[i] = htonl (elements[i]); memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); - memcpy (&vector[element_count], mask, mask_length); - - h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); + memcpy (&vector[element_count], mask, mask_bytes); + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, // retry is OK in the initial stage &transmit_request, h); - if ( !h->th) + if (!h->th) { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to send a message to the scalarproduct service\n")); + GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); + GNUNET_CLIENT_disconnect (h->client); + GNUNET_free (h->msg); + GNUNET_free (h); return NULL; } + GNUNET_CONTAINER_DLL_insert (head, tail, h); return h; } + /** - * Disconnect from the scalarproduct service. - * - * @param h handle to the scalarproduct + * Cancel an ongoing computation or revoke our collaboration offer. + * Closes the connection to the service + * + * @param h computation handle to terminate */ void -GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h) +GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) { - struct GNUNET_SCALARPRODUCT_QueueEntry * qe; - - LOG (GNUNET_ERROR_TYPE_INFO, - "Disconnecting from VectorProduct\n"); + struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; - while (NULL != h->queue_head) + for (qe = head; head != NULL; qe = head) { - GNUNET_assert (NULL != (qe = free_queue_head_entry (h))); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); + if (qe == h) + { + GNUNET_CONTAINER_DLL_remove (head, tail, qe); + if (NULL != qe->th) + GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th); + GNUNET_CLIENT_disconnect (qe->client); + GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES); + GNUNET_free_non_null (qe->msg); + GNUNET_free (qe); + break; + } } +} +/** + * Cancel ALL ongoing computation or revoke our collaboration offer. + * Closes ALL connections to the service + */ +void +GNUNET_SCALARPRODUCT_disconnect () +{ + struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; - if (h->client != NULL) + LOG (GNUNET_ERROR_TYPE_INFO, "Disconnecting from VectorProduct\n"); + for (qe = head; head != NULL; qe = head) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_CONTAINER_DLL_remove (head, tail, qe); + if (NULL != qe->th) + GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th); + GNUNET_CLIENT_disconnect (qe->client); + GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES); + GNUNET_free_non_null (qe->msg); + GNUNET_free (qe); } - - GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); - h->stats = NULL; } -/* end of ext_api.c */ +/* end of scalarproduct_api.c */