From 8355426564ccf3afe2e1451cfc6c5b38f6924330 Mon Sep 17 00:00:00 2001 From: Christian Fuchs Date: Mon, 2 Sep 2013 13:20:36 +0000 Subject: [PATCH] partial rework of the old API, includes simplifications and gets rid of the extra connect. we now establish one client-connection per session. --- src/scalarproduct/scalarproduct_api.c | 235 ++++++++++++++------------ 1 file changed, 127 insertions(+), 108 deletions(-) diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 928f5397c..3eae200c3 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -93,16 +93,6 @@ struct GNUNET_SCALARPRODUCT_QueueEntry */ int16_t was_transmitted; - /** - * Timeout for the current operation. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Task for timeout signaling. - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - /** * Response Processor for response from the service. This function calls the * continuation function provided by the client. @@ -224,6 +214,7 @@ free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h) if (NULL == h->queue_head && NULL == h->queue_tail) { // The queue is empty. Just return. + qe = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); } else if (h->queue_head == h->queue_tail) //only one entry @@ -535,154 +526,182 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) /** - * Called by the responder client to prepare response + * Used by Bob's client to cooperate with Alice, * * @param h handle to the master context * @param key Session key - unique to the requesting client - * @param element_count Number of elements in the vector - * @param mask_length number of bytes in the mask * @param elements Array of elements of the vector - * @param mask Array of the mask - * @param timeout Relative timeout for the operation + * @param element_count Number of elements in the vector * @param cont Callback function * @param cont_cls Closure for the callback function */ -struct GNUNET_SCALARPRODUCT_QueueEntry * -GNUNET_SCALARPRODUCT_prepare_response (struct GNUNET_SCALARPRODUCT_Handle *h, - const struct GNUNET_HashCode * key, - uint16_t element_count, - int32_t * elements, - struct GNUNET_TIME_Relative timeout, - GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, - void *cont_cls) +struct GNUNET_SCALARPRODUCT_Handle * +GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode * key, + const int32_t * elements, + uint32_t element_count, + GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, + void *cont_cls) { - struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h); + struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_SCALARPRODUCT_client_request *msg; int32_t * vector; uint16_t size; - unsigned int i; + uint64_t i; + GNUNET_assert(key); + GNUNET_assert(elements); + GNUNET_assert(cont); + GNUNET_assert(element_count > 1); GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) - +element_count * sizeof (int32_t)); - size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t); - - qe->message_size = size; - qe->msg = GNUNET_malloc (size); - qe->msg->header.size = htons (size); - qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); - qe->msg->element_count = htons (element_count); - qe->msg->mask_length = htons (0); - memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode)); - qe->cont_status = cont; - qe->cont_cls = cont_cls; - qe->was_transmitted = GNUNET_NO; - qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe); - qe->response_proc = &process_status_message; - qe->timeout = GNUNET_TIME_relative_to_absolute (timeout); - - vector = (int32_t *) & qe->msg[1]; + + element_count * sizeof (int32_t)); + h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); + h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); + if (!h->client) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _ ("Failed to connect to the scalarproduct service\n")); + GNUNET_free(h); + return NULL; + } + h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); + if (!h->th){ + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Failed to send a message to the statistics service\n")); + GNUNET_CLIENT_disconnect(h->client); + GNUNET_free(h); + return NULL; + } + + size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t); + + h->cont_datum = cont; + h->cont_cls = cont_cls; + h->response_proc = &process_result_message; + h->cfg = cfg; + h->msg = GNUNET_malloc (size); + memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); + + msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg; + msg->header.size = htons (size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); + msg->element_count = htonl (element_count); + + vector = (int32_t*) &msg[1]; // copy each element over to the message for (i = 0; i < element_count; i++) - vector[i] = htonl (elements[i]); + vector[i] = htonl(elements[i]); - process_queue (h); - return qe; + memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); + + + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, // retry is OK in the initial stage + &transmit_request, h); + if (!h->th) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _ ("Failed to send a message to the scalarproduct service\n")); + GNUNET_STATISTICS_destroy(h->GNUNET_YES); + GNUNET_CLIENT_disconnect(h->client); + GNUNET_free(h->msg); + GNUNET_free(h); + return NULL; + } + return h; } /** - * Request the Scalar Product Evaluation + * Request by Alice's client for computing a scalar product * * @param h handle to the master context * @param key Session key - unique to the requesting client * @param peer PeerID of the other peer - * @param element_count Number of elements in the vector - * @param mask_length number of bytes in the mask * @param elements Array of elements of the vector + * @param element_count Number of elements in the vector * @param mask Array of the mask - * @param timeout Relative timeout for the operation + * @param mask_bytes number of bytes in the mask * @param cont Callback function * @param cont_cls Closure for the callback function */ -struct GNUNET_SCALARPRODUCT_QueueEntry * -GNUNET_SCALARPRODUCT_request (struct GNUNET_SCALARPRODUCT_Handle *h, +struct GNUNET_SCALARPRODUCT_Handle * +GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_HashCode * key, - const struct GNUNET_PeerIdentity * peer, - uint16_t element_count, - uint16_t mask_length, - int32_t * elements, + const struct GNUNET_PeerIdentity *peer, + const int32_t * elements, + uint32_t element_count, const unsigned char * mask, - struct GNUNET_TIME_Relative timeout, + uint32_t mask_bytes, GNUNET_SCALARPRODUCT_DatumProcessor cont, void *cont_cls) { - struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h); + struct GNUNET_CLIENT_Connection *client; + struct GNUNET_SCALARPRODUCT_Handle *h; + struct GNUNET_SCALARPRODUCT_client_request *msg; int32_t * vector; uint16_t size; - unsigned int i; + uint64_t i; + GNUNET_assert(key); + GNUNET_assert(peer); + GNUNET_assert(elements); + GNUNET_assert(mask); + GNUNET_assert(cont); + GNUNET_assert(element_count > 1); + GNUNET_assert(mask_bytes != 0); GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) - +element_count * sizeof (int32_t) - + mask_length); - size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t) + mask_length; - - qe->message_size = size; - qe->msg = GNUNET_malloc (size); - qe->msg->header.size = htons (size); - qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); - memcpy (&qe->msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); - qe->msg->element_count = htons (element_count); - qe->msg->mask_length = htons (mask_length); - memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode)); - qe->cont_datum = cont; - qe->cont_cls = cont_cls; - qe->was_transmitted = GNUNET_NO; - qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe); - qe->response_proc = &process_result_message; - qe->timeout = GNUNET_TIME_relative_to_absolute (timeout); - - vector = (int32_t*) & qe->msg[1]; - // copy each element over to the message - for (i = 0; i < element_count; i++) - vector[i] = htonl (elements[i]); - - // fill in the mask - memcpy (&vector[element_count], mask, mask_length); - - process_queue (h); - return qe; -} - - -/** - * Connect to the scalarproduct service. - * - * @param cfg configuration to use - * @return handle to use to access the service - */ -struct GNUNET_SCALARPRODUCT_Handle * -GNUNET_SCALARPRODUCT_connect (const struct GNUNET_CONFIGURATION_Handle * cfg) -{ - struct GNUNET_CLIENT_Connection *client; - struct GNUNET_SCALARPRODUCT_Handle *h; - + + element_count * sizeof (int32_t) + + mask_length); client = GNUNET_CLIENT_connect ("scalarproduct", cfg); - if (NULL == client) + if (!client) { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to connect to the scalarproduct service\n")); return NULL; } - - h = GNUNET_malloc (sizeof (struct GNUNET_SCALARPRODUCT_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); + size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; + + h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); + h->cont_datum = cont; + h->cont_cls = cont_cls; + h->response_proc = &process_status_message; h->client = client; h->cfg = cfg; + h->msg = GNUNET_malloc (size); + memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); + + msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg; + msg->header.size = htons (size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); + msg->element_count = htons (element_count); + msg->mask_length = htons (mask_length); + + vector = (int32_t*) &msg[1]; + // copy each element over to the message + for (i = 0; i < element_count; i++) + vector[i] = htonl(elements[i]); + + memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); + memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); + memcpy (&vector[element_count], mask, mask_length); + h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, // retry is OK in the initial stage + &transmit_request, h); + if ( !h->th) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _ ("Failed to send a message to the scalarproduct service\n")); + return NULL; + } return h; } - /** * Disconnect from the scalarproduct service. * -- 2.25.1