first step to remove plibc
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/gnunet-communicator-unix.c
23  * @brief Transport plugin using unix domain sockets (!)
24  *        Clearly, can only be used locally on Unix/Linux hosts...
25  *        ONLY INTENDED FOR TESTING!!!
26  * @author Christian Grothoff
27  * @author Nathan Evans
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_nt_lib.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_transport_communication_service.h"
36
37 /**
38  * How many messages do we keep at most in the queue to the
39  * transport service before we start to drop (default,
40  * can be changed via the configuration file).
41  * Should be _below_ the level of the communicator API, as
42  * otherwise we may read messages just to have them dropped
43  * by the communicator API.
44  */
45 #define DEFAULT_MAX_QUEUE_LENGTH 8
46
47 /**
48  * Address prefix used by the communicator.
49  */
50 #define COMMUNICATOR_ADDRESS_PREFIX "unix"
51
52 /**
53  * Configuration section used by the communicator.
54  */
55 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
56
57 /**
58  * Our MTU.
59  */
60 #define UNIX_MTU UINT16_MAX
61
62 GNUNET_NETWORK_STRUCT_BEGIN
63
64 /**
65  * UNIX Message-Packet header.
66  */
67 struct UNIXMessage
68 {
69   /**
70    * Message header.
71    */
72   struct GNUNET_MessageHeader header;
73
74   /**
75    * What is the identity of the sender (GNUNET_hash of public key)
76    */
77   struct GNUNET_PeerIdentity sender;
78 };
79
80 GNUNET_NETWORK_STRUCT_END
81
82
83 /**
84  * Handle for a queue.
85  */
86 struct Queue
87 {
88
89   /**
90    * Queues with pending messages (!) are kept in a DLL.
91    */
92   struct Queue *next;
93
94   /**
95    * Queues with pending messages (!) are kept in a DLL.
96    */
97   struct Queue *prev;
98
99   /**
100    * To whom are we talking to.
101    */
102   struct GNUNET_PeerIdentity target;
103
104   /**
105    * Address of the other peer.
106    */
107   struct sockaddr_un *address;
108
109   /**
110    * Length of the address.
111    */
112   socklen_t address_len;
113
114   /**
115    * Message currently scheduled for transmission, non-NULL if and only
116    * if this queue is in the #queue_head DLL.
117    */
118   const struct GNUNET_MessageHeader *msg;
119
120   /**
121    * Message queue we are providing for the #ch.
122    */
123   struct GNUNET_MQ_Handle *mq;
124
125   /**
126    * handle for this queue with the #ch.
127    */
128   struct GNUNET_TRANSPORT_QueueHandle *qh;
129
130   /**
131    * Number of bytes we currently have in our write queue.
132    */
133   unsigned long long bytes_in_queue;
134
135   /**
136    * Timeout for this queue.
137    */
138   struct GNUNET_TIME_Absolute timeout;
139
140   /**
141    * Queue timeout task.
142    */
143   struct GNUNET_SCHEDULER_Task *timeout_task;
144 };
145
146
147 /**
148  * ID of read task
149  */
150 static struct GNUNET_SCHEDULER_Task *read_task;
151
152 /**
153  * ID of write task
154  */
155 static struct GNUNET_SCHEDULER_Task *write_task;
156
157 /**
158  * Number of messages we currently have in our queues towards the transport service.
159  */
160 static unsigned long long delivering_messages;
161
162 /**
163  * Maximum queue length before we stop reading towards the transport service.
164  */
165 static unsigned long long max_queue_length;
166
167 /**
168  * For logging statistics.
169  */
170 static struct GNUNET_STATISTICS_Handle *stats;
171
172 /**
173  * Our environment.
174  */
175 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
176
177 /**
178  * Queues (map from peer identity to `struct Queue`)
179  */
180 static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
181
182 /**
183  * Head of queue of messages to transmit.
184  */
185 static struct Queue *queue_head;
186
187 /**
188  * Tail of queue of messages to transmit.
189  */
190 static struct Queue *queue_tail;
191
192 /**
193  * socket that we transmit all data with
194  */
195 static struct GNUNET_NETWORK_Handle *unix_sock;
196
197 /**
198  * Handle to the operation that publishes our address.
199  */
200 static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
201
202
203 /**
204  * Functions with this signature are called whenever we need
205  * to close a queue due to a disconnect or failure to
206  * establish a connection.
207  *
208  * @param queue queue to close down
209  */
210 static void
211 queue_destroy (struct Queue *queue)
212 {
213   struct GNUNET_MQ_Handle *mq;
214
215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
216               "Disconnecting queue for peer `%s'\n",
217               GNUNET_i2s (&queue->target));
218   if (0 != queue->bytes_in_queue)
219   {
220     GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
221     queue->bytes_in_queue = 0;
222   }
223   if (NULL != (mq = queue->mq))
224   {
225     queue->mq = NULL;
226     GNUNET_MQ_destroy (mq);
227   }
228   GNUNET_assert (
229     GNUNET_YES ==
230     GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue));
231   GNUNET_STATISTICS_set (stats,
232                          "# queues active",
233                          GNUNET_CONTAINER_multipeermap_size (queue_map),
234                          GNUNET_NO);
235   if (NULL != queue->timeout_task)
236   {
237     GNUNET_SCHEDULER_cancel (queue->timeout_task);
238     queue->timeout_task = NULL;
239   }
240   GNUNET_free (queue->address);
241   GNUNET_free (queue);
242 }
243
244
245 /**
246  * Queue was idle for too long, so disconnect it
247  *
248  * @param cls the `struct Queue *` to disconnect
249  */
250 static void
251 queue_timeout (void *cls)
252 {
253   struct Queue *queue = cls;
254   struct GNUNET_TIME_Relative left;
255
256   queue->timeout_task = NULL;
257   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
258   if (0 != left.rel_value_us)
259   {
260     /* not actually our turn yet, but let's at least update
261        the monitor, it may think we're about to die ... */
262     queue->timeout_task =
263       GNUNET_SCHEDULER_add_delayed (left, &queue_timeout, queue);
264     return;
265   }
266   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267               "Queue %p was idle for %s, disconnecting\n",
268               queue,
269               GNUNET_STRINGS_relative_time_to_string (
270                 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
271                 GNUNET_YES));
272   queue_destroy (queue);
273 }
274
275
276 /**
277  * Increment queue timeout due to activity.  We do not immediately
278  * notify the monitor here as that might generate excessive
279  * signalling.
280  *
281  * @param queue queue for which the timeout should be rescheduled
282  */
283 static void
284 reschedule_queue_timeout (struct Queue *queue)
285 {
286   GNUNET_assert (NULL != queue->timeout_task);
287   queue->timeout =
288     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
289 }
290
291
292 /**
293  * Convert unix path to a `struct sockaddr_un *`
294  *
295  * @param unixpath path to convert
296  * @param[out] sock_len set to the length of the address
297  * @param is_abstract is this an abstract @a unixpath
298  * @return converted unix path
299  */
300 static struct sockaddr_un *
301 unix_address_to_sockaddr (const char *unixpath, socklen_t *sock_len)
302 {
303   struct sockaddr_un *un;
304   size_t slen;
305
306   GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
307   un = GNUNET_new (struct sockaddr_un);
308   un->sun_family = AF_UNIX;
309   slen = strlen (unixpath);
310   if (slen >= sizeof (un->sun_path))
311     slen = sizeof (un->sun_path) - 1;
312   GNUNET_memcpy (un->sun_path, unixpath, slen);
313   un->sun_path[slen] = '\0';
314   slen = sizeof (struct sockaddr_un);
315 #if HAVE_SOCKADDR_UN_SUN_LEN
316   un->sun_len = (u_char) slen;
317 #endif
318   (*sock_len) = slen;
319   if ('@' == un->sun_path[0])
320     un->sun_path[0] = '\0';
321   return un;
322 }
323
324
325 /**
326  * Closure to #lookup_queue_it().
327  */
328 struct LookupCtx
329 {
330   /**
331    * Location to store the queue, if found.
332    */
333   struct Queue *res;
334
335   /**
336    * Address we are looking for.
337    */
338   const struct sockaddr_un *un;
339
340   /**
341    * Number of bytes in @a un
342    */
343   socklen_t un_len;
344 };
345
346
347 /**
348  * Function called to find a queue by address.
349  *
350  * @param cls the `struct LookupCtx *`
351  * @param key peer we are looking for (unused)
352  * @param value a queue
353  * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
354  */
355 static int
356 lookup_queue_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
357 {
358   struct LookupCtx *lctx = cls;
359   struct Queue *queue = value;
360
361   if ((queue->address_len = lctx->un_len) &&
362       (0 == memcmp (lctx->un, queue->address, queue->address_len)))
363   {
364     lctx->res = queue;
365     return GNUNET_NO;
366   }
367   return GNUNET_YES;
368 }
369
370
371 /**
372  * Find an existing queue by address.
373  *
374  * @param plugin the plugin
375  * @param address the address to find
376  * @return NULL if queue was not found
377  */
378 static struct Queue *
379 lookup_queue (const struct GNUNET_PeerIdentity *peer,
380               const struct sockaddr_un *un,
381               socklen_t un_len)
382 {
383   struct LookupCtx lctx;
384
385   lctx.un = un;
386   lctx.un_len = un_len;
387   GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
388                                               peer,
389                                               &lookup_queue_it,
390                                               &lctx);
391   return lctx.res;
392 }
393
394
395 /**
396  * We have been notified that our socket is ready to write.
397  * Then reschedule this function to be called again once more is available.
398  *
399  * @param cls NULL
400  */
401 static void
402 select_write_cb (void *cls)
403 {
404   struct Queue *queue = queue_tail;
405   const struct GNUNET_MessageHeader *msg = queue->msg;
406   size_t msg_size = ntohs (msg->size);
407   ssize_t sent;
408
409   /* take queue of the ready list */
410   write_task = NULL;
411   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
412   if (NULL != queue_head)
413     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
414                                                  unix_sock,
415                                                  &select_write_cb,
416                                                  NULL);
417
418   /* send 'msg' */
419   queue->msg = NULL;
420   GNUNET_MQ_impl_send_continue (queue->mq);
421 resend:
422   /* Send the data */
423   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
424                                        msg,
425                                        msg_size,
426                                        (const struct sockaddr *) queue->address,
427                                        queue->address_len);
428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
429               "UNIX transmitted message to %s (%d/%u: %s)\n",
430               GNUNET_i2s (&queue->target),
431               (int) sent,
432               (unsigned int) msg_size,
433               (sent < 0) ? strerror (errno) : "ok");
434   if (-1 != sent)
435   {
436     GNUNET_STATISTICS_update (stats,
437                               "# bytes sent",
438                               (long long) sent,
439                               GNUNET_NO);
440     reschedule_queue_timeout (queue);
441     return; /* all good */
442   }
443   GNUNET_STATISTICS_update (stats,
444                             "# network transmission failures",
445                             1,
446                             GNUNET_NO);
447   switch (errno)
448   {
449   case EAGAIN:
450   case ENOBUFS:
451     /* We should retry later... */
452     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
453     return;
454   case EMSGSIZE: {
455     socklen_t size = 0;
456     socklen_t len = sizeof (size);
457
458     GNUNET_NETWORK_socket_getsockopt (unix_sock,
459                                       SOL_SOCKET,
460                                       SO_SNDBUF,
461                                       &size,
462                                       &len);
463     if (size > ntohs (msg->size))
464     {
465       /* Buffer is bigger than message:  error, no retry
466          * This should never happen!*/
467       GNUNET_break (0);
468       return;
469     }
470     GNUNET_log (
471       GNUNET_ERROR_TYPE_DEBUG,
472       "Trying to increase socket buffer size from %u to %u for message size %u\n",
473       (unsigned int) size,
474       (unsigned int) ((msg_size / 1000) + 2) * 1000,
475       (unsigned int) msg_size);
476     size = ((msg_size / 1000) + 2) * 1000;
477     if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
478                                                        SOL_SOCKET,
479                                                        SO_SNDBUF,
480                                                        &size,
481                                                        sizeof (size)))
482       goto resend; /* Increased buffer size, retry sending */
483     /* Ok, then just try very modest increase */
484     size = msg_size;
485     if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
486                                                        SOL_SOCKET,
487                                                        SO_SNDBUF,
488                                                        &size,
489                                                        sizeof (size)))
490       goto resend; /* Increased buffer size, retry sending */
491     /* Could not increase buffer size: error, no retry */
492     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
493     return;
494   }
495   default:
496     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "send");
497     return;
498   }
499 }
500
501
502 /**
503  * Signature of functions implementing the sending functionality of a
504  * message queue.
505  *
506  * @param mq the message queue
507  * @param msg the message to send
508  * @param impl_state our `struct Queue`
509  */
510 static void
511 mq_send (struct GNUNET_MQ_Handle *mq,
512          const struct GNUNET_MessageHeader *msg,
513          void *impl_state)
514 {
515   struct Queue *queue = impl_state;
516
517   GNUNET_assert (mq == queue->mq);
518   GNUNET_assert (NULL == queue->msg);
519   queue->msg = msg;
520   GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
521   GNUNET_assert (NULL != unix_sock);
522   if (NULL == write_task)
523     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
524                                                  unix_sock,
525                                                  &select_write_cb,
526                                                  NULL);
527 }
528
529
530 /**
531  * Signature of functions implementing the destruction of a message
532  * queue.  Implementations must not free @a mq, but should take care
533  * of @a impl_state.
534  *
535  * @param mq the message queue to destroy
536  * @param impl_state our `struct Queue`
537  */
538 static void
539 mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
540 {
541   struct Queue *queue = impl_state;
542
543   if (mq == queue->mq)
544   {
545     queue->mq = NULL;
546     queue_destroy (queue);
547   }
548 }
549
550
551 /**
552  * Implementation function that cancels the currently sent message.
553  *
554  * @param mq message queue
555  * @param impl_state our `struct Queue`
556  */
557 static void
558 mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
559 {
560   struct Queue *queue = impl_state;
561
562   GNUNET_assert (NULL != queue->msg);
563   queue->msg = NULL;
564   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
565   GNUNET_assert (NULL != write_task);
566   if (NULL == queue_head)
567   {
568     GNUNET_SCHEDULER_cancel (write_task);
569     write_task = NULL;
570   }
571 }
572
573
574 /**
575  * Generic error handler, called with the appropriate
576  * error code and the same closure specified at the creation of
577  * the message queue.
578  * Not every message queue implementation supports an error handler.
579  *
580  * @param cls our `struct Queue`
581  * @param error error code
582  */
583 static void
584 mq_error (void *cls, enum GNUNET_MQ_Error error)
585 {
586   struct Queue *queue = cls;
587
588   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
589               "UNIX MQ error in queue to %s: %d\n",
590               GNUNET_i2s (&queue->target),
591               (int) error);
592   queue_destroy (queue);
593 }
594
595
596 /**
597  * Creates a new outbound queue the transport service will use to send
598  * data to another peer.
599  *
600  * @param peer the target peer
601  * @param cs inbound or outbound queue
602  * @param un the address
603  * @param un_len number of bytes in @a un
604  * @return the queue or NULL of max connections exceeded
605  */
606 static struct Queue *
607 setup_queue (const struct GNUNET_PeerIdentity *target,
608              enum GNUNET_TRANSPORT_ConnectionStatus cs,
609              const struct sockaddr_un *un,
610              socklen_t un_len)
611 {
612   struct Queue *queue;
613
614   queue = GNUNET_new (struct Queue);
615   queue->target = *target;
616   queue->address = GNUNET_memdup (un, un_len);
617   queue->address_len = un_len;
618   (void) GNUNET_CONTAINER_multipeermap_put (
619     queue_map,
620     &queue->target,
621     queue,
622     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
623   GNUNET_STATISTICS_set (stats,
624                          "# queues active",
625                          GNUNET_CONTAINER_multipeermap_size (queue_map),
626                          GNUNET_NO);
627   queue->timeout =
628     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
629   queue->timeout_task =
630     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
631                                   &queue_timeout,
632                                   queue);
633   queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
634                                              &mq_destroy,
635                                              &mq_cancel,
636                                              queue,
637                                              NULL,
638                                              &mq_error,
639                                              queue);
640   {
641     char *foreign_addr;
642
643     if ('\0' == un->sun_path[0])
644       GNUNET_asprintf (&foreign_addr,
645                        "%s-@%s",
646                        COMMUNICATOR_ADDRESS_PREFIX,
647                        &un->sun_path[1]);
648     else
649       GNUNET_asprintf (&foreign_addr,
650                        "%s-%s",
651                        COMMUNICATOR_ADDRESS_PREFIX,
652                        un->sun_path);
653     queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
654                                                       &queue->target,
655                                                       foreign_addr,
656                                                       UNIX_MTU,
657                                                       GNUNET_NT_LOOPBACK,
658                                                       cs,
659                                                       queue->mq);
660     GNUNET_free (foreign_addr);
661   }
662   return queue;
663 }
664
665
666 /**
667  * We have been notified that our socket has something to read. Do the
668  * read and reschedule this function to be called again once more is
669  * available.
670  *
671  * @param cls NULL
672  */
673 static void
674 select_read_cb (void *cls);
675
676
677 /**
678  * Function called when message was successfully passed to
679  * transport service.  Continue read activity.
680  *
681  * @param cls NULL
682  * @param success #GNUNET_OK on success
683  */
684 static void
685 receive_complete_cb (void *cls, int success)
686 {
687   (void) cls;
688   delivering_messages--;
689   if (GNUNET_OK != success)
690     GNUNET_STATISTICS_update (stats,
691                               "# transport transmission failures",
692                               1,
693                               GNUNET_NO);
694   GNUNET_assert (NULL != unix_sock);
695   if ((NULL == read_task) && (delivering_messages < max_queue_length))
696     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
697                                                unix_sock,
698                                                &select_read_cb,
699                                                NULL);
700 }
701
702
703 /**
704  * We have been notified that our socket has something to read. Do the
705  * read and reschedule this function to be called again once more is
706  * available.
707  *
708  * @param cls NULL
709  */
710 static void
711 select_read_cb (void *cls)
712 {
713   char buf[65536] GNUNET_ALIGN;
714   struct Queue *queue;
715   const struct UNIXMessage *msg;
716   struct sockaddr_un un;
717   socklen_t addrlen;
718   ssize_t ret;
719   uint16_t msize;
720
721   GNUNET_assert (NULL != unix_sock);
722   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
723                                              unix_sock,
724                                              &select_read_cb,
725                                              NULL);
726   addrlen = sizeof (un);
727   memset (&un, 0, sizeof (un));
728   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
729                                         buf,
730                                         sizeof (buf),
731                                         (struct sockaddr *) &un,
732                                         &addrlen);
733   if ((-1 == ret) && ((EAGAIN == errno) || (ENOBUFS == errno)))
734     return;
735   if (-1 == ret)
736   {
737     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
738     return;
739   }
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741               "Read %d bytes from socket %s\n",
742               (int) ret,
743               un.sun_path);
744   GNUNET_assert (AF_UNIX == (un.sun_family));
745   msg = (struct UNIXMessage *) buf;
746   msize = ntohs (msg->header.size);
747   if ((msize < sizeof (struct UNIXMessage)) || (msize > ret))
748   {
749     GNUNET_break_op (0);
750     return;
751   }
752   queue = lookup_queue (&msg->sender, &un, addrlen);
753   if (NULL == queue)
754     queue =
755       setup_queue (&msg->sender, GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen);
756   else
757     reschedule_queue_timeout (queue);
758   if (NULL == queue)
759   {
760     GNUNET_log (
761       GNUNET_ERROR_TYPE_ERROR,
762       _ (
763         "Maximum number of UNIX connections exceeded, dropping incoming message\n"));
764     return;
765   }
766
767   {
768     uint16_t offset = 0;
769     uint16_t tsize = msize - sizeof (struct UNIXMessage);
770     const char *msgbuf = (const char *) &msg[1];
771
772     while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
773     {
774       const struct GNUNET_MessageHeader *currhdr;
775       struct GNUNET_MessageHeader al_hdr;
776       uint16_t csize;
777
778       currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
779       /* ensure aligned access */
780       memcpy (&al_hdr, currhdr, sizeof (al_hdr));
781       csize = ntohs (al_hdr.size);
782       if ((csize < sizeof (struct GNUNET_MessageHeader)) ||
783           (csize > tsize - offset))
784       {
785         GNUNET_break_op (0);
786         break;
787       }
788       ret = GNUNET_TRANSPORT_communicator_receive (ch,
789                                                    &msg->sender,
790                                                    currhdr,
791                                                    GNUNET_TIME_UNIT_FOREVER_REL,
792                                                    &receive_complete_cb,
793                                                    NULL);
794       if (GNUNET_SYSERR == ret)
795         return; /* transport not up */
796       if (GNUNET_NO == ret)
797         break;
798       delivering_messages++;
799       offset += csize;
800     }
801   }
802   if (delivering_messages >= max_queue_length)
803   {
804     /* we should try to apply 'back pressure' */
805     GNUNET_SCHEDULER_cancel (read_task);
806     read_task = NULL;
807   }
808 }
809
810
811 /**
812  * Function called by the transport service to initialize a
813  * message queue given address information about another peer.
814  * If and when the communication channel is established, the
815  * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
816  * to notify the service that the channel is now up.  It is
817  * the responsibility of the communicator to manage sane
818  * retries and timeouts for any @a peer/@a address combination
819  * provided by the transport service.  Timeouts and retries
820  * do not need to be signalled to the transport service.
821  *
822  * @param cls closure
823  * @param peer identity of the other peer
824  * @param address where to send the message, human-readable
825  *        communicator-specific format, 0-terminated, UTF-8
826  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
827  */
828 static int
829 mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
830 {
831   struct Queue *queue;
832   const char *path;
833   struct sockaddr_un *un;
834   socklen_t un_len;
835
836   (void) cls;
837   if (0 != strncmp (address,
838                     COMMUNICATOR_ADDRESS_PREFIX "-",
839                     strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
840   {
841     GNUNET_break_op (0);
842     return GNUNET_SYSERR;
843   }
844   path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
845   un = unix_address_to_sockaddr (path, &un_len);
846   queue = lookup_queue (peer, un, un_len);
847   if (NULL != queue)
848   {
849     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
850                 "Address `%s' for %s ignored, queue exists\n",
851                 path,
852                 GNUNET_i2s (peer));
853     GNUNET_free (un);
854     return GNUNET_OK;
855   }
856   queue = setup_queue (peer, GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len);
857   GNUNET_free (un);
858   if (NULL == queue)
859   {
860     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
861                 "Failed to setup queue to %s at `%s'\n",
862                 GNUNET_i2s (peer),
863                 path);
864     return GNUNET_NO;
865   }
866   return GNUNET_OK;
867 }
868
869
870 /**
871  * Iterator over all message queues to clean up.
872  *
873  * @param cls NULL
874  * @param target unused
875  * @param value the queue to destroy
876  * @return #GNUNET_OK to continue to iterate
877  */
878 static int
879 get_queue_delete_it (void *cls,
880                      const struct GNUNET_PeerIdentity *target,
881                      void *value)
882 {
883   struct Queue *queue = value;
884
885   (void) cls;
886   (void) target;
887   queue_destroy (queue);
888   return GNUNET_OK;
889 }
890
891
892 /**
893  * Shutdown the UNIX communicator.
894  *
895  * @param cls NULL (always)
896  */
897 static void
898 do_shutdown (void *cls)
899 {
900   if (NULL != read_task)
901   {
902     GNUNET_SCHEDULER_cancel (read_task);
903     read_task = NULL;
904   }
905   if (NULL != write_task)
906   {
907     GNUNET_SCHEDULER_cancel (write_task);
908     write_task = NULL;
909   }
910   if (NULL != unix_sock)
911   {
912     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (unix_sock));
913     unix_sock = NULL;
914   }
915   GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL);
916   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
917   if (NULL != ai)
918   {
919     GNUNET_TRANSPORT_communicator_address_remove (ai);
920     ai = NULL;
921   }
922   if (NULL != ch)
923   {
924     GNUNET_TRANSPORT_communicator_disconnect (ch);
925     ch = NULL;
926   }
927   if (NULL != stats)
928   {
929     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
930     stats = NULL;
931   }
932 }
933
934
935 /**
936  * Function called when the transport service has received an
937  * acknowledgement for this communicator (!) via a different return
938  * path.
939  *
940  * Not applicable for UNIX.
941  *
942  * @param cls closure
943  * @param sender which peer sent the notification
944  * @param msg payload
945  */
946 static void
947 enc_notify_cb (void *cls,
948                const struct GNUNET_PeerIdentity *sender,
949                const struct GNUNET_MessageHeader *msg)
950 {
951   (void) cls;
952   (void) sender;
953   (void) msg;
954   GNUNET_break_op (0);
955 }
956
957
958 /**
959  * Setup communicator and launch network interactions.
960  *
961  * @param cls NULL (always)
962  * @param args remaining command-line arguments
963  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
964  * @param cfg configuration
965  */
966 static void
967 run (void *cls,
968      char *const *args,
969      const char *cfgfile,
970      const struct GNUNET_CONFIGURATION_Handle *cfg)
971 {
972   char *unix_socket_path;
973   struct sockaddr_un *un;
974   socklen_t un_len;
975   char *my_addr;
976   (void) cls;
977
978   if (GNUNET_OK !=
979       GNUNET_CONFIGURATION_get_value_filename (cfg,
980                                                COMMUNICATOR_CONFIG_SECTION,
981                                                "UNIXPATH",
982                                                &unix_socket_path))
983   {
984     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
985                                COMMUNICATOR_CONFIG_SECTION,
986                                "UNIXPATH");
987     return;
988   }
989   if (GNUNET_OK !=
990       GNUNET_CONFIGURATION_get_value_number (cfg,
991                                              COMMUNICATOR_CONFIG_SECTION,
992                                              "MAX_QUEUE_LENGTH",
993                                              &max_queue_length))
994     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
995
996   un = unix_address_to_sockaddr (unix_socket_path, &un_len);
997   if (NULL == un)
998   {
999     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1000                 "Failed to setup UNIX domain socket address with path `%s'\n",
1001                 unix_socket_path);
1002     GNUNET_free (unix_socket_path);
1003     return;
1004   }
1005   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_DGRAM, 0);
1006   if (NULL == unix_sock)
1007   {
1008     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
1009     GNUNET_free (un);
1010     GNUNET_free (unix_socket_path);
1011     return;
1012   }
1013   if (('\0' != un->sun_path[0]) &&
1014       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (un->sun_path)))
1015   {
1016     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1017                 _ ("Cannot create path to `%s'\n"),
1018                 un->sun_path);
1019     GNUNET_NETWORK_socket_close (unix_sock);
1020     unix_sock = NULL;
1021     GNUNET_free (un);
1022     GNUNET_free (unix_socket_path);
1023     return;
1024   }
1025   if (GNUNET_OK != GNUNET_NETWORK_socket_bind (unix_sock,
1026                                                (const struct sockaddr *) un,
1027                                                un_len))
1028   {
1029     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "bind", un->sun_path);
1030     GNUNET_NETWORK_socket_close (unix_sock);
1031     unix_sock = NULL;
1032     GNUNET_free (un);
1033     GNUNET_free (unix_socket_path);
1034     return;
1035   }
1036   GNUNET_free (un);
1037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", unix_socket_path);
1038   stats = GNUNET_STATISTICS_create ("C-UNIX", cfg);
1039   GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
1040   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1041                                              unix_sock,
1042                                              &select_read_cb,
1043                                              NULL);
1044   queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1045   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1046                                               COMMUNICATOR_CONFIG_SECTION,
1047                                               COMMUNICATOR_ADDRESS_PREFIX,
1048                                               GNUNET_TRANSPORT_CC_RELIABLE,
1049                                               &mq_init,
1050                                               NULL,
1051                                               &enc_notify_cb,
1052                                               NULL);
1053   if (NULL == ch)
1054   {
1055     GNUNET_break (0);
1056     GNUNET_SCHEDULER_shutdown ();
1057     GNUNET_free (unix_socket_path);
1058     return;
1059   }
1060   GNUNET_asprintf (&my_addr,
1061                    "%s-%s",
1062                    COMMUNICATOR_ADDRESS_PREFIX,
1063                    unix_socket_path);
1064   GNUNET_free (unix_socket_path);
1065   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1066                                                   my_addr,
1067                                                   GNUNET_NT_LOOPBACK,
1068                                                   GNUNET_TIME_UNIT_FOREVER_REL);
1069   GNUNET_free (my_addr);
1070 }
1071
1072
1073 /**
1074  * The main function for the UNIX communicator.
1075  *
1076  * @param argc number of arguments from the command line
1077  * @param argv command line arguments
1078  * @return 0 ok, 1 on error
1079  */
1080 int
1081 main (int argc, char *const *argv)
1082 {
1083   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1084     GNUNET_GETOPT_OPTION_END};
1085   int ret;
1086
1087   if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1088     return 2;
1089
1090   ret = (GNUNET_OK ==
1091          GNUNET_PROGRAM_run (argc,
1092                              argv,
1093                              "gnunet-communicator-unix",
1094                              _ ("GNUnet UNIX domain socket communicator"),
1095                              options,
1096                              &run,
1097                              NULL))
1098           ? 0
1099           : 1;
1100   GNUNET_free ((void *) argv);
1101   return ret;
1102 }
1103
1104
1105 #if defined(LINUX) && defined(__GLIBC__)
1106 #include <malloc.h>
1107
1108 /**
1109  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1110  */
1111 void __attribute__ ((constructor)) GNUNET_ARM_memory_init ()
1112 {
1113   mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1114   mallopt (M_TOP_PAD, 1 * 1024);
1115   malloc_trim (0);
1116 }
1117 #endif
1118
1119 /* end of gnunet-communicator-unix.c */