3eae200c3a98947a039eca6fb2ec3158230ca028
[oweals/gnunet.git] / src / scalarproduct / scalarproduct_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file scalarproduct/scalarproduct_api.c
23  * @brief API for the scalarproduct
24  * @author Christian Fuchs
25  * @author Gaurav Kukreja
26  * 
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_scalarproduct_service.h"
32 #include "gnunet_protocols.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
35
36 /**************************************************************
37  ***  Datatype Declarations                          **********
38  **************************************************************/
39
40 /**
41  * Entry in the request queue per client
42  */
43 struct GNUNET_SCALARPRODUCT_QueueEntry
44 {
45   /**
46    * This is a linked list.
47    */
48   struct GNUNET_SCALARPRODUCT_QueueEntry *next;
49
50   /**
51    * This is a linked list.
52    */
53   struct GNUNET_SCALARPRODUCT_QueueEntry *prev;
54
55   /**
56    * Handle to the master context.
57    */
58   struct GNUNET_SCALARPRODUCT_Handle *h;
59
60   /**
61    * Size of the message
62    */
63   uint16_t message_size;
64
65   /**
66    * Message to be sent to the scalarproduct service
67    */
68   struct GNUNET_SCALARPRODUCT_client_request* msg;
69
70   union
71   {
72     /**
73      * Function to call after transmission of the request.
74      */
75     GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
76
77     /**
78      * Function to call after transmission of the request.
79      */
80     GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
81   };
82
83   /**
84    * Closure for 'cont'.
85    */
86   void *cont_cls;
87
88   /**
89    * Has this message been transmitted to the service?
90    * Only ever GNUNET_YES for the head of the queue.
91    * Note that the overall struct should end at a
92    * multiple of 64 bits.
93    */
94   int16_t was_transmitted;
95
96   /**
97    * Response Processor for response from the service. This function calls the
98    * continuation function provided by the client.
99    */
100   GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
101 };
102
103 /**************************************************************
104  ***  Function Declarations                          **********
105  **************************************************************/
106
107 /**
108  * Creates a new entry at the tail of the DLL
109  * 
110  * @param h handle to the master context
111  * 
112  * @return pointer to the entry
113  */
114 static struct GNUNET_SCALARPRODUCT_QueueEntry *
115 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
116
117 /**
118  * Removes the head entry from the queue
119  * 
120  * @param h Handle to the master context
121  */
122 static struct GNUNET_SCALARPRODUCT_QueueEntry *
123 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
124
125 /**
126  * Triggered when timeout occurs for a request in queue
127  * 
128  * @param cls The pointer to the QueueEntry
129  * @param tc Task Context
130  */
131 static void
132 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
133
134 /**
135  * Called when a response is received from the service. After basic check
136  * handler in qe->response_proc is called. This functions handles the response
137  * to the client which used the API.
138  * 
139  * @param cls Pointer to the Master Context
140  * @param msg Pointer to the data received in response
141  */
142 static void
143 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
144
145 /**
146  * Transmits the request to the VectorProduct Sevice
147  * 
148  * @param cls Closure
149  * @param size Size of the buffer
150  * @param buf Pointer to the buffer
151  * 
152  * @return Size of the message sent
153  */
154 static size_t transmit_request (void *cls, size_t size,
155                                 void *buf);
156
157 /**
158  * Issues transmit request for the new entries in the queue
159  * 
160  * @param h handle to the master context
161  */
162 static void
163 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
164
165 /**************************************************************
166  ***  Static Function Declarations                   **********
167  **************************************************************/
168
169
170 /**
171  * Creates a new entry at the tail of the DLL
172  * 
173  * @param h handle to the master context
174  * 
175  * @return pointer to the entry
176  */
177 static struct GNUNET_SCALARPRODUCT_QueueEntry *
178 make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
179 {
180   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
181
182   qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
183
184   // if queue empty
185   if (NULL == h->queue_head && NULL == h->queue_tail)
186     {
187       qe->next = NULL;
188       qe->prev = NULL;
189       h->queue_head = qe;
190       h->queue_tail = qe;
191     }
192   else
193     {
194       qe->prev = h->queue_tail;
195       h->queue_tail->next = qe;
196       h->queue_tail = qe;
197     }
198
199   return qe;
200 }
201
202
203 /**
204  * Removes the head entry from the queue
205  * 
206  * @param h Handle to the master context
207  */
208 static struct GNUNET_SCALARPRODUCT_QueueEntry *
209 free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
210 {
211   struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
212
213   GNUNET_assert (NULL != h);
214   if (NULL == h->queue_head && NULL == h->queue_tail)
215     {
216       // The queue is empty. Just return.
217       qe = NULL;
218       LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
219     }
220   else if (h->queue_head == h->queue_tail) //only one entry
221     {
222       qe = h->queue_head;
223       qe->next = NULL;
224       qe->prev = NULL;
225       h->queue_head = NULL;
226       h->queue_tail = NULL;
227     }
228   else
229     {
230       qe = h->queue_head;
231       h->queue_head = h->queue_head->next;
232       h->queue_head->prev = NULL;
233       qe->next = NULL;
234       qe->prev = NULL;
235     }
236   return qe;
237 }
238
239
240 /**
241  * Triggered when timeout occurs for a request in queue
242  * 
243  * @param cls The pointer to the QueueEntry
244  * @param tc Task Context
245  */
246 static void
247 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
248 {
249   struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
250
251   // Update Statistics
252   GNUNET_STATISTICS_update (qe->h->stats,
253                             gettext_noop ("# queue entry timeouts"), 1,
254                             GNUNET_NO);
255
256   // Clear the timeout_task
257   qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
258
259   // transmit_request is supposed to cancel timeout task.
260   // If message was not transmitted, there is definitely an error.
261   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
262
263   LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
264
265   // remove the queue_entry for the queue
266   GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
267   qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
268 }
269
270
271 /**
272  * Handles the RESULT received in reply of prepare_response from the 
273  * service
274  * 
275  * @param cls Handle to the Master Context
276  * @param msg Pointer to the response received
277  */
278 static void
279 process_status_message (void *cls,
280                         const struct GNUNET_MessageHeader *msg,
281                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
282 {
283   struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
284
285   GNUNET_assert (qe != NULL);
286
287   if (qe->cont_status != NULL)
288     qe->cont_status (qe->cont_cls, &qe->msg->key, status);
289 }
290
291
292 /**
293  * Handles the RESULT received in reply of prepare_response from the 
294  * service
295  * 
296  * @param cls Handle to the Master Context
297  * @param msg Pointer to the response received
298  */
299 static void
300 process_result_message (void *cls,
301                         const struct GNUNET_MessageHeader *msg,
302                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
303 {
304   struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls;
305
306   GNUNET_assert (qe != NULL);
307
308   if (msg == NULL && qe->cont_datum != NULL)
309     {
310       LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n");
311     }
312   if (qe->cont_datum != NULL)
313     {
314       qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg);
315     }
316 }
317
318
319 /**
320  * Called when a response is received from the service. After basic check
321  * handler in qe->response_proc is called. This functions handles the response
322  * to the client which used the API.
323  * 
324  * @param cls Pointer to the Master Context
325  * @param msg Pointer to the data received in response
326  */
327 static void
328 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
329 {
330   struct GNUNET_SCALARPRODUCT_Handle *h = cls;
331   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
332   int16_t was_transmitted;
333   struct GNUNET_SCALARPRODUCT_client_response *message =
334           (struct GNUNET_SCALARPRODUCT_client_response *) msg;
335
336   h->in_receive = GNUNET_NO;
337   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
338
339   if (NULL == (qe = free_queue_head_entry (h)))
340     {
341       /**
342        * The queue head will be NULL if the client disconnected,
343        * * In case of Alice, client disconnected after sending request, before receiving response
344        * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
345        */
346       process_queue (h);
347       return;
348     }
349
350   if (h->client == NULL)
351     {
352       // GKUKREJA : handle this correctly
353       /**
354        * The queue head will be NULL if the client disconnected,
355        * * In case of Alice, client disconnected after sending request, before receiving response
356        * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
357        */
358       process_queue (h);
359       return;
360     }
361
362   was_transmitted = qe->was_transmitted;
363   // Control will only come here, when the request was transmitted to service,
364   // and service responded.
365   GNUNET_assert (was_transmitted == GNUNET_YES);
366
367   if (msg == NULL)
368     {
369       LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
370       qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
371     }
372   else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
373     {
374       LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
375       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
376     }
377   else if (ntohl (message->product_length) == 0)
378     {
379       // response for the responder client, successful
380       GNUNET_STATISTICS_update (h->stats,
381                                 gettext_noop ("# SUC responder result messages received"), 1,
382                                 GNUNET_NO);
383
384       LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n");
385       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
386     }
387   else if (ntohl (message->product_length) > 0)
388     {
389       // response for the requester client, successful
390       GNUNET_STATISTICS_update (h->stats,
391                                 gettext_noop ("# SUC requester result messages received"), 1,
392                                 GNUNET_NO);
393
394       LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
395       qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
396     }
397
398   GNUNET_free (qe);
399   process_queue (h);
400 }
401
402
403 /**
404  * Transmits the request to the VectorProduct Sevice
405  * 
406  * @param cls Closure
407  * @param size Size of the buffer
408  * @param buf Pointer to the buffer
409  * 
410  * @return Size of the message sent
411  */
412 static size_t
413 transmit_request (void *cls, size_t size,
414                   void *buf)
415 {
416   struct GNUNET_SCALARPRODUCT_Handle *h = cls;
417   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
418   size_t msize;
419   
420   if (NULL == (qe = h->queue_head))
421     {
422       LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
423       return 0;
424     }
425
426   GNUNET_SCHEDULER_cancel (qe->timeout_task);
427   qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
428
429   h->th = NULL;
430   if (NULL == (qe = h->queue_head))
431     return 0; /* no entry in queue */
432   if (buf == NULL)
433     {
434       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
435       GNUNET_STATISTICS_update (h->stats,
436                                 gettext_noop ("# transmission request failures"),
437                                 1, GNUNET_NO);
438       GNUNET_SCALARPRODUCT_disconnect (h);
439       return 0;
440     }
441   if (size < (msize = qe->message_size))
442     {
443       process_queue (h);
444       return 0;
445     }
446   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
447        msize);
448
449   memcpy (buf, qe->msg, size);
450   GNUNET_free (qe->msg);
451   qe->was_transmitted = GNUNET_YES;
452
453   GNUNET_assert (GNUNET_NO == h->in_receive);
454   h->in_receive = GNUNET_YES;
455
456   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
457                          GNUNET_TIME_UNIT_FOREVER_REL);
458
459 #if INSANE_STATISTICS
460   GNUNET_STATISTICS_update (h->stats,
461                             gettext_noop ("# bytes sent to scalarproduct"), 1,
462                             GNUNET_NO);
463 #endif
464   return size;
465 }
466
467
468 /**
469  * Issues transmit request for the new entries in the queue
470  * 
471  * @param h handle to the master context
472  */
473 static void
474 process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
475 {
476   struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
477   
478   if (NULL == (qe = h->queue_head))
479     {
480       LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
481       return; /* no entry in queue */
482     }
483   if (qe->was_transmitted == GNUNET_YES)
484     {
485       LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
486       return; /* waiting for replies */
487     }
488   if (h->th != NULL)
489     {
490       LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
491       return; /* request pending */
492     }
493   if (h->client == NULL)
494     {
495       LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
496       return; /* waiting for reconnect */
497     }
498   if (GNUNET_YES == h->in_receive)
499     {
500       /* wait for response to previous query */
501       return;
502     }
503
504   h->th =
505           GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
506                                                GNUNET_TIME_UNIT_FOREVER_REL,
507                                                GNUNET_YES,
508                                                &transmit_request, h);
509
510   if (h->th == NULL)
511     {
512       LOG (GNUNET_ERROR_TYPE_ERROR,
513            _ ("Failed to send a message to the scalarproduct service\n"));
514       return;
515     }
516
517   GNUNET_assert (GNUNET_NO == h->in_receive);
518   GNUNET_break (NULL != h->th);
519 }
520
521
522
523 /**************************************************************
524  ***  API                                            **********
525  **************************************************************/
526
527
528 /**
529  * Used by Bob's client to cooperate with Alice, 
530  * 
531  * @param h handle to the master context
532  * @param key Session key - unique to the requesting client
533  * @param elements Array of elements of the vector
534  * @param element_count Number of elements in the vector
535  * @param cont Callback function
536  * @param cont_cls Closure for the callback function
537  */
538 struct GNUNET_SCALARPRODUCT_Handle *
539 GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
540                                const struct GNUNET_HashCode * key,
541                                const int32_t * elements,
542                                uint32_t element_count,
543                                GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
544                                void *cont_cls)
545 {
546   struct GNUNET_SCALARPRODUCT_Handle *h;
547   struct GNUNET_SCALARPRODUCT_client_request *msg;
548   int32_t * vector;
549   uint16_t size;
550   uint64_t i;
551   
552   GNUNET_assert(key);
553   GNUNET_assert(elements);
554   GNUNET_assert(cont);
555   GNUNET_assert(element_count > 1);
556   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
557                                                    + element_count * sizeof (int32_t));
558   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
559   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
560   if (!h->client)
561     {
562       LOG (GNUNET_ERROR_TYPE_ERROR,
563            _ ("Failed to connect to the scalarproduct service\n"));
564       GNUNET_free(h);
565       return NULL;
566     }
567   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
568   if (!h->th){
569       LOG (GNUNET_ERROR_TYPE_ERROR,
570            _("Failed to send a message to the statistics service\n"));
571       GNUNET_CLIENT_disconnect(h->client);
572       GNUNET_free(h);
573       return NULL;
574   }
575   
576   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
577   
578   h->cont_datum = cont;
579   h->cont_cls = cont_cls;
580   h->response_proc = &process_result_message;
581   h->cfg = cfg;
582   h->msg = GNUNET_malloc (size);
583   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
584   
585   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
586   msg->header.size = htons (size);
587   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
588   msg->element_count = htonl (element_count);
589   
590   vector = (int32_t*) &msg[1];
591   // copy each element over to the message
592   for (i = 0; i < element_count; i++)
593     vector[i] = htonl(elements[i]);
594
595   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
596   
597   
598   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
599                                                GNUNET_TIME_UNIT_FOREVER_REL,
600                                                GNUNET_YES, // retry is OK in the initial stage
601                                                &transmit_request, h);
602   if (!h->th)
603     {
604       LOG (GNUNET_ERROR_TYPE_ERROR,
605            _ ("Failed to send a message to the scalarproduct service\n"));
606       GNUNET_STATISTICS_destroy(h->GNUNET_YES);
607       GNUNET_CLIENT_disconnect(h->client);
608       GNUNET_free(h->msg);
609       GNUNET_free(h);
610       return NULL;
611     }
612   return h;
613 }
614
615
616 /**
617  * Request by Alice's client for computing a scalar product
618  * 
619  * @param h handle to the master context
620  * @param key Session key - unique to the requesting client
621  * @param peer PeerID of the other peer
622  * @param elements Array of elements of the vector
623  * @param element_count Number of elements in the vector
624  * @param mask Array of the mask
625  * @param mask_bytes number of bytes in the mask
626  * @param cont Callback function
627  * @param cont_cls Closure for the callback function
628  */
629 struct GNUNET_SCALARPRODUCT_Handle *
630 GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
631                               const struct GNUNET_HashCode * key,
632                               const struct GNUNET_PeerIdentity *peer,
633                               const int32_t * elements,
634                               uint32_t element_count,
635                               const unsigned char * mask,
636                               uint32_t mask_bytes,
637                               GNUNET_SCALARPRODUCT_DatumProcessor cont,
638                               void *cont_cls)
639 {
640   struct GNUNET_CLIENT_Connection *client;
641   struct GNUNET_SCALARPRODUCT_Handle *h;
642   struct GNUNET_SCALARPRODUCT_client_request *msg;
643   int32_t * vector;
644   uint16_t size;
645   uint64_t i;
646   
647   GNUNET_assert(key);
648   GNUNET_assert(peer);
649   GNUNET_assert(elements);
650   GNUNET_assert(mask);
651   GNUNET_assert(cont);
652   GNUNET_assert(element_count > 1);
653   GNUNET_assert(mask_bytes != 0);
654   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
655                                                    + element_count * sizeof (int32_t)
656                                                    + mask_length);
657   client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
658
659   if (!client)
660     {
661       LOG (GNUNET_ERROR_TYPE_ERROR,
662            _ ("Failed to connect to the scalarproduct service\n"));
663       return NULL;
664     }
665   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
666   
667   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
668   h->cont_datum = cont;
669   h->cont_cls = cont_cls;
670   h->response_proc = &process_status_message;
671   h->client = client;
672   h->cfg = cfg;
673   h->msg = GNUNET_malloc (size);
674   memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
675   
676   msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
677   msg->header.size = htons (size);
678   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
679   msg->element_count = htons (element_count);
680   msg->mask_length = htons (mask_length);
681   
682   vector = (int32_t*) &msg[1];
683   // copy each element over to the message
684   for (i = 0; i < element_count; i++)
685     vector[i] = htonl(elements[i]);
686
687   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
688   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
689   memcpy (&vector[element_count], mask, mask_length);
690   
691   h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
692   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
693                                                GNUNET_TIME_UNIT_FOREVER_REL,
694                                                GNUNET_YES, // retry is OK in the initial stage
695                                                &transmit_request, h);
696   if ( !h->th)
697     {
698       LOG (GNUNET_ERROR_TYPE_ERROR,
699            _ ("Failed to send a message to the scalarproduct service\n"));
700       return NULL;
701     }
702   return h;
703 }
704
705 /**
706  * Disconnect from the scalarproduct service.
707  * 
708  * @param h handle to the scalarproduct
709  */
710 void
711 GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h)
712 {
713   struct GNUNET_SCALARPRODUCT_QueueEntry * qe;
714
715   LOG (GNUNET_ERROR_TYPE_INFO,
716        "Disconnecting from VectorProduct\n");
717
718   while (NULL != h->queue_head)
719     {
720       GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
721       qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
722     }
723
724   if (h->client != NULL)
725     {
726       GNUNET_CLIENT_disconnect (h->client);
727       h->client = NULL;
728     }
729
730   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
731   h->stats = NULL;
732 }
733
734 /* end of ext_api.c */