Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / util / client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/client.c
23  * @brief code for access to services
24  * @author Christian Grothoff
25  *
26  * Generic TCP code for reliable, record-oriented TCP
27  * connections between clients and service providers.
28  */
29 #include "platform.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_resolver_service.h"
33 #include "gnunet_socks.h"
34
35
/**
 * Logging shorthand for this file: forwards to GNUNET_log_from() with
 * the component name fixed to "util-client".
 */
#define LOG(kind,...) GNUNET_log_from (kind, "util-client",__VA_ARGS__)
37
38
/**
 * Internal state for a client connected to a GNUnet service.
 * Forward declaration only; `struct AddressProbe` below keeps a
 * back-pointer to this type before the full definition appears.
 */
struct ClientState;
43
44
/**
 * During connect, we try multiple possible IP addresses
 * to find out which one might work.  One `struct AddressProbe`
 * exists per address that is currently being tried.
 */
struct AddressProbe
{

  /**
   * Next probe in the doubly-linked list anchored at
   * `ap_head` / `ap_tail` of the owning `struct ClientState`.
   */
  struct AddressProbe *next;

  /**
   * Previous probe in the doubly-linked list.
   */
  struct AddressProbe *prev;

  /**
   * The address; do not free (allocated at the end of this struct,
   * i.e. in the same allocation right behind the fixed-size part).
   */
  const struct sockaddr *addr;

  /**
   * Underlying OS's socket on which the connect attempt is running.
   */
  struct GNUNET_NETWORK_Handle *sock;

  /**
   * Connection for which we are probing.
   */
  struct ClientState *cstate;

  /**
   * Length of @e addr in bytes.
   */
  socklen_t addrlen;

  /**
   * Task waiting for the connection to finish connecting
   * (runs #connect_probe_continuation()).
   */
  struct GNUNET_SCHEDULER_Task *task;
};
87
88
/**
 * Internal state for a client connected to a GNUnet service.
 * This is the implementation state behind the message queue
 * returned by #GNUNET_CLIENT_connect().
 */
struct ClientState
{

  /**
   * The connection handle, NULL if not live
   */
  struct GNUNET_NETWORK_Handle *sock;

  /**
   * Handle to a pending DNS lookup request, NULL if DNS is finished.
   */
  struct GNUNET_RESOLVER_RequestHandle *dns_active;

  /**
   * Our configuration.
   */
  const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Head of the doubly-linked list of sockets we are currently
   * trying out (during connect).
   */
  struct AddressProbe *ap_head;

  /**
   * Tail of the doubly-linked list of sockets we are currently
   * trying out (during connect).
   */
  struct AddressProbe *ap_tail;

  /**
   * Name of the service we interact with (our own copy, freed on
   * destruction).
   */
  char *service_name;

  /**
   * Hostname, if any.  NULL if no usable hostname was configured.
   */
  char *hostname;

  /**
   * Next message to transmit to the service. NULL for none.
   * At most one message is pending at any time.
   */
  const struct GNUNET_MessageHeader *msg;

  /**
   * Task for trying to connect to the service
   * (runs #start_connect()).
   */
  struct GNUNET_SCHEDULER_Task *retry_task;

  /**
   * Task for sending messages to the service
   * (runs #transmit_ready()).
   */
  struct GNUNET_SCHEDULER_Task *send_task;

  /**
   * Task for receiving messages from the service
   * (runs #receive_ready()).
   */
  struct GNUNET_SCHEDULER_Task *recv_task;

  /**
   * Tokenizer for inbound messages; delivers complete messages
   * to #recv_message().
   */
  struct GNUNET_MessageStreamTokenizer *mst;

  /**
   * Message queue under our control.  Set to NULL once the queue's
   * destruction has been requested but had to be deferred.
   */
  struct GNUNET_MQ_Handle *mq;

  /**
   * Timeout for receiving a response (absolute time).
   * NOTE(review): not referenced by the code in this file -- confirm
   * whether it is still needed.
   */
  struct GNUNET_TIME_Absolute receive_timeout;

