More API function tests...
[oweals/gnunet.git] / src / util / client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/client.c
23  * @brief code for access to services
24  * @author Christian Grothoff
25  *
26  * Generic TCP code for reliable, record-oriented TCP
27  * connections between clients and service providers.
28  */
29 #include "platform.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_resolver_service.h"
33 #include "gnunet_socks.h"
34
35
/* Shorthand: forwards to GNUNET_log_from() with the fixed string "util" as second argument (presumably the component name -- confirm against the logging API). */
36 #define LOG(kind,...) GNUNET_log_from (kind, "util",__VA_ARGS__)
37
38
39 /**
40  * Internal state for a client connected to a GNUnet service.  Forward declaration only: `struct AddressProbe` holds a pointer to it.
41  */
42 struct ClientState;
43
44
45 /**
46  * During connect, we try multiple possible IP addresses
47  * to find out which one might work.  One instance exists per address being tried.
48  */
49 struct AddressProbe
50 {
51
52   /**
53    * Next element in the doubly-linked list of probes.
54    */
55   struct AddressProbe *next;
56
57   /**
58    * Previous element in the doubly-linked list of probes.
59    */
60   struct AddressProbe *prev;
61
62   /**
63    * The address; do not free (allocated at the end of this struct).
64    */
65   const struct sockaddr *addr;
66
67   /**
68    * Underlying OS's socket.
69    */
70   struct GNUNET_NETWORK_Handle *sock;
71
72   /**
73    * Connection for which we are probing.
74    */
75   struct ClientState *cstate;
76
77   /**
78    * Length of @e addr in bytes.
79    */
80   socklen_t addrlen;
81
82   /**
83    * Task waiting for the connection to finish connecting.
84    */
85   struct GNUNET_SCHEDULER_Task *task;
86 };
87
88
89 /**
90  * Internal state for a client connected to a GNUnet service.
91  */
92 struct ClientState
93 {
94
95   /**
96    * The connection handle, NULL if not live
97    */
98   struct GNUNET_NETWORK_Handle *sock;
99
100   /**
101    * Handle to a pending DNS lookup request, NULL if DNS is finished.
102    */
103   struct GNUNET_RESOLVER_RequestHandle *dns_active;
104
105   /**
106    * Our configuration.
107    */
108   const struct GNUNET_CONFIGURATION_Handle *cfg;
109
110   /**
111    * Head of the linked list of sockets we are currently trying out
112    * (during connect).
113    */
114   struct AddressProbe *ap_head;
115
116   /**
117    * Tail of the linked list of sockets we are currently trying out
118    * (during connect).
119    */
120   struct AddressProbe *ap_tail;
121
122   /**
123    * Name of the service we interact with.
124    */
125   char *service_name;
126
127   /**
128    * Hostname, if any.
129    */
130   char *hostname;
131
132   /**
133    * Next message to transmit to the service. NULL for none.
134    */
135   const struct GNUNET_MessageHeader *msg;
136
137   /**
138    * Task for trying to connect to the service.
139    */
140   struct GNUNET_SCHEDULER_Task *retry_task;
141
142   /**
143    * Task for sending messages to the service.
144    */
145   struct GNUNET_SCHEDULER_Task *send_task;
146
147   /**
148    * Task for receiving messages from the service.
149    */
150   struct GNUNET_SCHEDULER_Task *recv_task;
151
152   /**
153    * Tokenizer for inbound messages.
154    */
155   struct GNUNET_MessageStreamTokenizer *mst;
156
157   /**
158    * Message queue under our control.
159    */
160   struct GNUNET_MQ_Handle *mq;
161
162   /**
163    * Timeout for receiving a response (absolute time).
164    */
165   struct GNUNET_TIME_Absolute receive_timeout;
166
167   /**
168    * Current value for our incremental back-off (for
169    * connect re-tries).
170    */
171   struct GNUNET_TIME_Relative back_off;
172
173   /**
174    * TCP port (0 for disabled).
175    */
176   unsigned long long port;
177
178   /**
179    * Offset in the message where we are for transmission (bytes of @e msg already sent).
180    */
181   size_t msg_off;
182
183   /**
184    * How often have we tried to connect?
185    */
186   unsigned int attempts;
187
188   /**
189    * Are we supposed to die?  #GNUNET_SYSERR if destruction must be
190    * deferred, #GNUNET_NO by default, #GNUNET_YES if destruction was
191    * deferred.
192    */
193   int in_destroy;
194
195 };
196
197
198 /**
199  * Try to connect to the service.  Declared ahead of its definition because connect_fail_continuation() schedules it as the retry task.
200  *
201  * @param cls the `struct ClientState` to try to connect to the service
202  */
203 static void
204 start_connect (void *cls);
205
206
207 /**
208  * We've failed for good to establish a connection (timeout or
209  * no more addresses to try).
210  *
211  * @param cstate the connection we tried to establish
212  */
213 static void
214 connect_fail_continuation (struct ClientState *cstate)
215 {
216   GNUNET_break (NULL == cstate->ap_head);
217   GNUNET_break (NULL == cstate->ap_tail);
218   GNUNET_break (NULL == cstate->dns_active);
219   GNUNET_break (NULL == cstate->sock);
220   GNUNET_assert (NULL == cstate->send_task);
221   GNUNET_assert (NULL == cstate->recv_task);
222   // GNUNET_assert (NULL == cstate->proxy_handshake);
223
224   cstate->back_off = GNUNET_TIME_STD_BACKOFF (cstate->back_off);
225   LOG (GNUNET_ERROR_TYPE_DEBUG,
226        "Failed to establish connection to `%s', no further addresses to try, will try again in %s.\n",
227        cstate->service_name,
228        GNUNET_STRINGS_relative_time_to_string (cstate->back_off,
229                                                GNUNET_YES));
230   cstate->retry_task
231     = GNUNET_SCHEDULER_add_delayed (cstate->back_off,
232                                     &start_connect,
233                                     cstate);
234 }
235
236
237 /**
238  * We are ready to send a message to the service.
239  *
240  * @param cls the `struct ClientState` with the `msg` to transmit
241  */
242 static void
243 transmit_ready (void *cls)
244 {
245   struct ClientState *cstate = cls;
246   ssize_t ret;
247   size_t len;
248   const char *pos;
249   int notify_in_flight;
250
251   cstate->send_task = NULL;
252   pos = (const char *) cstate->msg;
253   len = ntohs (cstate->msg->size);
254   GNUNET_assert (cstate->msg_off < len);
255  RETRY:
256   ret = GNUNET_NETWORK_socket_send (cstate->sock,
257                                     &pos[cstate->msg_off],
258                                     len - cstate->msg_off);
259   if (-1 == ret)
260   {
261     if (EINTR == errno)
262       goto RETRY;
263     GNUNET_MQ_inject_error (cstate->mq,
264                             GNUNET_MQ_ERROR_WRITE);
265     return;
266   }
267   notify_in_flight = (0 == cstate->msg_off);
268   cstate->msg_off += ret;
269   if (cstate->msg_off < len)
270   {
271     cstate->send_task
272       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
273                                         cstate->sock,
274                                         &transmit_ready,
275                                         cstate);
276     if (notify_in_flight)
277       GNUNET_MQ_impl_send_in_flight (cstate->mq);
278     return;
279   }
280   cstate->msg = NULL;
281   GNUNET_MQ_impl_send_continue (cstate->mq);
282 }
283
284
285 /**
286  * We have received a full message, pass to the MQ dispatcher.
287  * Called by the tokenizer via #receive_ready().
288  *
289  * @param cls the `struct ClientState`
290  * @param msg message we received.
291  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
292  */
293 static int
294 recv_message (void *cls,
295               const struct GNUNET_MessageHeader *msg)
296 {
297   struct ClientState *cstate = cls;
298
299   if (GNUNET_YES == cstate->in_destroy)
300     return GNUNET_SYSERR;
301   GNUNET_MQ_inject_message (cstate->mq,
302                             msg);
303   if (GNUNET_YES == cstate->in_destroy)
304     return GNUNET_SYSERR;
305   return GNUNET_OK;
306 }
307
308
309 /**
310  * Cancel all remaining connect attempts
311  *
312  * @param cstate handle of the client state to process
313  */
314 static void
315 cancel_aps (struct ClientState *cstate)
316 {
317   struct AddressProbe *pos;
318
319   while (NULL != (pos = cstate->ap_head))
320   {
321     GNUNET_break (GNUNET_OK ==
322                   GNUNET_NETWORK_socket_close (pos->sock));
323     GNUNET_SCHEDULER_cancel (pos->task);
324     GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
325                                  cstate->ap_tail,
326                                  pos);
327     GNUNET_free (pos);
328   }
329 }
330
331
332 /**
333  * Implement the destruction of a message queue.  Implementations must
334  * not free @a mq, but should take care of @a impl_state.
335  *
336  * @param mq the message queue to destroy
337  * @param impl_state our `struct ClientState`
338  */
339 static void
340 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
341                                 void *impl_state)
342 {
343   struct ClientState *cstate = impl_state;
344
345   if (GNUNET_SYSERR == cstate->in_destroy)
346   {
347     /* defer destruction */
348     cstate->in_destroy = GNUNET_YES;
349     cstate->mq = NULL;
350     return;
351   }
352   if (NULL != cstate->dns_active)
353     GNUNET_RESOLVER_request_cancel (cstate->dns_active);
354   if (NULL != cstate->send_task)
355     GNUNET_SCHEDULER_cancel (cstate->send_task);
356   if (NULL != cstate->recv_task)
357     GNUNET_SCHEDULER_cancel (cstate->recv_task);
358   if (NULL != cstate->retry_task)
359     GNUNET_SCHEDULER_cancel (cstate->retry_task);
360   if (NULL != cstate->sock)
361     GNUNET_NETWORK_socket_close (cstate->sock);
362   cancel_aps (cstate);
363   GNUNET_free (cstate->service_name);
364   GNUNET_free_non_null (cstate->hostname);
365   GNUNET_MST_destroy (cstate->mst);
366   GNUNET_free (cstate);
367 }
368
369
370 /**
371  * This function is called once we have data ready to read.
372  *
373  * @param cls `struct ClientState` with connection to read from
374  */
375 static void
376 receive_ready (void *cls)
377 {
378   struct ClientState *cstate = cls;
379   int ret;
380
381   cstate->recv_task = NULL;
382   cstate->in_destroy = GNUNET_SYSERR;
383   ret = GNUNET_MST_read (cstate->mst,
384                          cstate->sock,
385                          GNUNET_NO,
386                          GNUNET_NO);
387   if (GNUNET_SYSERR == ret)
388   {
389     if (NULL != cstate->mq)
390       GNUNET_MQ_inject_error (cstate->mq,
391                               GNUNET_MQ_ERROR_READ);
392     if (GNUNET_YES == cstate->in_destroy)
393       connection_client_destroy_impl (cstate->mq,
394                                       cstate);
395     return;
396   }
397   if (GNUNET_YES == cstate->in_destroy)
398   {
399     connection_client_destroy_impl (cstate->mq,
400                                     cstate);
401     return;
402   }
403   cstate->in_destroy = GNUNET_NO;
404   cstate->recv_task
405     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
406                                      cstate->sock,
407                                      &receive_ready,
408                                      cstate);
409 }
410
411
412 /**
413  * We've succeeded in establishing a connection.
414  *
415  * @param cstate the connection we tried to establish
416  */
417 static void
418 connect_success_continuation (struct ClientState *cstate)
419 {
420   GNUNET_assert (NULL == cstate->recv_task);
421   cstate->recv_task
422     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
423                                      cstate->sock,
424                                      &receive_ready,
425                                      cstate);
426   if (NULL != cstate->msg)
427   {
428     GNUNET_assert (NULL == cstate->send_task);
429     cstate->send_task
430       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
431                                         cstate->sock,
432                                         &transmit_ready,
433                                         cstate);
434   }
435 }
436
437
438 /**
439  * Try connecting to the server using UNIX domain sockets.
440  *
441  * @param service_name name of service to connect to
442  * @param cfg configuration to use
443  * @return NULL on error, socket connected to UNIX otherwise
444  */
445 static struct GNUNET_NETWORK_Handle *
446 try_unixpath (const char *service_name,
447               const struct GNUNET_CONFIGURATION_Handle *cfg)
448 {
449 #if AF_UNIX
450   struct GNUNET_NETWORK_Handle *sock;
451   char *unixpath;
452   struct sockaddr_un s_un;
453
454   unixpath = NULL;
455   if ((GNUNET_OK ==
456        GNUNET_CONFIGURATION_get_value_filename (cfg,
457                                                 service_name,
458                                                 "UNIXPATH",
459                                                 &unixpath)) &&
460       (0 < strlen (unixpath)))
461   {
462     /* We have a non-NULL unixpath, need to validate it */
463     if (strlen (unixpath) >= sizeof (s_un.sun_path))
464     {
465       LOG (GNUNET_ERROR_TYPE_WARNING,
466            _("UNIXPATH `%s' too long, maximum length is %llu\n"),
467            unixpath,
468            (unsigned long long) sizeof (s_un.sun_path));
469       unixpath = GNUNET_NETWORK_shorten_unixpath (unixpath);
470       LOG (GNUNET_ERROR_TYPE_INFO,
471            _("Using `%s' instead\n"),
472            unixpath);
473       if (NULL == unixpath)
474         return NULL;
475     }
476     memset (&s_un,
477             0,
478             sizeof (s_un));
479     s_un.sun_family = AF_UNIX;
480     strncpy (s_un.sun_path,
481              unixpath,
482              sizeof (s_un.sun_path) - 1);
483 #ifdef LINUX
484     {
485       int abstract;
486
487       abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg,
488                                                        "TESTING",
489                                                        "USE_ABSTRACT_SOCKETS");
490       if (GNUNET_YES == abstract)
491         s_un.sun_path[0] = '\0';
492     }
493 #endif
494 #if HAVE_SOCKADDR_UN_SUN_LEN
495     s_un.sun_len = (u_char) sizeof (struct sockaddr_un);
496 #endif
497     sock = GNUNET_NETWORK_socket_create (AF_UNIX,
498                                          SOCK_STREAM,
499                                          0);
500     if ( (NULL != sock) &&
501          ( (GNUNET_OK ==
502             GNUNET_NETWORK_socket_connect (sock,
503                                            (struct sockaddr *) &s_un,
504                                            sizeof (s_un))) ||
505            (EINPROGRESS == errno) ) )
506     {
507       LOG (GNUNET_ERROR_TYPE_DEBUG,
508            "Successfully connected to unixpath `%s'!\n",
509            unixpath);
510       GNUNET_free (unixpath);
511       return sock;
512     }
513     if (NULL != sock)
514       GNUNET_NETWORK_socket_close (sock);
515   }
516   GNUNET_free_non_null (unixpath);
517 #endif
518   return NULL;
519 }
520
521
522 /**
523  * Scheduler let us know that we're either ready to write on the
524  * socket OR connect timed out.  Do the right thing.
525  *
526  * @param cls the `struct AddressProbe *` with the address that we are probing
527  */
528 static void
529 connect_probe_continuation (void *cls)
530 {
531   struct AddressProbe *ap = cls;
532   struct ClientState *cstate = ap->cstate;
533   const struct GNUNET_SCHEDULER_TaskContext *tc;
534   int error;
535   socklen_t len;
536
537   ap->task = NULL;
538   GNUNET_assert (NULL != ap->sock);
539   GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
540                                cstate->ap_tail,
541                                ap);
542   len = sizeof (error);
543   error = 0;
544   tc = GNUNET_SCHEDULER_get_task_context ();
545   if ( (0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
546        (GNUNET_OK !=
547         GNUNET_NETWORK_socket_getsockopt (ap->sock,
548                                           SOL_SOCKET,
549                                           SO_ERROR,
550                                           &error,
551                                           &len)) ||
552        (0 != error) )
553   {
554     GNUNET_break (GNUNET_OK ==
555                   GNUNET_NETWORK_socket_close (ap->sock));
556     GNUNET_free (ap);
557     if ( (NULL == cstate->ap_head) &&
558          //      (NULL == cstate->proxy_handshake) &&
559          (NULL == cstate->dns_active) )
560       connect_fail_continuation (cstate);
561     return;
562   }
563   LOG (GNUNET_ERROR_TYPE_DEBUG,
564        "Connection to `%s' succeeded!\n",
565        cstate->service_name);
566   /* trigger jobs that waited for the connection */
567   GNUNET_assert (NULL == cstate->sock);
568   cstate->sock = ap->sock;
569   GNUNET_free (ap);
570   cancel_aps (cstate);
571   connect_success_continuation (cstate);
572 }
573
574
575 /**
576  * Try to establish a connection given the specified address.
577  * This function is called by the resolver once we have a DNS reply.
578  *
579  * @param cls our `struct ClientState *`
580  * @param addr address to try, NULL for "last call"
581  * @param addrlen length of @a addr
582  */
583 static void
584 try_connect_using_address (void *cls,
585                            const struct sockaddr *addr,
586                            socklen_t addrlen)
587 {
588   struct ClientState *cstate = cls;
589   struct AddressProbe *ap;
590
591   if (NULL == addr)
592   {
593     cstate->dns_active = NULL;
594     if ( (NULL == cstate->ap_head) &&
595          //  (NULL == cstate->proxy_handshake) &&
596          (NULL == cstate->sock) )
597       connect_fail_continuation (cstate);
598     return;
599   }
600   if (NULL != cstate->sock)
601     return;                     /* already connected */
602   /* try to connect */
603   LOG (GNUNET_ERROR_TYPE_DEBUG,
604        "Trying to connect using address `%s:%u'\n",
605        GNUNET_a2s (addr,
606                    addrlen),
607        cstate->port);
608   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
609   ap->addr = (const struct sockaddr *) &ap[1];
610   GNUNET_memcpy (&ap[1],
611                  addr,
612                  addrlen);
613   ap->addrlen = addrlen;
614   ap->cstate = cstate;
615
616   switch (ap->addr->sa_family)
617   {
618   case AF_INET:
619     ((struct sockaddr_in *) ap->addr)->sin_port = htons (cstate->port);
620     break;
621   case AF_INET6:
622     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (cstate->port);
623     break;
624   default:
625     GNUNET_break (0);
626     GNUNET_free (ap);
627     return;                     /* not supported by us */
628   }
629   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family,
630                                            SOCK_STREAM,
631                                            0);
632   if (NULL == ap->sock)
633   {
634     GNUNET_free (ap);
635     return;                     /* not supported by OS */
636   }
637   if ( (GNUNET_OK !=
638         GNUNET_NETWORK_socket_connect (ap->sock,
639                                        ap->addr,
640                                        ap->addrlen)) &&
641        (EINPROGRESS != errno) )
642   {
643     /* maybe refused / unsupported address, try next */
644     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
645                          "connect");
646     GNUNET_break (GNUNET_OK ==
647                   GNUNET_NETWORK_socket_close (ap->sock));
648     GNUNET_free (ap);
649     return;
650   }
651   GNUNET_CONTAINER_DLL_insert (cstate->ap_head,
652                                cstate->ap_tail,
653                                ap);
654   ap->task = GNUNET_SCHEDULER_add_write_net (GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
655                                              ap->sock,
656                                              &connect_probe_continuation,
657                                              ap);
658 }
659
660
661 /**
662  * Test whether the configuration has proper values for connection
663  * (UNIXPATH || (PORT && HOSTNAME)).
664  *
665  * @param service_name name of service to connect to
666  * @param cfg configuration to use
667  * @return #GNUNET_OK if the configuration is valid, #GNUNET_SYSERR if not
668  */
669 static int
670 test_service_configuration (const char *service_name,
671                             const struct GNUNET_CONFIGURATION_Handle *cfg)
672 {
673   int ret = GNUNET_SYSERR;
674   char *hostname = NULL;
675   unsigned long long port;
676 #if AF_UNIX
677   char *unixpath = NULL;
678
679   if ((GNUNET_OK ==
680        GNUNET_CONFIGURATION_get_value_filename (cfg,
681                                                 service_name,
682                                                 "UNIXPATH",
683                                                 &unixpath)) &&
684       (0 < strlen (unixpath)))
685     ret = GNUNET_OK;
686   GNUNET_free_non_null (unixpath);
687 #endif
688
689   if ( (GNUNET_YES ==
690         GNUNET_CONFIGURATION_have_value (cfg,
691                                          service_name,
692                                          "PORT")) &&
693        (GNUNET_OK ==
694         GNUNET_CONFIGURATION_get_value_number (cfg,
695                                                service_name,
696                                                "PORT",
697                                                &port)) &&
698        (port <= 65535) &&
699        (0 != port) &&
700        (GNUNET_OK ==
701         GNUNET_CONFIGURATION_get_value_string (cfg,
702                                                service_name,
703                                                "HOSTNAME",
704                                                &hostname)) &&
705        (0 != strlen (hostname)) )
706     ret = GNUNET_OK;
707   GNUNET_free_non_null (hostname);
708   return ret;
709 }
710
711
712 /**
713  * Try to connect to the service.
714  *
715  * @param cls the `struct ClientState` to try to connect to the service
716  */
717 static void
718 start_connect (void *cls)
719 {
720   struct ClientState *cstate = cls;
721
722   cstate->retry_task = NULL;
723 #if 0
724   /* Never use a local source if a proxy is configured */
725   if (GNUNET_YES ==
726       GNUNET_SOCKS_check_service (cstate->service_name,
727                                   cstate->cfg))
728   {
729     socks_connect (cstate);
730     return;
731   }
732 #endif
733
734   if ( (0 == (cstate->attempts++ % 2)) ||
735        (0 == cstate->port) ||
736        (NULL == cstate->hostname) )
737   {
738     /* on even rounds, try UNIX first, or always
739        if we do not have a DNS name and TCP port. */
740     cstate->sock = try_unixpath (cstate->service_name,
741                                  cstate->cfg);
742     if (NULL != cstate->sock)
743     {
744       connect_success_continuation (cstate);
745       return;
746     }
747   }
748   if ( (NULL == cstate->hostname) ||
749        (0 == cstate->port) )
750   {
751     /* All options failed. Boo! */
752     connect_fail_continuation (cstate);
753     return;
754   }
755   cstate->dns_active
756     = GNUNET_RESOLVER_ip_get (cstate->hostname,
757                               AF_UNSPEC,
758                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
759                               &try_connect_using_address,
760                               cstate);
761 }
762
763
764 /**
765  * Implements the transmission functionality of a message queue.
766  *
767  * @param mq the message queue
768  * @param msg the message to send
769  * @param impl_state our `struct ClientState`
770  */
771 static void
772 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
773                              const struct GNUNET_MessageHeader *msg,
774                              void *impl_state)
775 {
776   struct ClientState *cstate = impl_state;
777
778   /* only one message at a time allowed */
779   GNUNET_assert (NULL == cstate->msg);
780   GNUNET_assert (NULL == cstate->send_task);
781   cstate->msg = msg;
782   cstate->msg_off = 0;
783   if (NULL == cstate->sock)
784     return; /* still waiting for connection */
785   cstate->send_task
786     = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
787                                       cstate->sock,
788                                       &transmit_ready,
789                                       cstate);
790 }
791
792
793 /**
794  * Cancel the currently sent message.
795  *
796  * @param mq message queue
797  * @param impl_state our `struct ClientState`
798  */
799 static void
800 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
801                                void *impl_state)
802 {
803   struct ClientState *cstate = impl_state;
804
805   GNUNET_assert (NULL != cstate->msg);
806   GNUNET_assert (0 == cstate->msg_off);
807   cstate->msg = NULL;
808   if (NULL != cstate->send_task)
809   {
810     GNUNET_SCHEDULER_cancel (cstate->send_task);
811     cstate->send_task = NULL;
812   }
813 }
814
815
816 /**
817  * Create a message queue to connect to a GNUnet service.
818  * If handlers are specfied, receive messages from the connection.
819  *
820  * @param cfg our configuration
821  * @param service_name name of the service to connect to
822  * @param handlers handlers for receiving messages, can be NULL
823  * @param error_handler error handler
824  * @param error_handler_cls closure for the @a error_handler
825  * @return the message queue, NULL on error
826  */
827 struct GNUNET_MQ_Handle *
828 GNUNET_CLIENT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
829                        const char *service_name,
830                        const struct GNUNET_MQ_MessageHandler *handlers,
831                        GNUNET_MQ_ErrorHandler error_handler,
832                        void *error_handler_cls)
833 {
834   struct ClientState *cstate;
835
836   if (GNUNET_OK !=
837       test_service_configuration (service_name,
838                                   cfg))
839     return NULL;
840   cstate = GNUNET_new (struct ClientState);
841   cstate->service_name = GNUNET_strdup (service_name);
842   cstate->cfg = cfg;
843   cstate->retry_task = GNUNET_SCHEDULER_add_now (&start_connect,
844                                                  cstate);
845   cstate->mst = GNUNET_MST_create (&recv_message,
846                                    cstate);
847   if (GNUNET_YES ==
848       GNUNET_CONFIGURATION_have_value (cfg,
849                                        service_name,
850                                        "PORT"))
851   {
852     if (! ( (GNUNET_OK !=
853              GNUNET_CONFIGURATION_get_value_number (cfg,
854                                                     service_name,
855                                                     "PORT",
856                                                     &cstate->port)) ||
857             (cstate->port > 65535) ||
858             (GNUNET_OK !=
859              GNUNET_CONFIGURATION_get_value_string (cfg,
860                                                     service_name,
861                                                     "HOSTNAME",
862                                                     &cstate->hostname)) ) &&
863         (0 == strlen (cstate->hostname)) )
864     {
865       GNUNET_free (cstate->hostname);
866       cstate->hostname = NULL;
867       LOG (GNUNET_ERROR_TYPE_WARNING,
868            _("Need a non-empty hostname for service `%s'.\n"),
869            service_name);
870     }
871   }
872   cstate->mq = GNUNET_MQ_queue_for_callbacks (&connection_client_send_impl,
873                                               &connection_client_destroy_impl,
874                                               &connection_client_cancel_impl,
875                                               cstate,
876                                               handlers,
877                                               error_handler,
878                                               error_handler_cls);
879   return cstate->mq;
880 }
881
882 /* end of client.c */