Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / util / client_new.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/client_new.c
23  * @brief code for access to services
24  * @author Christian Grothoff
25  *
26  * Generic TCP code for reliable, record-oriented TCP
27  * connections between clients and service providers.
28  */
29 #include "platform.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_resolver_service.h"
33 #include "gnunet_socks.h"
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "util",__VA_ARGS__)
37
38
39 /**
40  * Internal state for a client connected to a GNUnet service.
41  */
42 struct ClientState;
43
44
45 /**
46  * During connect, we try multiple possible IP addresses
47  * to find out which one might work.  One instance of this
48  * struct exists per address that is currently being probed.
49 struct AddressProbe
50 {
51
52   /**
53    * Next probe in the doubly-linked list.
54    */
55   struct AddressProbe *next;
56
57   /**
58    * Previous probe in the doubly-linked list.
59    */
60   struct AddressProbe *prev;
61
62   /**
63    * The address; do not free (allocated at the end of this struct).
64    */
65   const struct sockaddr *addr;
66
67   /**
68    * Underlying OS's socket.
69    */
70   struct GNUNET_NETWORK_Handle *sock;
71
72   /**
73    * Connection for which we are probing.
74    */
75   struct ClientState *cstate;
76
77   /**
78    * Length of addr.
79    */
80   socklen_t addrlen;
81
82   /**
83    * Task waiting for the connection to finish connecting.
84    */
85   struct GNUNET_SCHEDULER_Task *task;
86 };
87
88
89 /**
90  * Internal state for a client connected to a GNUnet service.
91  */
92 struct ClientState
93 {
94
95   /**
96    * The connection handle, NULL if not live
97    */
98   struct GNUNET_NETWORK_Handle *sock;
99
100   /**
101    * Handle to a pending DNS lookup request, NULL if DNS is finished.
102    */
103   struct GNUNET_RESOLVER_RequestHandle *dns_active;
104
105   /**
106    * Our configuration.
107    */
108   const struct GNUNET_CONFIGURATION_Handle *cfg;
109
110   /**
111    * Head of the linked list of sockets we are currently trying out
112    * (during connect).
113    */
114   struct AddressProbe *ap_head;
115
116   /**
117    * Tail of the linked list of sockets we are currently trying out
118    * (during connect).
119    */
120   struct AddressProbe *ap_tail;
121
122   /**
123    * Name of the service we interact with.
124    */
125   char *service_name;
126
127   /**
128    * Hostname, if any.
129    */
130   char *hostname;
131
132   /**
133    * Next message to transmit to the service. NULL for none.
134    */
135   const struct GNUNET_MessageHeader *msg;
136
137   /**
138    * Task for trying to connect to the service.
139    */
140   struct GNUNET_SCHEDULER_Task *retry_task;
141
142   /**
143    * Task for sending messages to the service.
144    */
145   struct GNUNET_SCHEDULER_Task *send_task;
146
147   /**
148    * Task for receiving messages from the service.
149    */
150   struct GNUNET_SCHEDULER_Task *recv_task;
151
152   /**
153    * Tokenizer for inbound messages.
154    */
155   struct GNUNET_MessageStreamTokenizer *mst;
156
157   /**
158    * Message queue under our control.
159    */
160   struct GNUNET_MQ_Handle *mq;
161
162   /**
163    * Timeout for receiving a response (absolute time).
164    */
165   struct GNUNET_TIME_Absolute receive_timeout;
166
167   /**
168    * Current value for our incremental back-off (for
169    * connect re-tries).
170    */
171   struct GNUNET_TIME_Relative back_off;
172
173   /**
174    * TCP port (0 for disabled).
175    */
176   unsigned long long port;
177
178   /**
179    * Offset in the message where we are for transmission.
180    */
181   size_t msg_off;
182
183   /**
184    * How often have we tried to connect?
185    */
186   unsigned int attempts;
187
188   /**
189    * Are we supposed to die?  #GNUNET_SYSERR if destruction must be
190    * deferred, #GNUNET_NO by default, #GNUNET_YES if destruction was
191    * deferred.
192    */
193   int in_destroy;
194
195 };
196
197
198 /**
199  * Try to connect to the service.
200  *
201  * @param cls the `struct ClientState` to try to connect to the service
202  */
203 static void
204 start_connect (void *cls);
205
206
207 /**
208  * We've failed for good to establish a connection (timeout or
209  * no more addresses to try).
210  *
211  * @param cstate the connection we tried to establish
212  */
213 static void
214 connect_fail_continuation (struct ClientState *cstate)
215 {
216   LOG (GNUNET_ERROR_TYPE_WARNING,
217        "Failed to establish connection to `%s', no further addresses to try.\n",
218        cstate->service_name);
219   GNUNET_break (NULL == cstate->ap_head);
220   GNUNET_break (NULL == cstate->ap_tail);
221   GNUNET_break (NULL == cstate->dns_active);
222   GNUNET_break (NULL == cstate->sock);
223   GNUNET_assert (NULL == cstate->send_task);
224   GNUNET_assert (NULL == cstate->recv_task);
225   // GNUNET_assert (NULL == cstate->proxy_handshake);
226
227   cstate->back_off = GNUNET_TIME_STD_BACKOFF (cstate->back_off);
228   cstate->retry_task
229     = GNUNET_SCHEDULER_add_delayed (cstate->back_off,
230                                     &start_connect,
231                                     cstate);
232 }
233
234
235 /**
236  * We are ready to send a message to the service.
237  *
238  * @param cls the `struct ClientState` with the `msg` to transmit
239  */
240 static void
241 transmit_ready (void *cls)
242 {
243   struct ClientState *cstate = cls;
244   ssize_t ret;
245   size_t len;
246   const char *pos;
247   int notify_in_flight;
248
249   cstate->send_task = NULL;
250   pos = (const char *) cstate->msg;
251   len = ntohs (cstate->msg->size);
252   GNUNET_assert (cstate->msg_off < len);
253  RETRY:
254   ret = GNUNET_NETWORK_socket_send (cstate->sock,
255                                     &pos[cstate->msg_off],
256                                     len - cstate->msg_off);
257   if (-1 == ret)
258   {
259     if (EINTR == errno)
260       goto RETRY;
261     GNUNET_MQ_inject_error (cstate->mq,
262                             GNUNET_MQ_ERROR_WRITE);
263     return;
264   }
265   notify_in_flight = (0 == cstate->msg_off);
266   cstate->msg_off += ret;
267   if (cstate->msg_off < len)
268   {
269     cstate->send_task
270       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
271                                         cstate->sock,
272                                         &transmit_ready,
273                                         cstate);
274     if (notify_in_flight) 
275       GNUNET_MQ_impl_send_in_flight (cstate->mq);
276     return;
277   }
278   cstate->msg = NULL;
279   GNUNET_MQ_impl_send_continue (cstate->mq);
280 }
281
282
283 /**
284  * We have received a full message, pass to the MQ dispatcher.
285  * Called by the tokenizer via #receive_ready().
286  *
287  * @param cls the `struct ClientState`
288  * @param msg message we received.
289  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
290  */
291 static int
292 recv_message (void *cls,
293               const struct GNUNET_MessageHeader *msg)
294 {
295   struct ClientState *cstate = cls;
296
297   if (GNUNET_YES == cstate->in_destroy)
298     return GNUNET_SYSERR;
299   GNUNET_MQ_inject_message (cstate->mq,
300                             msg);
301   if (GNUNET_YES == cstate->in_destroy)
302     return GNUNET_SYSERR;
303   return GNUNET_OK;
304 }
305
306
307 /**
308  * Cancel all remaining connect attempts
309  *
310  * @param cstate handle of the client state to process
311  */
312 static void
313 cancel_aps (struct ClientState *cstate)
314 {
315   struct AddressProbe *pos;
316
317   while (NULL != (pos = cstate->ap_head))
318   {
319     GNUNET_break (GNUNET_OK ==
320                   GNUNET_NETWORK_socket_close (pos->sock));
321     GNUNET_SCHEDULER_cancel (pos->task);
322     GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
323                                  cstate->ap_tail,
324                                  pos);
325     GNUNET_free (pos);
326   }
327 }
328
329
330 /**
331  * Implement the destruction of a message queue.  Implementations must
332  * not free @a mq, but should take care of @a impl_state.
333  *
334  * @param mq the message queue to destroy
335  * @param impl_state our `struct ClientState`
336  */
337 static void
338 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
339                                 void *impl_state)
340 {
341   struct ClientState *cstate = impl_state;
342
343   if (GNUNET_SYSERR == cstate->in_destroy)
344   {
345     /* defer destruction */
346     cstate->in_destroy = GNUNET_YES;
347     cstate->mq = NULL;
348     return;
349   }
350   if (NULL != cstate->dns_active)
351     GNUNET_RESOLVER_request_cancel (cstate->dns_active);
352   if (NULL != cstate->send_task)
353     GNUNET_SCHEDULER_cancel (cstate->send_task);
354   if (NULL != cstate->recv_task)
355     GNUNET_SCHEDULER_cancel (cstate->recv_task);
356   if (NULL != cstate->retry_task)
357     GNUNET_SCHEDULER_cancel (cstate->retry_task);
358   if (NULL != cstate->sock)
359     GNUNET_NETWORK_socket_close (cstate->sock);
360   cancel_aps (cstate);
361   GNUNET_free (cstate->service_name);
362   GNUNET_free_non_null (cstate->hostname);
363   GNUNET_MST_destroy (cstate->mst);
364   GNUNET_free (cstate);
365 }
366
367
368 /**
369  * This function is called once we have data ready to read.
370  *
371  * @param cls `struct ClientState` with connection to read from
372  */
373 static void
374 receive_ready (void *cls)
375 {
376   struct ClientState *cstate = cls;
377   int ret;
378
379   cstate->recv_task = NULL;
380   cstate->in_destroy = GNUNET_SYSERR;
381   ret = GNUNET_MST_read (cstate->mst,
382                          cstate->sock,
383                          GNUNET_NO,
384                          GNUNET_NO);
385   if (GNUNET_SYSERR == ret)
386   {
387     if (NULL != cstate->mq)
388       GNUNET_MQ_inject_error (cstate->mq,
389                               GNUNET_MQ_ERROR_READ);
390     if (GNUNET_YES == cstate->in_destroy)
391       connection_client_destroy_impl (cstate->mq,
392                                       cstate);
393     return;
394   }
395   if (GNUNET_YES == cstate->in_destroy)
396   {
397     connection_client_destroy_impl (cstate->mq,
398                                     cstate);
399     return;
400   }
401   cstate->in_destroy = GNUNET_NO;
402   cstate->recv_task
403     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
404                                      cstate->sock,
405                                      &receive_ready,
406                                      cstate);
407 }
408
409
410 /**
411  * We've succeeded in establishing a connection.
412  *
413  * @param cstate the connection we tried to establish
414  */
415 static void
416 connect_success_continuation (struct ClientState *cstate)
417 {
418   GNUNET_assert (NULL == cstate->recv_task);
419   cstate->recv_task
420     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
421                                      cstate->sock,
422                                      &receive_ready,
423                                      cstate);
424   if (NULL != cstate->msg)
425   {
426     GNUNET_assert (NULL == cstate->send_task);
427     cstate->send_task
428       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
429                                         cstate->sock,
430                                         &transmit_ready,
431                                         cstate);
432   }
433 }
434
435
436 /**
437  * Try connecting to the server using UNIX domain sockets.
438  *
439  * @param service_name name of service to connect to
440  * @param cfg configuration to use
441  * @return NULL on error, socket connected to UNIX otherwise
442  */
443 static struct GNUNET_NETWORK_Handle *
444 try_unixpath (const char *service_name,
445               const struct GNUNET_CONFIGURATION_Handle *cfg)
446 {
447 #if AF_UNIX
448   struct GNUNET_NETWORK_Handle *sock;
449   char *unixpath;
450   struct sockaddr_un s_un;
451
452   unixpath = NULL;
453   if ((GNUNET_OK ==
454        GNUNET_CONFIGURATION_get_value_filename (cfg,
455                                                 service_name,
456                                                 "UNIXPATH",
457                                                 &unixpath)) &&
458       (0 < strlen (unixpath)))
459   {
460     /* We have a non-NULL unixpath, need to validate it */
461     if (strlen (unixpath) >= sizeof (s_un.sun_path))
462     {
463       LOG (GNUNET_ERROR_TYPE_WARNING,
464            _("UNIXPATH `%s' too long, maximum length is %llu\n"),
465            unixpath,
466            (unsigned long long) sizeof (s_un.sun_path));
467       unixpath = GNUNET_NETWORK_shorten_unixpath (unixpath);
468       LOG (GNUNET_ERROR_TYPE_INFO,
469            _("Using `%s' instead\n"),
470            unixpath);
471       if (NULL == unixpath)
472         return NULL;
473     }
474     memset (&s_un,
475             0,
476             sizeof (s_un));
477     s_un.sun_family = AF_UNIX;
478     strncpy (s_un.sun_path,
479              unixpath,
480              sizeof (s_un.sun_path) - 1);
481 #ifdef LINUX
482     {
483       int abstract;
484
485       abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg,
486                                                        "TESTING",
487                                                        "USE_ABSTRACT_SOCKETS");
488       if (GNUNET_YES == abstract)
489         s_un.sun_path[0] = '\0';
490     }
491 #endif
492 #if HAVE_SOCKADDR_IN_SIN_LEN
493     un.sun_len = (u_char) sizeof (struct sockaddr_un);
494 #endif
495     sock = GNUNET_NETWORK_socket_create (AF_UNIX,
496                                          SOCK_STREAM,
497                                          0);
498     if ( (NULL != sock) &&
499          ( (GNUNET_OK ==
500             GNUNET_NETWORK_socket_connect (sock,
501                                            (struct sockaddr *) &s_un,
502                                            sizeof (s_un))) ||
503            (EINPROGRESS == errno) ) )
504     {
505       LOG (GNUNET_ERROR_TYPE_DEBUG,
506            "Successfully connected to unixpath `%s'!\n",
507            unixpath);
508       GNUNET_free (unixpath);
509       return sock;
510     }
511   }
512   GNUNET_free_non_null (unixpath);
513 #endif
514   return NULL;
515 }
516
517
518 /**
519  * Scheduler let us know that we're either ready to write on the
520  * socket OR connect timed out.  Do the right thing.
521  *
522  * @param cls the `struct AddressProbe *` with the address that we are probing
523  */
524 static void
525 connect_probe_continuation (void *cls)
526 {
527   struct AddressProbe *ap = cls;
528   struct ClientState *cstate = ap->cstate;
529   const struct GNUNET_SCHEDULER_TaskContext *tc;
530   int error;
531   socklen_t len;
532
533   ap->task = NULL;
534   GNUNET_assert (NULL != ap->sock);
535   GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
536                                cstate->ap_tail,
537                                ap);
538   len = sizeof (error);
539   error = 0;
540   tc = GNUNET_SCHEDULER_get_task_context ();
541   if ( (0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
542        (GNUNET_OK !=
543         GNUNET_NETWORK_socket_getsockopt (ap->sock,
544                                           SOL_SOCKET,
545                                           SO_ERROR,
546                                           &error,
547                                           &len)) ||
548        (0 != error) )
549   {
550     GNUNET_break (GNUNET_OK ==
551                   GNUNET_NETWORK_socket_close (ap->sock));
552     GNUNET_free (ap);
553     if ( (NULL == cstate->ap_head) &&
554          //      (NULL == cstate->proxy_handshake) &&
555          (NULL == cstate->dns_active) )
556       connect_fail_continuation (cstate);
557     return;
558   }
559   LOG (GNUNET_ERROR_TYPE_DEBUG,
560        "Connection to `%s' succeeded!\n",
561        cstate->service_name);
562   /* trigger jobs that waited for the connection */
563   GNUNET_assert (NULL == cstate->sock);
564   cstate->sock = ap->sock;
565   GNUNET_free (ap);
566   cancel_aps (cstate);
567   connect_success_continuation (cstate);
568 }
569
570
571 /**
572  * Try to establish a connection given the specified address.
573  * This function is called by the resolver once we have a DNS reply.
574  *
575  * @param cls our `struct ClientState *`
576  * @param addr address to try, NULL for "last call"
577  * @param addrlen length of @a addr
578  */
579 static void
580 try_connect_using_address (void *cls,
581                            const struct sockaddr *addr,
582                            socklen_t addrlen)
583 {
584   struct ClientState *cstate = cls;
585   struct AddressProbe *ap;
586   
587   if (NULL == addr)
588   {
589     cstate->dns_active = NULL;
590     if ( (NULL == cstate->ap_head) &&
591          //  (NULL == cstate->proxy_handshake) &&
592          (NULL == cstate->sock) )
593       connect_fail_continuation (cstate);
594     return;
595   }
596   if (NULL != cstate->sock)
597     return;                     /* already connected */
598   /* try to connect */
599   LOG (GNUNET_ERROR_TYPE_DEBUG,
600        "Trying to connect using address `%s:%u'\n",
601        GNUNET_a2s (addr,
602                    addrlen),
603        cstate->port);
604   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
605   ap->addr = (const struct sockaddr *) &ap[1];
606   GNUNET_memcpy (&ap[1],
607                  addr,
608                  addrlen);
609   ap->addrlen = addrlen;
610   ap->cstate = cstate;
611
612   switch (ap->addr->sa_family)
613   {
614   case AF_INET:
615     ((struct sockaddr_in *) ap->addr)->sin_port = htons (cstate->port);
616     break;
617   case AF_INET6:
618     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (cstate->port);
619     break;
620   default:
621     GNUNET_break (0);
622     GNUNET_free (ap);
623     return;                     /* not supported by us */
624   }
625   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family,
626                                            SOCK_STREAM,
627                                            0);
628   if (NULL == ap->sock)
629   {
630     GNUNET_free (ap);
631     return;                     /* not supported by OS */
632   }
633   if ( (GNUNET_OK !=
634         GNUNET_NETWORK_socket_connect (ap->sock,
635                                        ap->addr,
636                                        ap->addrlen)) &&
637        (EINPROGRESS != errno) )
638   {
639     /* maybe refused / unsupported address, try next */
640     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
641                          "connect");
642     GNUNET_break (GNUNET_OK ==
643                   GNUNET_NETWORK_socket_close (ap->sock));
644     GNUNET_free (ap);
645     return;
646   }
647   GNUNET_CONTAINER_DLL_insert (cstate->ap_head,
648                                cstate->ap_tail,
649                                ap);
650   ap->task = GNUNET_SCHEDULER_add_write_net (GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
651                                              ap->sock,
652                                              &connect_probe_continuation,
653                                              ap);
654 }
655
656
657 /**
658  * Test whether the configuration has proper values for connection
659  * (UNIXPATH || (PORT && HOSTNAME)).
660  *
661  * @param service_name name of service to connect to
662  * @param cfg configuration to use
663  * @return #GNUNET_OK if the configuration is valid, #GNUNET_SYSERR if not
664  */
665 static int
666 test_service_configuration (const char *service_name,
667                             const struct GNUNET_CONFIGURATION_Handle *cfg)
668 {
669   int ret = GNUNET_SYSERR;
670   char *hostname = NULL;
671   unsigned long long port;
672 #if AF_UNIX
673   char *unixpath = NULL;
674
675   if ((GNUNET_OK ==
676        GNUNET_CONFIGURATION_get_value_filename (cfg,
677                                                 service_name,
678                                                 "UNIXPATH",
679                                                 &unixpath)) &&
680       (0 < strlen (unixpath)))
681     ret = GNUNET_OK;
682   GNUNET_free_non_null (unixpath);
683 #endif
684
685   if ( (GNUNET_YES ==
686         GNUNET_CONFIGURATION_have_value (cfg,
687                                          service_name,
688                                          "PORT")) &&
689        (GNUNET_OK ==
690         GNUNET_CONFIGURATION_get_value_number (cfg,
691                                                service_name,
692                                                "PORT",
693                                                &port)) &&
694        (port <= 65535) &&
695        (0 != port) &&
696        (GNUNET_OK ==
697         GNUNET_CONFIGURATION_get_value_string (cfg,
698                                                service_name,
699                                                "HOSTNAME",
700                                                &hostname)) &&
701        (0 != strlen (hostname)) )
702     ret = GNUNET_OK;
703   GNUNET_free_non_null (hostname);
704   return ret;
705 }
706
707
708 /**
709  * Try to connect to the service.
710  *
711  * @param cls the `struct ClientState` to try to connect to the service
712  */
713 static void
714 start_connect (void *cls)
715 {
716   struct ClientState *cstate = cls;
717
718   cstate->retry_task = NULL;
719 #if 0
720   /* Never use a local source if a proxy is configured */
721   if (GNUNET_YES ==
722       GNUNET_SOCKS_check_service (cstate->service_name,
723                                   cstate->cfg))
724   {
725     socks_connect (cstate);
726     return;
727   }
728 #endif
729
730   if ( (0 == (cstate->attempts++ % 2)) ||
731        (0 == cstate->port) ||
732        (NULL == cstate->hostname) )
733   {
734     /* on even rounds, try UNIX first, or always
735        if we do not have a DNS name and TCP port. */
736     cstate->sock = try_unixpath (cstate->service_name,
737                                  cstate->cfg);
738     if (NULL != cstate->sock)
739     {
740       connect_success_continuation (cstate);
741       return;
742     }    
743   }
744   if ( (NULL == cstate->hostname) ||
745        (0 == cstate->port) )
746   {
747     /* All options failed. Boo! */
748     connect_fail_continuation (cstate);
749     return;
750   }
751   cstate->dns_active
752     = GNUNET_RESOLVER_ip_get (cstate->hostname,
753                               AF_UNSPEC,
754                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
755                               &try_connect_using_address,
756                               cstate);
757 }
758
759
760 /**
761  * Implements the transmission functionality of a message queue.
762  *
763  * @param mq the message queue
764  * @param msg the message to send
765  * @param impl_state our `struct ClientState`
766  */
767 static void
768 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
769                              const struct GNUNET_MessageHeader *msg,
770                              void *impl_state)
771 {
772   struct ClientState *cstate = impl_state;
773
774   /* only one message at a time allowed */
775   GNUNET_assert (NULL == cstate->msg);
776   GNUNET_assert (NULL == cstate->send_task);
777   cstate->msg = msg;
778   cstate->msg_off = 0;
779   if (NULL == cstate->sock)
780     return; /* still waiting for connection */
781   cstate->send_task
782     = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
783                                       cstate->sock,
784                                       &transmit_ready,
785                                       cstate);
786 }
787
788
789 /**
790  * Cancel the currently sent message.
791  *
792  * @param mq message queue
793  * @param impl_state our `struct ClientState`
794  */
795 static void
796 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
797                                void *impl_state)
798 {
799   struct ClientState *cstate = impl_state;
800
801   GNUNET_assert (NULL != cstate->msg);
802   GNUNET_assert (0 == cstate->msg_off);
803   cstate->msg = NULL;
804   if (NULL != cstate->send_task)
805   {
806     GNUNET_SCHEDULER_cancel (cstate->send_task);
807     cstate->send_task = NULL;
808   }
809 }
810
811
812 /**
813  * Create a message queue to connect to a GNUnet service.
814  * If handlers are specfied, receive messages from the connection.
815  *
816  * @param cfg our configuration
817  * @param service_name name of the service to connect to
818  * @param handlers handlers for receiving messages, can be NULL
819  * @param error_handler error handler
820  * @param error_handler_cls closure for the @a error_handler
821  * @return the message queue, NULL on error
822  */
823 struct GNUNET_MQ_Handle *
824 GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
825                        const char *service_name,
826                        const struct GNUNET_MQ_MessageHandler *handlers,
827                        GNUNET_MQ_ErrorHandler error_handler,
828                        void *error_handler_cls)
829 {
830   struct ClientState *cstate;
831
832   if (GNUNET_OK !=
833       test_service_configuration (service_name,
834                                   cfg))
835     return NULL;
836   cstate = GNUNET_new (struct ClientState);
837   cstate->service_name = GNUNET_strdup (service_name);
838   cstate->cfg = cfg;
839   cstate->retry_task = GNUNET_SCHEDULER_add_now (&start_connect,
840                                                  cstate);
841   cstate->mst = GNUNET_MST_create (&recv_message,
842                                    cstate);
843   if (GNUNET_YES ==
844       GNUNET_CONFIGURATION_have_value (cfg,
845                                        service_name,
846                                        "PORT"))
847   {
848     if (! ( (GNUNET_OK !=
849              GNUNET_CONFIGURATION_get_value_number (cfg,
850                                                     service_name,
851                                                     "PORT",
852                                                     &cstate->port)) ||
853             (cstate->port > 65535) ||
854             (GNUNET_OK !=
855              GNUNET_CONFIGURATION_get_value_string (cfg,
856                                                     service_name,
857                                                     "HOSTNAME",
858                                                     &cstate->hostname)) ) &&
859         (0 == strlen (cstate->hostname)) )
860     {
861       GNUNET_free (cstate->hostname);
862       cstate->hostname = NULL;
863       LOG (GNUNET_ERROR_TYPE_WARNING,
864            _("Need a non-empty hostname for service `%s'.\n"),
865            service_name);
866     }
867   }
868   cstate->mq = GNUNET_MQ_queue_for_callbacks (&connection_client_send_impl,
869                                               &connection_client_destroy_impl,
870                                               &connection_client_cancel_impl,
871                                               cstate,
872                                               handlers,
873                                               error_handler,
874                                               error_handler_cls);
875   return cstate->mq;
876 }
877
878 /* end of client_new.c */