updated includes of api and service to include the new headerfiles
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  * 
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33 #include "scalarproduct.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36
37 /**************************************************************
38  ***  Datatype Declarations                          **********
39  **************************************************************/
40
41 /**
42  * Entry in the request queue per client
43  */
44 struct GNUNET_SCALARPRODUCT_QueueEntry
45 {
46   /**
47    * This is a linked list.
48    */
49   struct GNUNET_SCALARPRODUCT_QueueEntry *next;
50
51   /**
52    * This is a linked list.
53    */
54   struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
55
56   /**
57    * Handle to the master context.
58    */
59   struct GNUNET_SCALARPRODUCT_Handle *h;
60
61   /**
62    * Size of the message
63    */
64   uint16_t message_size;
65
66   /**
67    * Message to be sent to the scalarproduct service
68    */
69   struct GNUNET_SCALARPRODUCT_client_request* msg;
70
71   union
72   {
73     /**
74      * Function to call after transmission of the request.
75      */
76     GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
77
78     /**
79      * Function to call after transmission of the request.
80      */
81     GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
82   };
83
84   /**
85    * Closure for 'cont'.
86    */
87   void *cont_cls;
88
89   /**
90    * Has this message been transmitted to the service?
91    * Only ever GNUNET_YES for the head of the queue.
92    * Note that the overall struct should end at a
93    * multiple of 64 bits.
94    */
95   int16_t was_transmitted;
96
97   /**
98    * Response Processor for response from the service. This function calls the
99    * continuation function provided by the client.
100    */
101   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
102 };
103
104 /**************************************************************
105  ***  Function Declarations                          **********
106  **************************************************************/
107
108 /**
109  * Creates a new entry at the tail of the DLL
110  * 
111  * @param h handle to the master context
112  * 
113  * @return pointer to the entry
114  */
115 static struct GNUNET_SCALARPRODUCT_QueueEntry *
116 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
117
118 /**
119  * Removes the head entry from the queue
120  * 
121  * @param h Handle to the master context
122  */
123 static struct GNUNET_SCALARPRODUCT_QueueEntry *
124 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
125
126 /**
127  * Triggered when timeout occurs for a request in queue
128  * 
129  * @param cls The pointer to the QueueEntry
130  * @param tc Task Context
131  */
132 static void
133 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
134
135 /**
136  * Called when a response is received from the service. After basic check
137  * handler in qe->response_proc is called. This functions handles the response
138  * to the client which used the API.
139  * 
140  * @param cls Pointer to the Master Context
141  * @param msg Pointer to the data received in response
142  */
143 static void
144 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
145
146 /**
147  * Transmits the request to the VectorProduct Sevice
148  * 
149  * @param cls Closure
150  * @param size Size of the buffer
151  * @param buf Pointer to the buffer
152  * 
153  * @return Size of the message sent
154  */
155 static size_t transmit_request (void *cls, size_t size,
156                                 void *buf);
157
158 /**
159  * Issues transmit request for the new entries in the queue
160  * 
161  * @param h handle to the master context
162  */
163 static void
164 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
165
166 /**************************************************************
167  ***  Static Function Declarations                   **********
168  **************************************************************/
169
170
171 /**
172  * Creates a new entry at the tail of the DLL
173  * 
174  * @param h handle to the master context
175  * 
176  * @return pointer to the entry
177  */
178 static struct GNUNET_SCALARPRODUCT_QueueEntry *
179 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
180 {
181   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
182
183   qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
184
185   // if queue empty
186   if (NULL == h->queue_head && NULL == h->queue_tail)
187     {
188       qe->next = NULL;
189       qe->prev = NULL;
190       h->queue_head = qe;
191       h->queue_tail = qe;
192     }
193   else
194     {
195       qe->prev = h->queue_tail;
196       h->queue_tail->next = qe;
197       h->queue_tail = qe;
198     }
199
200   return qe;
201 }
202
203
204 /**
205  * Removes the head entry from the queue
206  * 
207  * @param h Handle to the master context
208  */
209 static struct GNUNET_SCALARPRODUCT_QueueEntry *
210 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
211 {
212   struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
213
214   GNUNET_assert (NULL != h);
215   if (NULL == h->queue_head && NULL == h->queue_tail)
216     {
217       // The queue is empty. Just return.
218       qe = NULL;
219       LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
220     }
221   else if (h->queue_head == h->queue_tail) //only one entry
222     {
223       qe = h->queue_head;
224       qe->next = NULL;
225       qe->prev = NULL;
226       h->queue_head = NULL;
227       h->queue_tail = NULL;
228     }
229   else
230     {
231       qe = h->queue_head;
232       h->queue_head = h->queue_head->next;
233       h->queue_head->prev = NULL;
234       qe->next = NULL;
235       qe->prev = NULL;
236     }
237   return qe;
238 }
239
240
241 /**
242  * Triggered when timeout occurs for a request in queue
243  * 
244  * @param cls The pointer to the QueueEntry
245  * @param tc Task Context
246  */
247 static void
248 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
249 {
250   struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
251
252   // Update Statistics
253   GNUNET_STATISTICS_update (qe->h->stats,
254                             gettext_noop ("# queue entry timeouts"), 1,
255                             GNUNET_NO);
256
257   // Clear the timeout_task
258   qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
259
260   // transmit_request is supposed to cancel timeout task.
261   // If message was not transmitted, there is definitely an error.
262   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
263
264   LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
265
266   // remove the queue_entry for the queue
267   GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
268   qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
269 }
270
271
272 /**
273  * Handles the RESULT received in reply of prepare_response from the 
274  * service
275  * 
276  * @param cls Handle to the Master Context
277  * @param msg Pointer to the response received
278  */
279 static void
280 process_status_message (void *cls,
281                         const struct GNUNET_MessageHeader *msg,
282                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
283 {
284   struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
285
286   GNUNET_assert (qe != NULL);
287
288   if (qe->cont_status != NULL)
289     qe->cont_status (qe->cont_cls, &qe->msg->key, status);
290 }
291
292
293 /**
294  * Handles the RESULT received in reply of prepare_response from the 
295  * service
296  * 
297  * @param cls Handle to the Master Context
298  * @param msg Pointer to the response received
299  */
300 static void
301 process_result_message (void *cls,
302                         const struct GNUNET_MessageHeader *msg,
303                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
304 {
305   struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
306
307   GNUNET_assert (qe != NULL);
308
309   if (msg == NULL && qe->cont_datum != NULL)
310     {
311       LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n");
312     }
313   if (qe->cont_datum != NULL)
314     {
315       qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg);
316     }
317 }
318
319
320 /**
321  * Called when a response is received from the service. After basic check
322  * handler in qe->response_proc is called. This functions handles the response
323  * to the client which used the API.
324  * 
325  * @param cls Pointer to the Master Context
326  * @param msg Pointer to the data received in response
327  */
328 static void
329 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
330 {
331   struct GNUNET_SCALARPRODUCT_Handle *h = cls;
332   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
333   int16_t was_transmitted;
334   struct GNUNET_SCALARPRODUCT_client_response *message =
335           (struct GNUNET_SCALARPRODUCT_client_response *) msg;
336
337   h->in_receive = GNUNET_NO;
338   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
339
340   if (NULL == (qe = free_queue_head_entry (h)))
341     {
342       /**
343        * The queue head will be NULL if the client disconnected,
344        * * In case of Alice, client disconnected after sending request, before receiving response
345        * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
346        */
347       process_queue (h);
348       return;
349     }
350
351   if (h->client == NULL)
352     {
353       // GKUKREJA : handle this correctly
354       /**
355        * The queue head will be NULL if the client disconnected,
356        * * In case of Alice, client disconnected after sending request, before receiving response
357        * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
358        */
359       process_queue (h);
360       return;
361     }
362
363   was_transmitted = qe->was_transmitted;
364   // Control will only come here, when the request was transmitted to service,
365   // and service responded.
366   GNUNET_assert (was_transmitted == GNUNET_YES);
367
368   if (msg == NULL)
369     {
370       LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
371       qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
372     }
373   else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
374     {
375       LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
376       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
377     }
378   else if (ntohl (message->product_length) == 0)
379     {
380       // response for the responder client, successful
381       GNUNET_STATISTICS_update (h->stats,
382                                 gettext_noop ("# SUC responder result messages received"), 1,
383                                 GNUNET_NO);
384
385       LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n");
386       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
387     }
388   else if (ntohl (message->product_length) > 0)
389     {
390       // response for the requester client, successful
391       GNUNET_STATISTICS_update (h->stats,
392                                 gettext_noop ("# SUC requester result messages received"), 1,
393                                 GNUNET_NO);
394
395       LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
396       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
397     }
398
399   GNUNET_free (qe);
400   process_queue (h);
401 }
402
403
404 /**
405  * Transmits the request to the VectorProduct Sevice
406  * 
407  * @param cls Closure
408  * @param size Size of the buffer
409  * @param buf Pointer to the buffer
410  * 
411  * @return Size of the message sent
412  */
413 static size_t
414 transmit_request (void *cls, size_t size,
415                   void *buf)
416 {
417   struct GNUNET_SCALARPRODUCT_Handle *h = cls;
418   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
419   size_t msize;
420   
421   if (NULL == (qe = h->queue_head))
422     {
423       LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
424       return 0;
425     }
426
427   GNUNET_SCHEDULER_cancel (qe->timeout_task);
428   qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
429
430   h->th = NULL;
431   if (NULL == (qe = h->queue_head))
432     return 0; /* no entry in queue */
433   if (buf == NULL)
434     {
435       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
436       GNUNET_STATISTICS_update (h->stats,
437                                 gettext_noop ("# transmission request failures"),
438                                 1, GNUNET_NO);
439       GNUNET_SCALARPRODUCT_disconnect (h);
440       return 0;
441     }
442   if (size < (msize = qe->message_size))
443     {
444       process_queue (h);
445       return 0;
446     }
447   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
448        msize);
449
450   memcpy (buf, qe->msg, size);
451   GNUNET_free (qe->msg);
452   qe->was_transmitted = GNUNET_YES;
453
454   GNUNET_assert (GNUNET_NO == h->in_receive);
455   h->in_receive = GNUNET_YES;
456
457   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
458                          GNUNET_TIME_UNIT_FOREVER_REL);
459
460 #if INSANE_STATISTICS
461   GNUNET_STATISTICS_update (h->stats,
462                             gettext_noop ("# bytes sent to scalarproduct"), 1,
463                             GNUNET_NO);
464 #endif
465   return size;
466 }
467
468
469 /**
470  * Issues transmit request for the new entries in the queue
471  * 
472  * @param h handle to the master context
473  */
474 static void
475 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
476 {
477   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
478   
479   if (NULL == (qe = h->queue_head))
480     {
481       LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
482       return; /* no entry in queue */
483     }
484   if (qe->was_transmitted == GNUNET_YES)
485     {
486       LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
487       return; /* waiting for replies */
488     }
489   if (h->th != NULL)
490     {
491       LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
492       return; /* request pending */
493     }
494   if (h->client == NULL)
495     {
496       LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
497       return; /* waiting for reconnect */
498     }
499   if (GNUNET_YES == h->in_receive)
500     {
501       /* wait for response to previous query */
502       return;
503     }
504
505   h->th =
506           GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
507                                                GNUNET_TIME_UNIT_FOREVER_REL,
508                                                GNUNET_YES,
509                                                &transmit_request, h);
510
511   if (h->th == NULL)
512     {
513       LOG (GNUNET_ERROR_TYPE_ERROR,
514            _ ("Failed to send a message to the scalarproduct service\n"));
515       return;
516     }
517
518   GNUNET_assert (GNUNET_NO == h->in_receive);
519   GNUNET_break (NULL != h->th);
520 }
521
522
523
524 /**************************************************************
525  ***  API                                            **********
526  **************************************************************/
527
528
529 /**
530  * Used by Bob's client to cooperate with Alice, 
531  * 
532  * @param h handle to the master context
533  * @param key Session key - unique to the requesting client
534  * @param elements Array of elements of the vector
535  * @param element_count Number of elements in the vector
536  * @param cont Callback function
537  * @param cont_cls Closure for the callback function
538  */
539 struct GNUNET_SCALARPRODUCT_Handle *
540 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
541                                const struct GNUNET_HashCode * key,
542                                const int32_t * elements,
543                                uint32_t element_count,
544                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
545                                void *cont_cls)
546 {
547   struct GNUNET_SCALARPRODUCT_Handle *h;
548   struct GNUNET_SCALARPRODUCT_client_request *msg;
549   int32_t * vector;
550   uint16_t size;
551   uint64_t i;
552   
553   GNUNET_assert(key);
554   GNUNET_assert(elements);
555   GNUNET_assert(cont);
556   GNUNET_assert(element_count > 1);
557   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
558                                                    + element_count * sizeof (int32_t));
559   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
560   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
561   if (!h->client)
562     {
563       LOG (GNUNET_ERROR_TYPE_ERROR,
564            _ ("Failed to connect to the scalarproduct service\n"));
565       GNUNET_free(h);
566       return NULL;
567     }
568   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
569   if (!h->th){
570       LOG (GNUNET_ERROR_TYPE_ERROR,
571            _("Failed to send a message to the statistics service\n"));
572       GNUNET_CLIENT_disconnect(h->client);
573       GNUNET_free(h);
574       return NULL;
575   }
576   
577   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
578   
579   h->cont_datum = cont;
580   h->cont_cls = cont_cls;
581   h->response_proc = &process_result_message;
582   h->cfg = cfg;
583   h->msg = GNUNET_malloc (size);
584   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
585   
586   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
587   msg->header.size = htons (size);
588   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
589   msg->element_count = htonl (element_count);
590   
591   vector = (int32_t*) &msg[1];
592   // copy each element over to the message
593   for (i = 0; i < element_count; i++)
594     vector[i] = htonl(elements[i]);
595
596   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
597   
598   
599   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
600                                                GNUNET_TIME_UNIT_FOREVER_REL,
601                                                GNUNET_YES, // retry is OK in the initial stage
602                                                &transmit_request, h);
603   if (!h->th)
604     {
605       LOG (GNUNET_ERROR_TYPE_ERROR,
606            _ ("Failed to send a message to the scalarproduct service\n"));
607       GNUNET_STATISTICS_destroy(h->GNUNET_YES);
608       GNUNET_CLIENT_disconnect(h->client);
609       GNUNET_free(h->msg);
610       GNUNET_free(h);
611       return NULL;
612     }
613   return h;
614 }
615
616
617 /**
618  * Request by Alice's client for computing a scalar product
619  * 
620  * @param h handle to the master context
621  * @param key Session key - unique to the requesting client
622  * @param peer PeerID of the other peer
623  * @param elements Array of elements of the vector
624  * @param element_count Number of elements in the vector
625  * @param mask Array of the mask
626  * @param mask_bytes number of bytes in the mask
627  * @param cont Callback function
628  * @param cont_cls Closure for the callback function
629  */
630 struct GNUNET_SCALARPRODUCT_Handle *
631 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
632                               const struct GNUNET_HashCode * key,
633                               const struct GNUNET_PeerIdentity *peer,
634                               const int32_t * elements,
635                               uint32_t element_count,
636                               const unsigned char * mask,
637                               uint32_t mask_bytes,
638                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
639                               void *cont_cls)
640 {
641   struct GNUNET_CLIENT_Connection *client;
642   struct GNUNET_SCALARPRODUCT_Handle *h;
643   struct GNUNET_SCALARPRODUCT_client_request *msg;
644   int32_t * vector;
645   uint16_t size;
646   uint64_t i;
647   
648   GNUNET_assert(key);
649   GNUNET_assert(peer);
650   GNUNET_assert(elements);
651   GNUNET_assert(mask);
652   GNUNET_assert(cont);
653   GNUNET_assert(element_count > 1);
654   GNUNET_assert(mask_bytes != 0);
655   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
656                                                    + element_count * sizeof (int32_t)
657                                                    + mask_length);
658   client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
659
660   if (!client)
661     {
662       LOG (GNUNET_ERROR_TYPE_ERROR,
663            _ ("Failed to connect to the scalarproduct service\n"));
664       return NULL;
665     }
666   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
667   
668   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
669   h->cont_datum = cont;
670   h->cont_cls = cont_cls;
671   h->response_proc = &process_status_message;
672   h->client = client;
673   h->cfg = cfg;
674   h->msg = GNUNET_malloc (size);
675   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
676   
677   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
678   msg->header.size = htons (size);
679   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
680   msg->element_count = htons (element_count);
681   msg->mask_length = htons (mask_length);
682   
683   vector = (int32_t*) &msg[1];
684   // copy each element over to the message
685   for (i = 0; i < element_count; i++)
686     vector[i] = htonl(elements[i]);
687
688   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
689   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
690   memcpy (&vector[element_count], mask, mask_length);
691   
692   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
693   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
694                                                GNUNET_TIME_UNIT_FOREVER_REL,
695                                                GNUNET_YES, // retry is OK in the initial stage
696                                                &transmit_request, h);
697   if ( !h->th)
698     {
699       LOG (GNUNET_ERROR_TYPE_ERROR,
700            _ ("Failed to send a message to the scalarproduct service\n"));
701       return NULL;
702     }
703   return h;
704 }
705
706 /**
707  * Disconnect from the scalarproduct service.
708  * 
709  * @param h handle to the scalarproduct
710  */
711 void
712 GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
713 {
714   struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
715
716   LOG (GNUNET_ERROR_TYPE_INFO,
717        "Disconnecting from VectorProduct\n");
718
719   while (NULL != h->queue_head)
720     {
721       GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
722       qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
723     }
724
725   if (h->client != NULL)
726     {
727       GNUNET_CLIENT_disconnect (h->client);
728       h->client = NULL;
729     }
730
731   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
732   h->stats = NULL;
733 }
734
735 /* end of ext_api.c */