  /**
   * Current value for our incremental back-off (for
   * connect re-tries).
   */
  struct GNUNET_TIME_Relative back_off;

  /**
   * TCP port (0 for disabled).
   */
  unsigned long long port;

  /**
   * Offset in the message where we are for transmission, i.e. the
   * number of bytes of @e msg that were already written to @e sock.
   */
  size_t msg_off;

  /**
   * How often have we tried to connect?
   */
  unsigned int attempts;

  /**
   * Are we supposed to die?  #GNUNET_SYSERR if destruction must be
   * deferred (set while #receive_ready() is processing input),
   * #GNUNET_NO by default, #GNUNET_YES if destruction was requested
   * while it had to be deferred and is still outstanding.
   */
  int in_destroy;

};
196
197
/**
 * Try to connect to the service.  Declared ahead of its definition
 * because #connect_fail_continuation() schedules it for the retry.
 *
 * @param cls the `struct ClientState` to try to connect to the service
 */
static void
start_connect (void *cls);
205
206
207 /**
208  * We've failed for good to establish a connection (timeout or
209  * no more addresses to try).
210  *
211  * @param cstate the connection we tried to establish
212  */
213 static void
214 connect_fail_continuation (struct ClientState *cstate)
215 {
216   GNUNET_break (NULL == cstate->ap_head);
217   GNUNET_break (NULL == cstate->ap_tail);
218   GNUNET_break (NULL == cstate->dns_active);
219   GNUNET_break (NULL == cstate->sock);
220   GNUNET_assert (NULL == cstate->send_task);
221   GNUNET_assert (NULL == cstate->recv_task);
222   // GNUNET_assert (NULL == cstate->proxy_handshake);
223
224   cstate->back_off = GNUNET_TIME_STD_BACKOFF (cstate->back_off);
225   LOG (GNUNET_ERROR_TYPE_DEBUG,
226        "Failed to establish connection to `%s', no further addresses to try, will try again in %s.\n",
227        cstate->service_name,
228        GNUNET_STRINGS_relative_time_to_string (cstate->back_off,
229                                                GNUNET_YES));
230   cstate->retry_task
231     = GNUNET_SCHEDULER_add_delayed (cstate->back_off,
232                                     &start_connect,
233                                     cstate);
234 }
235
236
237 /**
238  * We are ready to send a message to the service.
239  *
240  * @param cls the `struct ClientState` with the `msg` to transmit
241  */
242 static void
243 transmit_ready (void *cls)
244 {
245   struct ClientState *cstate = cls;
246   ssize_t ret;
247   size_t len;
248   const char *pos;
249   int notify_in_flight;
250
251   cstate->send_task = NULL;
252   pos = (const char *) cstate->msg;
253   len = ntohs (cstate->msg->size);
254   GNUNET_assert (cstate->msg_off < len);
255  RETRY:
256   ret = GNUNET_NETWORK_socket_send (cstate->sock,
257                                     &pos[cstate->msg_off],
258                                     len - cstate->msg_off);
259   if (-1 == ret)
260   {
261     if (EINTR == errno)
262       goto RETRY;
263     GNUNET_MQ_inject_error (cstate->mq,
264                             GNUNET_MQ_ERROR_WRITE);
265     return;
266   }
267   notify_in_flight = (0 == cstate->msg_off);
268   cstate->msg_off += ret;
269   if (cstate->msg_off < len)
270   {
271     cstate->send_task
272       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
273                                         cstate->sock,
274                                         &transmit_ready,
275                                         cstate);
276     if (notify_in_flight)
277       GNUNET_MQ_impl_send_in_flight (cstate->mq);
278     return;
279   }
280   cstate->msg = NULL;
281   GNUNET_MQ_impl_send_continue (cstate->mq);
282 }
283
284
285 /**
286  * We have received a full message, pass to the MQ dispatcher.
287  * Called by the tokenizer via #receive_ready().
288  *
289  * @param cls the `struct ClientState`
290  * @param msg message we received.
291  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
292  */
293 static int
294 recv_message (void *cls,
295               const struct GNUNET_MessageHeader *msg)
296 {
297   struct ClientState *cstate = cls;
298
299   if (GNUNET_YES == cstate->in_destroy)
300     return GNUNET_SYSERR;
301
302   LOG (GNUNET_ERROR_TYPE_INFO,
303        "Received message of type %u and size %u from %s\n",
304        ntohs (msg->type), ntohs (msg->size), cstate->service_name);
305
306   GNUNET_MQ_inject_message (cstate->mq,
307                             msg);
308   if (GNUNET_YES == cstate->in_destroy)
309     return GNUNET_SYSERR;
310   return GNUNET_OK;
311 }
312
313
314 /**
315  * Cancel all remaining connect attempts
316  *
317  * @param cstate handle of the client state to process
318  */
319 static void
320 cancel_aps (struct ClientState *cstate)
321 {
322   struct AddressProbe *pos;
323
324   while (NULL != (pos = cstate->ap_head))
325   {
326     GNUNET_break (GNUNET_OK ==
327                   GNUNET_NETWORK_socket_close (pos->sock));
328     GNUNET_SCHEDULER_cancel (pos->task);
329     GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
330                                  cstate->ap_tail,
331                                  pos);
332     GNUNET_free (pos);
333   }
334 }
335
336
337 /**
338  * Implement the destruction of a message queue.  Implementations must
339  * not free @a mq, but should take care of @a impl_state.
340  *
341  * @param mq the message queue to destroy
342  * @param impl_state our `struct ClientState`
343  */
344 static void
345 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
346                                 void *impl_state)
347 {
348   struct ClientState *cstate = impl_state;
349
350   if (GNUNET_SYSERR == cstate->in_destroy)
351   {
352     /* defer destruction */
353     cstate->in_destroy = GNUNET_YES;
354     cstate->mq = NULL;
355     return;
356   }
357   if (NULL != cstate->dns_active)
358     GNUNET_RESOLVER_request_cancel (cstate->dns_active);
359   if (NULL != cstate->send_task)
360     GNUNET_SCHEDULER_cancel (cstate->send_task);
361   if (NULL != cstate->recv_task)
362     GNUNET_SCHEDULER_cancel (cstate->recv_task);
363   if (NULL != cstate->retry_task)
364     GNUNET_SCHEDULER_cancel (cstate->retry_task);
365   if (NULL != cstate->sock)
366     GNUNET_NETWORK_socket_close (cstate->sock);
367   cancel_aps (cstate);
368   GNUNET_free (cstate->service_name);
369   GNUNET_free_non_null (cstate->hostname);
370   GNUNET_MST_destroy (cstate->mst);
371   GNUNET_free (cstate);
372 }
373
374
375 /**
376  * This function is called once we have data ready to read.
377  *
378  * @param cls `struct ClientState` with connection to read from
379  */
380 static void
381 receive_ready (void *cls)
382 {
383   struct ClientState *cstate = cls;
384   int ret;
385
386   cstate->recv_task = NULL;
387   cstate->in_destroy = GNUNET_SYSERR;
388   ret = GNUNET_MST_read (cstate->mst,
389                          cstate->sock,
390                          GNUNET_NO,
391                          GNUNET_NO);
392   if (GNUNET_SYSERR == ret)
393   {
394     if (NULL != cstate->mq)
395       GNUNET_MQ_inject_error (cstate->mq,
396                               GNUNET_MQ_ERROR_READ);
397     if (GNUNET_YES == cstate->in_destroy)
398       connection_client_destroy_impl (cstate->mq,
399                                       cstate);
400     return;
401   }
402   if (GNUNET_YES == cstate->in_destroy)
403   {
404     connection_client_destroy_impl (cstate->mq,
405                                     cstate);
406     return;
407   }
408   cstate->in_destroy = GNUNET_NO;
409   cstate->recv_task
410     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
411                                      cstate->sock,
412                                      &receive_ready,
413                                      cstate);
414 }
415
416
417 /**
418  * We've succeeded in establishing a connection.
419  *
420  * @param cstate the connection we tried to establish
421  */
422 static void
423 connect_success_continuation (struct ClientState *cstate)
424 {
425   GNUNET_assert (NULL == cstate->recv_task);
426   cstate->recv_task
427     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
428                                      cstate->sock,
429                                      &receive_ready,
430                                      cstate);
431   if (NULL != cstate->msg)
432   {
433     GNUNET_assert (NULL == cstate->send_task);
434     cstate->send_task
435       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
436                                         cstate->sock,
437                                         &transmit_ready,
438                                         cstate);
439   }
440 }
441
442
443 /**
444  * Try connecting to the server using UNIX domain sockets.
445  *
446  * @param service_name name of service to connect to
447  * @param cfg configuration to use
448  * @return NULL on error, socket connected to UNIX otherwise
449  */
450 static struct GNUNET_NETWORK_Handle *
451 try_unixpath (const char *service_name,
452               const struct GNUNET_CONFIGURATION_Handle *cfg)
453 {
454 #if AF_UNIX
455   struct GNUNET_NETWORK_Handle *sock;
456   char *unixpath;
457   struct sockaddr_un s_un;
458
459   unixpath = NULL;
460   if ((GNUNET_OK ==
461        GNUNET_CONFIGURATION_get_value_filename (cfg,
462                                                 service_name,
463                                                 "UNIXPATH",
464                                                 &unixpath)) &&
465       (0 < strlen (unixpath)))
466   {
467     /* We have a non-NULL unixpath, need to validate it */
468     if (strlen (unixpath) >= sizeof (s_un.sun_path))
469     {
470       LOG (GNUNET_ERROR_TYPE_WARNING,
471            _("UNIXPATH `%s' too long, maximum length is %llu\n"),
472            unixpath,
473            (unsigned long long) sizeof (s_un.sun_path));
474       unixpath = GNUNET_NETWORK_shorten_unixpath (unixpath);
475       LOG (GNUNET_ERROR_TYPE_INFO,
476            _("Using `%s' instead\n"),
477            unixpath);
478       if (NULL == unixpath)
479         return NULL;
480     }
481     memset (&s_un,
482             0,
483             sizeof (s_un));
484     s_un.sun_family = AF_UNIX;
485     strncpy (s_un.sun_path,
486              unixpath,
487              sizeof (s_un.sun_path) - 1);
488 #ifdef LINUX
489     {
490       int abstract;
491
492       abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg,
493                                                        "TESTING",
494                                                        "USE_ABSTRACT_SOCKETS");
495       if (GNUNET_YES == abstract)
496         s_un.sun_path[0] = '\0';
497     }
498 #endif
499 #if HAVE_SOCKADDR_UN_SUN_LEN
500     s_un.sun_len = (u_char) sizeof (struct sockaddr_un);
501 #endif
502     sock = GNUNET_NETWORK_socket_create (AF_UNIX,
503                                          SOCK_STREAM,
504                                          0);
505     if ( (NULL != sock) &&
506          ( (GNUNET_OK ==
507             GNUNET_NETWORK_socket_connect (sock,
508                                            (struct sockaddr *) &s_un,
509                                            sizeof (s_un))) ||
510            (EINPROGRESS == errno) ) )
511     {
512       LOG (GNUNET_ERROR_TYPE_DEBUG,
513            "Successfully connected to unixpath `%s'!\n",
514            unixpath);
515       GNUNET_free (unixpath);
516       return sock;
517     }
518     if (NULL != sock)
519       GNUNET_NETWORK_socket_close (sock);
520   }
521   GNUNET_free_non_null (unixpath);
522 #endif
523   return NULL;
524 }
525
526
527 /**
528  * Scheduler let us know that we're either ready to write on the
529  * socket OR connect timed out.  Do the right thing.
530  *
531  * @param cls the `struct AddressProbe *` with the address that we are probing
532  */
533 static void
534 connect_probe_continuation (void *cls)
535 {
536   struct AddressProbe *ap = cls;
537   struct ClientState *cstate = ap->cstate;
538   const struct GNUNET_SCHEDULER_TaskContext *tc;
539   int error;
540   socklen_t len;
541
542   ap->task = NULL;
543   GNUNET_assert (NULL != ap->sock);
544   GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
545                                cstate->ap_tail,
546                                ap);
547   len = sizeof (error);
548   error = 0;
549   tc = GNUNET_SCHEDULER_get_task_context ();
550   if ( (0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
551        (GNUNET_OK !=
552         GNUNET_NETWORK_socket_getsockopt (ap->sock,
553                                           SOL_SOCKET,
554                                           SO_ERROR,
555                                           &error,
556                                           &len)) ||
557        (0 != error) )
558   {
559     GNUNET_break (GNUNET_OK ==
560                   GNUNET_NETWORK_socket_close (ap->sock));
561     GNUNET_free (ap);
562     if ( (NULL == cstate->ap_head) &&
563          //      (NULL == cstate->proxy_handshake) &&
564          (NULL == cstate->dns_active) )
565       connect_fail_continuation (cstate);
566     return;
567   }
568   LOG (GNUNET_ERROR_TYPE_DEBUG,
569        "Connection to `%s' succeeded!\n",
570        cstate->service_name);
571   /* trigger jobs that waited for the connection */
572   GNUNET_assert (NULL == cstate->sock);
573   cstate->sock = ap->sock;
574   GNUNET_free (ap);
575   cancel_aps (cstate);
576   connect_success_continuation (cstate);
577 }
578
579
580 /**
581  * Try to establish a connection given the specified address.
582  * This function is called by the resolver once we have a DNS reply.
583  *
584  * @param cls our `struct ClientState *`
585  * @param addr address to try, NULL for "last call"
586  * @param addrlen length of @a addr
587  */
588 static void
589 try_connect_using_address (void *cls,
590                            const struct sockaddr *addr,
591                            socklen_t addrlen)
592 {
593   struct ClientState *cstate = cls;
594   struct AddressProbe *ap;
595
596   if (NULL == addr)
597   {
598     cstate->dns_active = NULL;
599     if ( (NULL == cstate->ap_head) &&
600          //  (NULL == cstate->proxy_handshake) &&
601          (NULL == cstate->sock) )
602       connect_fail_continuation (cstate);
603     return;
604   }
605   if (NULL != cstate->sock)
606     return;                     /* already connected */
607   /* try to connect */
608   LOG (GNUNET_ERROR_TYPE_DEBUG,
609        "Trying to connect using address `%s:%u'\n",
610        GNUNET_a2s (addr,
611                    addrlen),
612        cstate->port);
613   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
614   ap->addr = (const struct sockaddr *) &ap[1];
615   GNUNET_memcpy (&ap[1],
616                  addr,
617                  addrlen);
618   ap->addrlen = addrlen;
619   ap->cstate = cstate;
620
621   switch (ap->addr->sa_family)
622   {
623   case AF_INET:
624     ((struct sockaddr_in *) ap->addr)->sin_port = htons (cstate->port);
625     break;
626   case AF_INET6:
627     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (cstate->port);
628     break;
629   default:
630     GNUNET_break (0);
631     GNUNET_free (ap);
632     return;                     /* not supported by us */
633   }
634   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family,
635                                            SOCK_STREAM,
636                                            0);
637   if (NULL == ap->sock)
638   {
639     GNUNET_free (ap);
640     return;                     /* not supported by OS */
641   }
642   if ( (GNUNET_OK !=
643         GNUNET_NETWORK_socket_connect (ap->sock,
644                                        ap->addr,
645                                        ap->addrlen)) &&
646        (EINPROGRESS != errno) )
647   {
648     /* maybe refused / unsupported address, try next */
649     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
650                          "connect");
651     GNUNET_break (GNUNET_OK ==
652                   GNUNET_NETWORK_socket_close (ap->sock));
653     GNUNET_free (ap);
654     return;
655   }
656   GNUNET_CONTAINER_DLL_insert (cstate->ap_head,
657                                cstate->ap_tail,
658                                ap);
659   ap->task = GNUNET_SCHEDULER_add_write_net (GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
660                                              ap->sock,
661                                              &connect_probe_continuation,
662                                              ap);
663 }
664
665
666 /**
667  * Test whether the configuration has proper values for connection
668  * (UNIXPATH || (PORT && HOSTNAME)).
669  *
670  * @param service_name name of service to connect to
671  * @param cfg configuration to use
672  * @return #GNUNET_OK if the configuration is valid, #GNUNET_SYSERR if not
673  */
674 static int
675 test_service_configuration (const char *service_name,
676                             const struct GNUNET_CONFIGURATION_Handle *cfg)
677 {
678   int ret = GNUNET_SYSERR;
679   char *hostname = NULL;
680   unsigned long long port;
681 #if AF_UNIX
682   char *unixpath = NULL;
683
684   if ((GNUNET_OK ==
685        GNUNET_CONFIGURATION_get_value_filename (cfg,
686                                                 service_name,
687                                                 "UNIXPATH",
688                                                 &unixpath)) &&
689       (0 < strlen (unixpath)))
690     ret = GNUNET_OK;
691   GNUNET_free_non_null (unixpath);
692 #endif
693
694   if ( (GNUNET_YES ==
695         GNUNET_CONFIGURATION_have_value (cfg,
696                                          service_name,
697                                          "PORT")) &&
698        (GNUNET_OK ==
699         GNUNET_CONFIGURATION_get_value_number (cfg,
700                                                service_name,
701                                                "PORT",
702                                                &port)) &&
703        (port <= 65535) &&
704        (0 != port) &&
705        (GNUNET_OK ==
706         GNUNET_CONFIGURATION_get_value_string (cfg,
707                                                service_name,
708                                                "HOSTNAME",
709                                                &hostname)) &&
710        (0 != strlen (hostname)) )
711     ret = GNUNET_OK;
712   GNUNET_free_non_null (hostname);
713   return ret;
714 }
715
716
717 /**
718  * Try to connect to the service.
719  *
720  * @param cls the `struct ClientState` to try to connect to the service
721  */
722 static void
723 start_connect (void *cls)
724 {
725   struct ClientState *cstate = cls;
726
727   cstate->retry_task = NULL;
728 #if 0
729   /* Never use a local source if a proxy is configured */
730   if (GNUNET_YES ==
731       GNUNET_SOCKS_check_service (cstate->service_name,
732                                   cstate->cfg))
733   {
734     socks_connect (cstate);
735     return;
736   }
737 #endif
738
739   if ( (0 == (cstate->attempts++ % 2)) ||
740        (0 == cstate->port) ||
741        (NULL == cstate->hostname) )
742   {
743     /* on even rounds, try UNIX first, or always
744        if we do not have a DNS name and TCP port. */
745     cstate->sock = try_unixpath (cstate->service_name,
746                                  cstate->cfg);
747     if (NULL != cstate->sock)
748     {
749       connect_success_continuation (cstate);
750       return;
751     }
752   }
753   if ( (NULL == cstate->hostname) ||
754        (0 == cstate->port) )
755   {
756     /* All options failed. Boo! */
757     connect_fail_continuation (cstate);
758     return;
759   }
760   cstate->dns_active
761     = GNUNET_RESOLVER_ip_get (cstate->hostname,
762                               AF_UNSPEC,
763                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
764                               &try_connect_using_address,
765                               cstate);
766 }
767
768
769 /**
770  * Implements the transmission functionality of a message queue.
771  *
772  * @param mq the message queue
773  * @param msg the message to send
774  * @param impl_state our `struct ClientState`
775  */
776 static void
777 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
778                              const struct GNUNET_MessageHeader *msg,
779                              void *impl_state)
780 {
781   struct ClientState *cstate = impl_state;
782
783   /* only one message at a time allowed */
784   GNUNET_assert (NULL == cstate->msg);
785   GNUNET_assert (NULL == cstate->send_task);
786   cstate->msg = msg;
787   cstate->msg_off = 0;
788   if (NULL == cstate->sock)
789     return; /* still waiting for connection */
790   cstate->send_task
791     = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
792                                       cstate->sock,
793                                       &transmit_ready,
794                                       cstate);
795 }
796
797
798 /**
799  * Cancel the currently sent message.
800  *
801  * @param mq message queue
802  * @param impl_state our `struct ClientState`
803  */
804 static void
805 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
806                                void *impl_state)
807 {
808   struct ClientState *cstate = impl_state;
809
810   GNUNET_assert (NULL != cstate->msg);
811   GNUNET_assert (0 == cstate->msg_off);
812   cstate->msg = NULL;
813   if (NULL != cstate->send_task)
814   {
815     GNUNET_SCHEDULER_cancel (cstate->send_task);
816     cstate->send_task = NULL;
817   }
818 }
819
820
821 /**
822  * Create a message queue to connect to a GNUnet service.
823  * If handlers are specfied, receive messages from the connection.
824  *
825  * @param cfg our configuration
826  * @param service_name name of the service to connect to
827  * @param handlers handlers for receiving messages, can be NULL
828  * @param error_handler error handler
829  * @param error_handler_cls closure for the @a error_handler
830  * @return the message queue, NULL on error
831  */
832 struct GNUNET_MQ_Handle *
833 GNUNET_CLIENT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
834                        const char *service_name,
835                        const struct GNUNET_MQ_MessageHandler *handlers,
836                        GNUNET_MQ_ErrorHandler error_handler,
837                        void *error_handler_cls)
838 {
839   struct ClientState *cstate;
840
841   if (GNUNET_OK !=
842       test_service_configuration (service_name,
843                                   cfg))
844     return NULL;
845   cstate = GNUNET_new (struct ClientState);
846   cstate->service_name = GNUNET_strdup (service_name);
847   cstate->cfg = cfg;
848   cstate->retry_task = GNUNET_SCHEDULER_add_now (&start_connect,
849                                                  cstate);
850   cstate->mst = GNUNET_MST_create (&recv_message,
851                                    cstate);
852   if (GNUNET_YES ==
853       GNUNET_CONFIGURATION_have_value (cfg,
854                                        service_name,
855                                        "PORT"))
856   {
857     if (! ( (GNUNET_OK !=
858              GNUNET_CONFIGURATION_get_value_number (cfg,
859                                                     service_name,
860                                                     "PORT",
861                                                     &cstate->port)) ||
862             (cstate->port > 65535) ||
863             (GNUNET_OK !=
864              GNUNET_CONFIGURATION_get_value_string (cfg,
865                                                     service_name,
866                                                     "HOSTNAME",
867                                                     &cstate->hostname)) ) &&
868         (0 == strlen (cstate->hostname)) )
869     {
870       GNUNET_free (cstate->hostname);
871       cstate->hostname = NULL;
872       LOG (GNUNET_ERROR_TYPE_WARNING,
873            _("Need a non-empty hostname for service `%s'.\n"),
874            service_name);
875     }
876   }
877   cstate->mq = GNUNET_MQ_queue_for_callbacks (&connection_client_send_impl,
878                                               &connection_client_destroy_impl,
879                                               &connection_client_cancel_impl,
880                                               cstate,
881                                               handlers,
882                                               error_handler,
883                                               error_handler_cls);
884   return cstate->mq;
885 }
886
887 /* end of client.c */