From 9f064f70f7aff38119ebf1b4345118cb61302f2d Mon Sep 17 00:00:00 2001 From: Christian Fuchs Date: Mon, 2 Sep 2013 14:46:52 +0000 Subject: [PATCH] removed much of the excell logics in the scalar product API finished the the alice/bob API initiation functions in the SP API merged structes in SP API reorganized SP bookkeeping of computations --- src/include/gnunet_scalarproduct_service.h | 62 +--- src/scalarproduct/scalarproduct_api.c | 341 +++++---------------- 2 files changed, 79 insertions(+), 324 deletions(-) diff --git a/src/include/gnunet_scalarproduct_service.h b/src/include/gnunet_scalarproduct_service.h index cdf2cc437..3aac679c4 100644 --- a/src/include/gnunet_scalarproduct_service.h +++ b/src/include/gnunet_scalarproduct_service.h @@ -50,67 +50,7 @@ enum GNUNET_SCALARPRODUCT_ResponseStatus GNUNET_SCALARPRODUCT_Status_ServiceDisconnected }; -struct GNUNET_SCALARPRODUCT_Handle -{ - /** - * Our configuration. - */ - const struct GNUNET_CONFIGURATION_Handle *cfg; - - /** - * Current connection to the scalarproduct service. - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Handle for statistics. - */ - struct GNUNET_STATISTICS_Handle *stats; - - /** - * Current transmit handle. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * Handle to the master context. - */ - struct GNUNET_SCALARPRODUCT_Handle *h; - - /** - * The shared session key identifying this computation - */ - struct GNUNET_HashCode * key; - - /** - * The message to be transmitted - */ - void * msg; - - union - { - /** - * Function to call after transmission of the request. - */ - GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status; - - /** - * Function to call after transmission of the request. - */ - GNUNET_SCALARPRODUCT_DatumProcessor cont_datum; - }; - - /** - * Closure for 'cont'. - */ - void *cont_cls; - - /** - * Response Processor for response from the service. This function calls the - * continuation function provided by the client. - */ - GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc; -}; +struct GNUNET_SCALARPRODUCT_Handle; typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls, const struct GNUNET_MessageHeader *msg, diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index b77c30925..7c1f5394e 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -41,22 +41,42 @@ /** * Entry in the request queue per client */ -struct GNUNET_SCALARPRODUCT_QueueEntry +struct GNUNET_SCALARPRODUCT_ComputationHandle { /** * This is a linked list. */ - struct GNUNET_SCALARPRODUCT_QueueEntry *next; + struct GNUNET_SCALARPRODUCT_ComputationHandle *next; /** * This is a linked list. */ - struct GNUNET_SCALARPRODUCT_QueueEntry *prev; + struct GNUNET_SCALARPRODUCT_ComputationHandle *prev; + + /** + * Our configuration. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Current connection to the scalarproduct service. + */ + struct GNUNET_CLIENT_Connection *client; + + /** + * Handle for statistics. + */ + struct GNUNET_STATISTICS_Handle *stats; /** - * Handle to the master context. + * The shared session key identifying this computation */ - struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_HashCode * key; + + /** + * Current transmit handle. + */ + struct GNUNET_CLIENT_TransmitHandle *th; /** * Size of the message @@ -66,7 +86,7 @@ struct GNUNET_SCALARPRODUCT_QueueEntry /** * Message to be sent to the scalarproduct service */ - struct GNUNET_SCALARPRODUCT_client_request* msg; + struct GNUNET_SCALARPRODUCT_client_request * msg; union { @@ -86,14 +106,6 @@ struct GNUNET_SCALARPRODUCT_QueueEntry */ void *cont_cls; - /** - * Has this message been transmitted to the service? - * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a - * multiple of 64 bits. - */ - int16_t was_transmitted; - /** * Response Processor for response from the service. This function calls the * continuation function provided by the client. @@ -102,35 +114,20 @@ struct GNUNET_SCALARPRODUCT_QueueEntry }; /************************************************************** - *** Function Declarations ********** + *** Global Variables ********** **************************************************************/ - /** - * Creates a new entry at the tail of the DLL - * - * @param h handle to the master context - * - * @return pointer to the entry + * Head of the active sessions queue */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h); - +struct GNUNET_SCALARPRODUCT_ComputationHandle *head; /** - * Removes the head entry from the queue - * - * @param h Handle to the master context + * Tail of the active sessions queue */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h); +struct GNUNET_SCALARPRODUCT_ComputationHandle *tail; -/** - * Triggered when timeout occurs for a request in queue - * - * @param cls The pointer to the QueueEntry - * @param tc Task Context - */ -static void -timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/************************************************************** + *** Function Declarations ********** + **************************************************************/ /** * Called when a response is received from the service. After basic check @@ -155,120 +152,10 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg); static size_t transmit_request (void *cls, size_t size, void *buf); -/** - * Issues transmit request for the new entries in the queue - * - * @param h handle to the master context - */ -static void -process_queue (struct GNUNET_SCALARPRODUCT_Handle *h); - /************************************************************** *** Static Function Declarations ********** **************************************************************/ - -/** - * Creates a new entry at the tail of the DLL - * - * @param h handle to the master context - * - * @return pointer to the entry - */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; - - qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry); - - // if queue empty - if (NULL == h->queue_head && NULL == h->queue_tail) - { - qe->next = NULL; - qe->prev = NULL; - h->queue_head = qe; - h->queue_tail = qe; - } - else - { - qe->prev = h->queue_tail; - h->queue_tail->next = qe; - h->queue_tail = qe; - } - - return qe; -} - - -/** - * Removes the head entry from the queue - * - * @param h Handle to the master context - */ -static struct GNUNET_SCALARPRODUCT_QueueEntry * -free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL; - - GNUNET_assert (NULL != h); - if (NULL == h->queue_head && NULL == h->queue_tail) - { - // The queue is empty. Just return. - qe = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); - } - else if (h->queue_head == h->queue_tail) //only one entry - { - qe = h->queue_head; - qe->next = NULL; - qe->prev = NULL; - h->queue_head = NULL; - h->queue_tail = NULL; - } - else - { - qe = h->queue_head; - h->queue_head = h->queue_head->next; - h->queue_head->prev = NULL; - qe->next = NULL; - qe->prev = NULL; - } - return qe; -} - - -/** - * Triggered when timeout occurs for a request in queue - * - * @param cls The pointer to the QueueEntry - * @param tc Task Context - */ -static void -timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls; - - // Update Statistics - GNUNET_STATISTICS_update (qe->h->stats, - gettext_noop ("# queue entry timeouts"), 1, - GNUNET_NO); - - // Clear the timeout_task - qe->timeout_task = GNUNET_SCHEDULER_NO_TASK; - - // transmit_request is supposed to cancel timeout task. - // If message was not transmitted, there is definitely an error. - GNUNET_assert (GNUNET_NO == qe->was_transmitted); - - LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n"); - - // remove the queue_entry for the queue - GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout); -} - - /** * Handles the RESULT received in reply of prepare_response from the * service @@ -281,7 +168,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { - struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; GNUNET_assert (qe != NULL); @@ -302,7 +189,7 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { - struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; GNUNET_assert (qe != NULL); @@ -328,8 +215,8 @@ process_result_message (void *cls, static void receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_SCALARPRODUCT_Handle *h = cls; - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; + struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe; int16_t was_transmitted; struct GNUNET_SCALARPRODUCT_client_response *message = (struct GNUNET_SCALARPRODUCT_client_response *) msg; @@ -414,34 +301,16 @@ static size_t transmit_request (void *cls, size_t size, void *buf) { - struct GNUNET_SCALARPRODUCT_Handle *h = cls; - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; size_t msize; - if (NULL == (qe = h->queue_head)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n"); - return 0; - } - - GNUNET_SCHEDULER_cancel (qe->timeout_task); - qe->timeout_task = GNUNET_SCHEDULER_NO_TASK; - - h->th = NULL; - if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ if (buf == NULL) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); - GNUNET_STATISTICS_update (h->stats, + GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# transmission request failures"), 1, GNUNET_NO); - GNUNET_SCALARPRODUCT_disconnect (h); - return 0; - } - if (size < (msize = qe->message_size)) - { - process_queue (h); + GNUNET_SCALARPRODUCT_disconnect (qe); return 0; } LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n", @@ -451,8 +320,7 @@ transmit_request (void *cls, size_t size, GNUNET_free (qe->msg); qe->was_transmitted = GNUNET_YES; - GNUNET_assert (GNUNET_NO == h->in_receive); - h->in_receive = GNUNET_YES; + qe->th = NULL; GNUNET_CLIENT_receive (h->client, &receive_cb, h, GNUNET_TIME_UNIT_FOREVER_REL); @@ -466,61 +334,6 @@ transmit_request (void *cls, size_t size, } -/** - * Issues transmit request for the new entries in the queue - * - * @param h handle to the master context - */ -static void -process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) -{ - struct GNUNET_SCALARPRODUCT_QueueEntry *qe; - - if (NULL == (qe = h->queue_head)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); - return; /* no entry in queue */ - } - if (qe->was_transmitted == GNUNET_YES) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); - return; /* waiting for replies */ - } - if (h->th != NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); - return; /* request pending */ - } - if (h->client == NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); - return; /* waiting for reconnect */ - } - if (GNUNET_YES == h->in_receive) - { - /* wait for response to previous query */ - return; - } - - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_request, h); - - if (h->th == NULL) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a message to the scalarproduct service\n")); - return; - } - - GNUNET_assert (GNUNET_NO == h->in_receive); - GNUNET_break (NULL != h->th); -} - - - /************************************************************** *** API ********** **************************************************************/ @@ -536,7 +349,7 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) * @param cont Callback function * @param cont_cls Closure for the callback function */ -struct GNUNET_SCALARPRODUCT_Handle * +struct GNUNET_SCALARPRODUCT_ComputationHandle * GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_HashCode * key, const int32_t * elements, @@ -544,7 +357,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, void *cont_cls) { - struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_SCALARPRODUCT_ComputationHandle *h; struct GNUNET_SCALARPRODUCT_client_request *msg; int32_t * vector; uint16_t size; @@ -556,7 +369,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_assert(element_count > 1); GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t)); - h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); + h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); if (!h->client) { @@ -595,7 +408,6 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, // retry is OK in the initial stage @@ -610,6 +422,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_free(h); return NULL; } + GNUNET_CONTAINER_DLL_insert (head, tail, h); return h; } @@ -627,7 +440,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param cont Callback function * @param cont_cls Closure for the callback function */ -struct GNUNET_SCALARPRODUCT_Handle * +struct GNUNET_SCALARPRODUCT_ComputationHandle * GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_HashCode * key, const struct GNUNET_PeerIdentity *peer, @@ -638,38 +451,39 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_SCALARPRODUCT_DatumProcessor cont, void *cont_cls) { - struct GNUNET_CLIENT_Connection *client; - struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_SCALARPRODUCT_ComputationHandle *h; struct GNUNET_SCALARPRODUCT_client_request *msg; int32_t * vector; uint16_t size; uint64_t i; - GNUNET_assert(key); - GNUNET_assert(peer); - GNUNET_assert(elements); - GNUNET_assert(mask); - GNUNET_assert(cont); - GNUNET_assert(element_count > 1); - GNUNET_assert(mask_bytes != 0); GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length); - client = GNUNET_CLIENT_connect ("scalarproduct", cfg); - - if (!client) + + h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); + h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); + if (!h->client) { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to connect to the scalarproduct service\n")); + GNUNET_free(h); return NULL; } + h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); + if (!h->th){ + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Failed to send a message to the statistics service\n")); + GNUNET_CLIENT_disconnect(h->client); + GNUNET_free(h); + return NULL; + } + size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; - h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); h->cont_datum = cont; h->cont_cls = cont_cls; h->response_proc = &process_status_message; - h->client = client; h->cfg = cfg; h->msg = GNUNET_malloc (size); memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); @@ -689,17 +503,21 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); memcpy (&vector[element_count], mask, mask_length); - h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, // retry is OK in the initial stage &transmit_request, h); - if ( !h->th) + if (!h->th) { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to send a message to the scalarproduct service\n")); + GNUNET_STATISTICS_destroy(h->GNUNET_YES); + GNUNET_CLIENT_disconnect(h->client); + GNUNET_free(h->msg); + GNUNET_free(h); return NULL; } + GNUNET_CONTAINER_DLL_insert (head, tail, h); return h; } @@ -709,27 +527,24 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param h handle to the scalarproduct */ void -GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h) +GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) { - struct GNUNET_SCALARPRODUCT_QueueEntry * qe; + struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; LOG (GNUNET_ERROR_TYPE_INFO, "Disconnecting from VectorProduct\n"); - while (NULL != h->queue_head) - { - GNUNET_assert (NULL != (qe = free_queue_head_entry (h))); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); - } - - if (h->client != NULL) + for (qe = head; head != NULL; qe = head) { + GNUNET_CONTAINER_DLL_remove (head, tail, qe); + if (NULL == qe->th) + GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th); GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); + qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); + GNUNET_free(qe->msg); + GNUNET_free(qe); } - - GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); - h->stats = NULL; } /* end of ext_api.c */ -- 2.25.1