use NULL as flag for evaluation of query, ensure we pass non-NULL for reply_block...
[oweals/gnunet.git] / src / util / client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/client.c
23  * @brief code for access to services
24  * @author Christian Grothoff
25  *
26  * Generic TCP code for reliable, record-oriented TCP
27  * connections between clients and service providers.
28  */
29 #include "platform.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_resolver_service.h"
33 #include "gnunet_socks.h"
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "util",__VA_ARGS__)
37
38
39 /**
40  * Internal state for a client connected to a GNUnet service.
41  */
42 struct ClientState;
43
44
45 /**
46  * During connect, we try multiple possible IP addresses
47  * to find out which one might work.
48  */
49 struct AddressProbe
50 {
51
52   /**
53    * This is a linked list.
54    */
55   struct AddressProbe *next;
56
57   /**
58    * This is a doubly-linked list.
59    */
60   struct AddressProbe *prev;
61
62   /**
63    * The address; do not free (allocated at the end of this struct).
64    */
65   const struct sockaddr *addr;
66
67   /**
68    * Underlying OS's socket.
69    */
70   struct GNUNET_NETWORK_Handle *sock;
71
72   /**
73    * Connection for which we are probing.
74    */
75   struct ClientState *cstate;
76
77   /**
78    * Lenth of addr.
79    */
80   socklen_t addrlen;
81
82   /**
83    * Task waiting for the connection to finish connecting.
84    */
85   struct GNUNET_SCHEDULER_Task *task;
86 };
87
88
89 /**
90  * Internal state for a client connected to a GNUnet service.
91  */
92 struct ClientState
93 {
94
95   /**
96    * The connection handle, NULL if not live
97    */
98   struct GNUNET_NETWORK_Handle *sock;
99
100   /**
101    * Handle to a pending DNS lookup request, NULL if DNS is finished.
102    */
103   struct GNUNET_RESOLVER_RequestHandle *dns_active;
104
105   /**
106    * Our configuration.
107    */
108   const struct GNUNET_CONFIGURATION_Handle *cfg;
109
110   /**
111    * Linked list of sockets we are currently trying out
112    * (during connect).
113    */
114   struct AddressProbe *ap_head;
115
116   /**
117    * Linked list of sockets we are currently trying out
118    * (during connect).
119    */
120   struct AddressProbe *ap_tail;
121
122   /**
123    * Name of the service we interact with.
124    */
125   char *service_name;
126
127   /**
128    * Hostname, if any.
129    */
130   char *hostname;
131
132   /**
133    * Next message to transmit to the service. NULL for none.
134    */
135   const struct GNUNET_MessageHeader *msg;
136
137   /**
138    * Task for trying to connect to the service.
139    */
140   struct GNUNET_SCHEDULER_Task *retry_task;
141
142   /**
143    * Task for sending messages to the service.
144    */
145   struct GNUNET_SCHEDULER_Task *send_task;
146
147   /**
148    * Task for sending messages to the service.
149    */
150   struct GNUNET_SCHEDULER_Task *recv_task;
151
152   /**
153    * Tokenizer for inbound messages.
154    */
155   struct GNUNET_MessageStreamTokenizer *mst;
156
157   /**
158    * Message queue under our control.
159    */
160   struct GNUNET_MQ_Handle *mq;
161
162   /**
163    * Timeout for receiving a response (absolute time).
164    */
165   struct GNUNET_TIME_Absolute receive_timeout;
166
167   /**
168    * Current value for our incremental back-off (for
169    * connect re-tries).
170    */
171   struct GNUNET_TIME_Relative back_off;
172
173   /**
174    * TCP port (0 for disabled).
175    */
176   unsigned long long port;
177
178   /**
179    * Offset in the message where we are for transmission.
180    */
181   size_t msg_off;
182
183   /**
184    * How often have we tried to connect?
185    */
186   unsigned int attempts;
187
188   /**
189    * Are we supposed to die?  #GNUNET_SYSERR if destruction must be
190    * deferred, #GNUNET_NO by default, #GNUNET_YES if destruction was
191    * deferred.
192    */
193   int in_destroy;
194
195 };
196
197
198 /**
199  * Try to connect to the service.
200  *
201  * @param cls the `struct ClientState` to try to connect to the service
202  */
203 static void
204 start_connect (void *cls);
205
206
207 /**
208  * We've failed for good to establish a connection (timeout or
209  * no more addresses to try).
210  *
211  * @param cstate the connection we tried to establish
212  */
213 static void
214 connect_fail_continuation (struct ClientState *cstate)
215 {
216   GNUNET_break (NULL == cstate->ap_head);
217   GNUNET_break (NULL == cstate->ap_tail);
218   GNUNET_break (NULL == cstate->dns_active);
219   GNUNET_break (NULL == cstate->sock);
220   GNUNET_assert (NULL == cstate->send_task);
221   GNUNET_assert (NULL == cstate->recv_task);
222   // GNUNET_assert (NULL == cstate->proxy_handshake);
223
224   cstate->back_off = GNUNET_TIME_STD_BACKOFF (cstate->back_off);
225   LOG (GNUNET_ERROR_TYPE_DEBUG,
226        "Failed to establish connection to `%s', no further addresses to try, will try again in %s.\n",
227        cstate->service_name,
228        GNUNET_STRINGS_relative_time_to_string (cstate->back_off,
229                                                GNUNET_YES));
230   cstate->retry_task
231     = GNUNET_SCHEDULER_add_delayed (cstate->back_off,
232                                     &start_connect,
233                                     cstate);
234 }
235
236
237 /**
238  * We are ready to send a message to the service.
239  *
240  * @param cls the `struct ClientState` with the `msg` to transmit
241  */
242 static void
243 transmit_ready (void *cls)
244 {
245   struct ClientState *cstate = cls;
246   ssize_t ret;
247   size_t len;
248   const char *pos;
249   int notify_in_flight;
250
251   cstate->send_task = NULL;
252   pos = (const char *) cstate->msg;
253   len = ntohs (cstate->msg->size);
254   GNUNET_assert (cstate->msg_off < len);
255  RETRY:
256   ret = GNUNET_NETWORK_socket_send (cstate->sock,
257                                     &pos[cstate->msg_off],
258                                     len - cstate->msg_off);
259   if (-1 == ret)
260   {
261     if (EINTR == errno)
262       goto RETRY;
263     GNUNET_MQ_inject_error (cstate->mq,
264                             GNUNET_MQ_ERROR_WRITE);
265     return;
266   }
267   notify_in_flight = (0 == cstate->msg_off);
268   cstate->msg_off += ret;
269   if (cstate->msg_off < len)
270   {
271     cstate->send_task
272       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
273                                         cstate->sock,
274                                         &transmit_ready,
275                                         cstate);
276     if (notify_in_flight)
277       GNUNET_MQ_impl_send_in_flight (cstate->mq);
278     return;
279   }
280   cstate->msg = NULL;
281   GNUNET_MQ_impl_send_continue (cstate->mq);
282 }
283
284
285 /**
286  * We have received a full message, pass to the MQ dispatcher.
287  * Called by the tokenizer via #receive_ready().
288  *
289  * @param cls the `struct ClientState`
290  * @param msg message we received.
291  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
292  */
293 static int
294 recv_message (void *cls,
295               const struct GNUNET_MessageHeader *msg)
296 {
297   struct ClientState *cstate = cls;
298
299   if (GNUNET_YES == cstate->in_destroy)
300     return GNUNET_SYSERR;
301   GNUNET_MQ_inject_message (cstate->mq,
302                             msg);
303   if (GNUNET_YES == cstate->in_destroy)
304     return GNUNET_SYSERR;
305   return GNUNET_OK;
306 }
307
308
309 /**
310  * Cancel all remaining connect attempts
311  *
312  * @param cstate handle of the client state to process
313  */
314 static void
315 cancel_aps (struct ClientState *cstate)
316 {
317   struct AddressProbe *pos;
318
319   while (NULL != (pos = cstate->ap_head))
320   {
321     GNUNET_break (GNUNET_OK ==
322                   GNUNET_NETWORK_socket_close (pos->sock));
323     GNUNET_SCHEDULER_cancel (pos->task);
324     GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
325                                  cstate->ap_tail,
326                                  pos);
327     GNUNET_free (pos);
328   }
329 }
330
331
332 /**
333  * Implement the destruction of a message queue.  Implementations must
334  * not free @a mq, but should take care of @a impl_state.
335  *
336  * @param mq the message queue to destroy
337  * @param impl_state our `struct ClientState`
338  */
339 static void
340 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
341                                 void *impl_state)
342 {
343   struct ClientState *cstate = impl_state;
344
345   if (GNUNET_SYSERR == cstate->in_destroy)
346   {
347     /* defer destruction */
348     cstate->in_destroy = GNUNET_YES;
349     cstate->mq = NULL;
350     return;
351   }
352   if (NULL != cstate->dns_active)
353     GNUNET_RESOLVER_request_cancel (cstate->dns_active);
354   if (NULL != cstate->send_task)
355     GNUNET_SCHEDULER_cancel (cstate->send_task);
356   if (NULL != cstate->recv_task)
357     GNUNET_SCHEDULER_cancel (cstate->recv_task);
358   if (NULL != cstate->retry_task)
359     GNUNET_SCHEDULER_cancel (cstate->retry_task);
360   if (NULL != cstate->sock)
361     GNUNET_NETWORK_socket_close (cstate->sock);
362   cancel_aps (cstate);
363   GNUNET_free (cstate->service_name);
364   GNUNET_free_non_null (cstate->hostname);
365   GNUNET_MST_destroy (cstate->mst);
366   GNUNET_free (cstate);
367 }
368
369
370 /**
371  * This function is called once we have data ready to read.
372  *
373  * @param cls `struct ClientState` with connection to read from
374  */
375 static void
376 receive_ready (void *cls)
377 {
378   struct ClientState *cstate = cls;
379   int ret;
380
381   cstate->recv_task = NULL;
382   cstate->in_destroy = GNUNET_SYSERR;
383   ret = GNUNET_MST_read (cstate->mst,
384                          cstate->sock,
385                          GNUNET_NO,
386                          GNUNET_NO);
387   if (GNUNET_SYSERR == ret)
388   {
389     if (NULL != cstate->mq)
390       GNUNET_MQ_inject_error (cstate->mq,
391                               GNUNET_MQ_ERROR_READ);
392     if (GNUNET_YES == cstate->in_destroy)
393       connection_client_destroy_impl (cstate->mq,
394                                       cstate);
395     return;
396   }
397   if (GNUNET_YES == cstate->in_destroy)
398   {
399     connection_client_destroy_impl (cstate->mq,
400                                     cstate);
401     return;
402   }
403   cstate->in_destroy = GNUNET_NO;
404   cstate->recv_task
405     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
406                                      cstate->sock,
407                                      &receive_ready,
408                                      cstate);
409 }
410
411
412 /**
413  * We've succeeded in establishing a connection.
414  *
415  * @param cstate the connection we tried to establish
416  */
417 static void
418 connect_success_continuation (struct ClientState *cstate)
419 {
420   GNUNET_assert (NULL == cstate->recv_task);
421   cstate->recv_task
422     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
423                                      cstate->sock,
424                                      &receive_ready,
425                                      cstate);
426   if (NULL != cstate->msg)
427   {
428     GNUNET_assert (NULL == cstate->send_task);
429     cstate->send_task
430       = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
431                                         cstate->sock,
432                                         &transmit_ready,
433                                         cstate);
434   }
435 }
436
437
438 /**
439  * Try connecting to the server using UNIX domain sockets.
440  *
441  * @param service_name name of service to connect to
442  * @param cfg configuration to use
443  * @return NULL on error, socket connected to UNIX otherwise
444  */
445 static struct GNUNET_NETWORK_Handle *
446 try_unixpath (const char *service_name,
447               const struct GNUNET_CONFIGURATION_Handle *cfg)
448 {
449 #if AF_UNIX
450   struct GNUNET_NETWORK_Handle *sock;
451   char *unixpath;
452   struct sockaddr_un s_un;
453
454   unixpath = NULL;
455   if ((GNUNET_OK ==
456        GNUNET_CONFIGURATION_get_value_filename (cfg,
457                                                 service_name,
458                                                 "UNIXPATH",
459                                                 &unixpath)) &&
460       (0 < strlen (unixpath)))
461   {
462     /* We have a non-NULL unixpath, need to validate it */
463     if (strlen (unixpath) >= sizeof (s_un.sun_path))
464     {
465       LOG (GNUNET_ERROR_TYPE_WARNING,
466            _("UNIXPATH `%s' too long, maximum length is %llu\n"),
467            unixpath,
468            (unsigned long long) sizeof (s_un.sun_path));
469       unixpath = GNUNET_NETWORK_shorten_unixpath (unixpath);
470       LOG (GNUNET_ERROR_TYPE_INFO,
471            _("Using `%s' instead\n"),
472            unixpath);
473       if (NULL == unixpath)
474         return NULL;
475     }
476     memset (&s_un,
477             0,
478             sizeof (s_un));
479     s_un.sun_family = AF_UNIX;
480     strncpy (s_un.sun_path,
481              unixpath,
482              sizeof (s_un.sun_path) - 1);
483 #ifdef LINUX
484     {
485       int abstract;
486
487       abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg,
488                                                        "TESTING",
489                                                        "USE_ABSTRACT_SOCKETS");
490       if (GNUNET_YES == abstract)
491         s_un.sun_path[0] = '\0';
492     }
493 #endif
494 #if HAVE_SOCKADDR_IN_SIN_LEN
495     un.sun_len = (u_char) sizeof (struct sockaddr_un);
496 #endif
497     sock = GNUNET_NETWORK_socket_create (AF_UNIX,
498                                          SOCK_STREAM,
499                                          0);
500     if ( (NULL != sock) &&
501          ( (GNUNET_OK ==
502             GNUNET_NETWORK_socket_connect (sock,
503                                            (struct sockaddr *) &s_un,
504                                            sizeof (s_un))) ||
505            (EINPROGRESS == errno) ) )
506     {
507       LOG (GNUNET_ERROR_TYPE_DEBUG,
508            "Successfully connected to unixpath `%s'!\n",
509            unixpath);
510       GNUNET_free (unixpath);
511       return sock;
512     }
513   }
514   GNUNET_free_non_null (unixpath);
515 #endif
516   return NULL;
517 }
518
519
520 /**
521  * Scheduler let us know that we're either ready to write on the
522  * socket OR connect timed out.  Do the right thing.
523  *
524  * @param cls the `struct AddressProbe *` with the address that we are probing
525  */
526 static void
527 connect_probe_continuation (void *cls)
528 {
529   struct AddressProbe *ap = cls;
530   struct ClientState *cstate = ap->cstate;
531   const struct GNUNET_SCHEDULER_TaskContext *tc;
532   int error;
533   socklen_t len;
534
535   ap->task = NULL;
536   GNUNET_assert (NULL != ap->sock);
537   GNUNET_CONTAINER_DLL_remove (cstate->ap_head,
538                                cstate->ap_tail,
539                                ap);
540   len = sizeof (error);
541   error = 0;
542   tc = GNUNET_SCHEDULER_get_task_context ();
543   if ( (0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
544        (GNUNET_OK !=
545         GNUNET_NETWORK_socket_getsockopt (ap->sock,
546                                           SOL_SOCKET,
547                                           SO_ERROR,
548                                           &error,
549                                           &len)) ||
550        (0 != error) )
551   {
552     GNUNET_break (GNUNET_OK ==
553                   GNUNET_NETWORK_socket_close (ap->sock));
554     GNUNET_free (ap);
555     if ( (NULL == cstate->ap_head) &&
556          //      (NULL == cstate->proxy_handshake) &&
557          (NULL == cstate->dns_active) )
558       connect_fail_continuation (cstate);
559     return;
560   }
561   LOG (GNUNET_ERROR_TYPE_DEBUG,
562        "Connection to `%s' succeeded!\n",
563        cstate->service_name);
564   /* trigger jobs that waited for the connection */
565   GNUNET_assert (NULL == cstate->sock);
566   cstate->sock = ap->sock;
567   GNUNET_free (ap);
568   cancel_aps (cstate);
569   connect_success_continuation (cstate);
570 }
571
572
573 /**
574  * Try to establish a connection given the specified address.
575  * This function is called by the resolver once we have a DNS reply.
576  *
577  * @param cls our `struct ClientState *`
578  * @param addr address to try, NULL for "last call"
579  * @param addrlen length of @a addr
580  */
581 static void
582 try_connect_using_address (void *cls,
583                            const struct sockaddr *addr,
584                            socklen_t addrlen)
585 {
586   struct ClientState *cstate = cls;
587   struct AddressProbe *ap;
588
589   if (NULL == addr)
590   {
591     cstate->dns_active = NULL;
592     if ( (NULL == cstate->ap_head) &&
593          //  (NULL == cstate->proxy_handshake) &&
594          (NULL == cstate->sock) )
595       connect_fail_continuation (cstate);
596     return;
597   }
598   if (NULL != cstate->sock)
599     return;                     /* already connected */
600   /* try to connect */
601   LOG (GNUNET_ERROR_TYPE_DEBUG,
602        "Trying to connect using address `%s:%u'\n",
603        GNUNET_a2s (addr,
604                    addrlen),
605        cstate->port);
606   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
607   ap->addr = (const struct sockaddr *) &ap[1];
608   GNUNET_memcpy (&ap[1],
609                  addr,
610                  addrlen);
611   ap->addrlen = addrlen;
612   ap->cstate = cstate;
613
614   switch (ap->addr->sa_family)
615   {
616   case AF_INET:
617     ((struct sockaddr_in *) ap->addr)->sin_port = htons (cstate->port);
618     break;
619   case AF_INET6:
620     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (cstate->port);
621     break;
622   default:
623     GNUNET_break (0);
624     GNUNET_free (ap);
625     return;                     /* not supported by us */
626   }
627   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family,
628                                            SOCK_STREAM,
629                                            0);
630   if (NULL == ap->sock)
631   {
632     GNUNET_free (ap);
633     return;                     /* not supported by OS */
634   }
635   if ( (GNUNET_OK !=
636         GNUNET_NETWORK_socket_connect (ap->sock,
637                                        ap->addr,
638                                        ap->addrlen)) &&
639        (EINPROGRESS != errno) )
640   {
641     /* maybe refused / unsupported address, try next */
642     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
643                          "connect");
644     GNUNET_break (GNUNET_OK ==
645                   GNUNET_NETWORK_socket_close (ap->sock));
646     GNUNET_free (ap);
647     return;
648   }
649   GNUNET_CONTAINER_DLL_insert (cstate->ap_head,
650                                cstate->ap_tail,
651                                ap);
652   ap->task = GNUNET_SCHEDULER_add_write_net (GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
653                                              ap->sock,
654                                              &connect_probe_continuation,
655                                              ap);
656 }
657
658
659 /**
660  * Test whether the configuration has proper values for connection
661  * (UNIXPATH || (PORT && HOSTNAME)).
662  *
663  * @param service_name name of service to connect to
664  * @param cfg configuration to use
665  * @return #GNUNET_OK if the configuration is valid, #GNUNET_SYSERR if not
666  */
667 static int
668 test_service_configuration (const char *service_name,
669                             const struct GNUNET_CONFIGURATION_Handle *cfg)
670 {
671   int ret = GNUNET_SYSERR;
672   char *hostname = NULL;
673   unsigned long long port;
674 #if AF_UNIX
675   char *unixpath = NULL;
676
677   if ((GNUNET_OK ==
678        GNUNET_CONFIGURATION_get_value_filename (cfg,
679                                                 service_name,
680                                                 "UNIXPATH",
681                                                 &unixpath)) &&
682       (0 < strlen (unixpath)))
683     ret = GNUNET_OK;
684   GNUNET_free_non_null (unixpath);
685 #endif
686
687   if ( (GNUNET_YES ==
688         GNUNET_CONFIGURATION_have_value (cfg,
689                                          service_name,
690                                          "PORT")) &&
691        (GNUNET_OK ==
692         GNUNET_CONFIGURATION_get_value_number (cfg,
693                                                service_name,
694                                                "PORT",
695                                                &port)) &&
696        (port <= 65535) &&
697        (0 != port) &&
698        (GNUNET_OK ==
699         GNUNET_CONFIGURATION_get_value_string (cfg,
700                                                service_name,
701                                                "HOSTNAME",
702                                                &hostname)) &&
703        (0 != strlen (hostname)) )
704     ret = GNUNET_OK;
705   GNUNET_free_non_null (hostname);
706   return ret;
707 }
708
709
710 /**
711  * Try to connect to the service.
712  *
713  * @param cls the `struct ClientState` to try to connect to the service
714  */
715 static void
716 start_connect (void *cls)
717 {
718   struct ClientState *cstate = cls;
719
720   cstate->retry_task = NULL;
721 #if 0
722   /* Never use a local source if a proxy is configured */
723   if (GNUNET_YES ==
724       GNUNET_SOCKS_check_service (cstate->service_name,
725                                   cstate->cfg))
726   {
727     socks_connect (cstate);
728     return;
729   }
730 #endif
731
732   if ( (0 == (cstate->attempts++ % 2)) ||
733        (0 == cstate->port) ||
734        (NULL == cstate->hostname) )
735   {
736     /* on even rounds, try UNIX first, or always
737        if we do not have a DNS name and TCP port. */
738     cstate->sock = try_unixpath (cstate->service_name,
739                                  cstate->cfg);
740     if (NULL != cstate->sock)
741     {
742       connect_success_continuation (cstate);
743       return;
744     }
745   }
746   if ( (NULL == cstate->hostname) ||
747        (0 == cstate->port) )
748   {
749     /* All options failed. Boo! */
750     connect_fail_continuation (cstate);
751     return;
752   }
753   cstate->dns_active
754     = GNUNET_RESOLVER_ip_get (cstate->hostname,
755                               AF_UNSPEC,
756                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
757                               &try_connect_using_address,
758                               cstate);
759 }
760
761
762 /**
763  * Implements the transmission functionality of a message queue.
764  *
765  * @param mq the message queue
766  * @param msg the message to send
767  * @param impl_state our `struct ClientState`
768  */
769 static void
770 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
771                              const struct GNUNET_MessageHeader *msg,
772                              void *impl_state)
773 {
774   struct ClientState *cstate = impl_state;
775
776   /* only one message at a time allowed */
777   GNUNET_assert (NULL == cstate->msg);
778   GNUNET_assert (NULL == cstate->send_task);
779   cstate->msg = msg;
780   cstate->msg_off = 0;
781   if (NULL == cstate->sock)
782     return; /* still waiting for connection */
783   cstate->send_task
784     = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
785                                       cstate->sock,
786                                       &transmit_ready,
787                                       cstate);
788 }
789
790
791 /**
792  * Cancel the currently sent message.
793  *
794  * @param mq message queue
795  * @param impl_state our `struct ClientState`
796  */
797 static void
798 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
799                                void *impl_state)
800 {
801   struct ClientState *cstate = impl_state;
802
803   GNUNET_assert (NULL != cstate->msg);
804   GNUNET_assert (0 == cstate->msg_off);
805   cstate->msg = NULL;
806   if (NULL != cstate->send_task)
807   {
808     GNUNET_SCHEDULER_cancel (cstate->send_task);
809     cstate->send_task = NULL;
810   }
811 }
812
813
814 /**
815  * Create a message queue to connect to a GNUnet service.
816  * If handlers are specfied, receive messages from the connection.
817  *
818  * @param cfg our configuration
819  * @param service_name name of the service to connect to
820  * @param handlers handlers for receiving messages, can be NULL
821  * @param error_handler error handler
822  * @param error_handler_cls closure for the @a error_handler
823  * @return the message queue, NULL on error
824  */
825 struct GNUNET_MQ_Handle *
826 GNUNET_CLIENT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
827                        const char *service_name,
828                        const struct GNUNET_MQ_MessageHandler *handlers,
829                        GNUNET_MQ_ErrorHandler error_handler,
830                        void *error_handler_cls)
831 {
832   struct ClientState *cstate;
833
834   if (GNUNET_OK !=
835       test_service_configuration (service_name,
836                                   cfg))
837     return NULL;
838   cstate = GNUNET_new (struct ClientState);
839   cstate->service_name = GNUNET_strdup (service_name);
840   cstate->cfg = cfg;
841   cstate->retry_task = GNUNET_SCHEDULER_add_now (&start_connect,
842                                                  cstate);
843   cstate->mst = GNUNET_MST_create (&recv_message,
844                                    cstate);
845   if (GNUNET_YES ==
846       GNUNET_CONFIGURATION_have_value (cfg,
847                                        service_name,
848                                        "PORT"))
849   {
850     if (! ( (GNUNET_OK !=
851              GNUNET_CONFIGURATION_get_value_number (cfg,
852                                                     service_name,
853                                                     "PORT",
854                                                     &cstate->port)) ||
855             (cstate->port > 65535) ||
856             (GNUNET_OK !=
857              GNUNET_CONFIGURATION_get_value_string (cfg,
858                                                     service_name,
859                                                     "HOSTNAME",
860                                                     &cstate->hostname)) ) &&
861         (0 == strlen (cstate->hostname)) )
862     {
863       GNUNET_free (cstate->hostname);
864       cstate->hostname = NULL;
865       LOG (GNUNET_ERROR_TYPE_WARNING,
866            _("Need a non-empty hostname for service `%s'.\n"),
867            service_name);
868     }
869   }
870   cstate->mq = GNUNET_MQ_queue_for_callbacks (&connection_client_send_impl,
871                                               &connection_client_destroy_impl,
872                                               &connection_client_cancel_impl,
873                                               cstate,
874                                               handlers,
875                                               error_handler,
876                                               error_handler_cls);
877   return cstate->mq;
878 }
879
880 /* end of client_new.c */