add changelog
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/gnunet-communicator-unix.c
23  * @brief Transport plugin using unix domain sockets (!)
24  *        Clearly, can only be used locally on Unix/Linux hosts...
25  *        ONLY INTENDED FOR TESTING!!!
26  * @author Christian Grothoff
27  * @author Nathan Evans
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_nt_lib.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_transport_communication_service.h"
36
37 /**
38  * How many messages do we keep at most in the queue to the
39  * transport service before we start to drop (default,
40  * can be changed via the configuration file).
41  * Should be _below_ the level of the communicator API, as
42  * otherwise we may read messages just to have them dropped
43  * by the communicator API.
44  */
45 #define DEFAULT_MAX_QUEUE_LENGTH 8
46
47 /**
48  * Address prefix used by the communicator.
49  */
50 #define COMMUNICATOR_ADDRESS_PREFIX "unix"
51
52 /**
53  * Configuration section used by the communicator.
54  */
55 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
56
57 /**
58  * Our MTU.
59  */
60 #define UNIX_MTU UINT16_MAX
61
62 GNUNET_NETWORK_STRUCT_BEGIN
63
64 /**
65  * UNIX Message-Packet header.
66  */
67 struct UNIXMessage
68 {
69   /**
70    * Message header.
71    */
72   struct GNUNET_MessageHeader header;
73
74   /**
75    * What is the identity of the sender (GNUNET_hash of public key)
76    */
77   struct GNUNET_PeerIdentity sender;
78 };
79
80 GNUNET_NETWORK_STRUCT_END
81
82
83 /**
84  * Handle for a queue.
85  */
86 struct Queue
87 {
88   /**
89    * Queues with pending messages (!) are kept in a DLL.
90    */
91   struct Queue *next;
92
93   /**
94    * Queues with pending messages (!) are kept in a DLL.
95    */
96   struct Queue *prev;
97
98   /**
99    * To whom are we talking to.
100    */
101   struct GNUNET_PeerIdentity target;
102
103   /**
104    * Address of the other peer.
105    */
106   struct sockaddr_un *address;
107
108   /**
109    * Length of the address.
110    */
111   socklen_t address_len;
112
113   /**
114    * Message currently scheduled for transmission, non-NULL if and only
115    * if this queue is in the #queue_head DLL.
116    */
117   const struct GNUNET_MessageHeader *msg;
118
119   /**
120    * Message queue we are providing for the #ch.
121    */
122   struct GNUNET_MQ_Handle *mq;
123
124   /**
125    * handle for this queue with the #ch.
126    */
127   struct GNUNET_TRANSPORT_QueueHandle *qh;
128
129   /**
130    * Number of bytes we currently have in our write queue.
131    */
132   unsigned long long bytes_in_queue;
133
134   /**
135    * Timeout for this queue.
136    */
137   struct GNUNET_TIME_Absolute timeout;
138
139   /**
140    * Queue timeout task.
141    */
142   struct GNUNET_SCHEDULER_Task *timeout_task;
143 };
144
145
146 /**
147  * ID of read task
148  */
149 static struct GNUNET_SCHEDULER_Task *read_task;
150
151 /**
152  * ID of write task
153  */
154 static struct GNUNET_SCHEDULER_Task *write_task;
155
156 /**
157  * Number of messages we currently have in our queues towards the transport service.
158  */
159 static unsigned long long delivering_messages;
160
161 /**
162  * Maximum queue length before we stop reading towards the transport service.
163  */
164 static unsigned long long max_queue_length;
165
166 /**
167  * For logging statistics.
168  */
169 static struct GNUNET_STATISTICS_Handle *stats;
170
171 /**
172  * Our environment.
173  */
174 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
175
176 /**
177  * Queues (map from peer identity to `struct Queue`)
178  */
179 static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
180
181 /**
182  * Head of queue of messages to transmit.
183  */
184 static struct Queue *queue_head;
185
186 /**
187  * Tail of queue of messages to transmit.
188  */
189 static struct Queue *queue_tail;
190
191 /**
192  * socket that we transmit all data with
193  */
194 static struct GNUNET_NETWORK_Handle *unix_sock;
195
196 /**
197  * Handle to the operation that publishes our address.
198  */
199 static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
200
201
202 /**
203  * Functions with this signature are called whenever we need
204  * to close a queue due to a disconnect or failure to
205  * establish a connection.
206  *
207  * @param queue queue to close down
208  */
209 static void
210 queue_destroy (struct Queue *queue)
211 {
212   struct GNUNET_MQ_Handle *mq;
213
214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
215               "Disconnecting queue for peer `%s'\n",
216               GNUNET_i2s (&queue->target));
217   if (0 != queue->bytes_in_queue)
218   {
219     GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
220     queue->bytes_in_queue = 0;
221   }
222   if (NULL != (mq = queue->mq))
223   {
224     queue->mq = NULL;
225     GNUNET_MQ_destroy (mq);
226   }
227   GNUNET_assert (
228     GNUNET_YES ==
229     GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue));
230   GNUNET_STATISTICS_set (stats,
231                          "# queues active",
232                          GNUNET_CONTAINER_multipeermap_size (queue_map),
233                          GNUNET_NO);
234   if (NULL != queue->timeout_task)
235   {
236     GNUNET_SCHEDULER_cancel (queue->timeout_task);
237     queue->timeout_task = NULL;
238   }
239   GNUNET_free (queue->address);
240   GNUNET_free (queue);
241 }
242
243
244 /**
245  * Queue was idle for too long, so disconnect it
246  *
247  * @param cls the `struct Queue *` to disconnect
248  */
249 static void
250 queue_timeout (void *cls)
251 {
252   struct Queue *queue = cls;
253   struct GNUNET_TIME_Relative left;
254
255   queue->timeout_task = NULL;
256   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
257   if (0 != left.rel_value_us)
258   {
259     /* not actually our turn yet, but let's at least update
260        the monitor, it may think we're about to die ... */
261     queue->timeout_task =
262       GNUNET_SCHEDULER_add_delayed (left, &queue_timeout, queue);
263     return;
264   }
265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266               "Queue %p was idle for %s, disconnecting\n",
267               queue,
268               GNUNET_STRINGS_relative_time_to_string (
269                 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
270                 GNUNET_YES));
271   queue_destroy (queue);
272 }
273
274
275 /**
276  * Increment queue timeout due to activity.  We do not immediately
277  * notify the monitor here as that might generate excessive
278  * signalling.
279  *
280  * @param queue queue for which the timeout should be rescheduled
281  */
282 static void
283 reschedule_queue_timeout (struct Queue *queue)
284 {
285   GNUNET_assert (NULL != queue->timeout_task);
286   queue->timeout =
287     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
288 }
289
290
291 /**
292  * Convert unix path to a `struct sockaddr_un *`
293  *
294  * @param unixpath path to convert
295  * @param[out] sock_len set to the length of the address
296  * @param is_abstract is this an abstract @a unixpath
297  * @return converted unix path
298  */
299 static struct sockaddr_un *
300 unix_address_to_sockaddr (const char *unixpath, socklen_t *sock_len)
301 {
302   struct sockaddr_un *un;
303   size_t slen;
304
305   GNUNET_assert (0 < strlen (unixpath));   /* sanity check */
306   un = GNUNET_new (struct sockaddr_un);
307   un->sun_family = AF_UNIX;
308   slen = strlen (unixpath);
309   if (slen >= sizeof(un->sun_path))
310     slen = sizeof(un->sun_path) - 1;
311   GNUNET_memcpy (un->sun_path, unixpath, slen);
312   un->sun_path[slen] = '\0';
313   slen = sizeof(struct sockaddr_un);
314 #if HAVE_SOCKADDR_UN_SUN_LEN
315   un->sun_len = (u_char) slen;
316 #endif
317   (*sock_len) = slen;
318   if ('@' == un->sun_path[0])
319     un->sun_path[0] = '\0';
320   return un;
321 }
322
323
324 /**
325  * Closure to #lookup_queue_it().
326  */
327 struct LookupCtx
328 {
329   /**
330    * Location to store the queue, if found.
331    */
332   struct Queue *res;
333
334   /**
335    * Address we are looking for.
336    */
337   const struct sockaddr_un *un;
338
339   /**
340    * Number of bytes in @a un
341    */
342   socklen_t un_len;
343 };
344
345
346 /**
347  * Function called to find a queue by address.
348  *
349  * @param cls the `struct LookupCtx *`
350  * @param key peer we are looking for (unused)
351  * @param value a queue
352  * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
353  */
354 static int
355 lookup_queue_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
356 {
357   struct LookupCtx *lctx = cls;
358   struct Queue *queue = value;
359
360   if ((queue->address_len = lctx->un_len) &&
361       (0 == memcmp (lctx->un, queue->address, queue->address_len)))
362   {
363     lctx->res = queue;
364     return GNUNET_NO;
365   }
366   return GNUNET_YES;
367 }
368
369
370 /**
371  * Find an existing queue by address.
372  *
373  * @param plugin the plugin
374  * @param address the address to find
375  * @return NULL if queue was not found
376  */
377 static struct Queue *
378 lookup_queue (const struct GNUNET_PeerIdentity *peer,
379               const struct sockaddr_un *un,
380               socklen_t un_len)
381 {
382   struct LookupCtx lctx;
383
384   lctx.un = un;
385   lctx.un_len = un_len;
386   GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
387                                               peer,
388                                               &lookup_queue_it,
389                                               &lctx);
390   return lctx.res;
391 }
392
393
394 /**
395  * We have been notified that our socket is ready to write.
396  * Then reschedule this function to be called again once more is available.
397  *
398  * @param cls NULL
399  */
400 static void
401 select_write_cb (void *cls)
402 {
403   struct Queue *queue = queue_tail;
404   const struct GNUNET_MessageHeader *msg = queue->msg;
405   size_t msg_size = ntohs (msg->size);
406   ssize_t sent;
407
408   /* take queue of the ready list */
409   write_task = NULL;
410   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
411   if (NULL != queue_head)
412     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
413                                                  unix_sock,
414                                                  &select_write_cb,
415                                                  NULL);
416
417   /* send 'msg' */
418   queue->msg = NULL;
419   GNUNET_MQ_impl_send_continue (queue->mq);
420 resend:
421   /* Send the data */
422   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
423                                        msg,
424                                        msg_size,
425                                        (const struct sockaddr *) queue->address,
426                                        queue->address_len);
427   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
428               "UNIX transmitted message to %s (%d/%u: %s)\n",
429               GNUNET_i2s (&queue->target),
430               (int) sent,
431               (unsigned int) msg_size,
432               (sent < 0) ? strerror (errno) : "ok");
433   if (-1 != sent)
434   {
435     GNUNET_STATISTICS_update (stats,
436                               "# bytes sent",
437                               (long long) sent,
438                               GNUNET_NO);
439     reschedule_queue_timeout (queue);
440     return;   /* all good */
441   }
442   GNUNET_STATISTICS_update (stats,
443                             "# network transmission failures",
444                             1,
445                             GNUNET_NO);
446   switch (errno)
447   {
448   case EAGAIN:
449   case ENOBUFS:
450     /* We should retry later... */
451     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
452     return;
453
454   case EMSGSIZE: {
455       socklen_t size = 0;
456       socklen_t len = sizeof(size);
457
458       GNUNET_NETWORK_socket_getsockopt (unix_sock,
459                                         SOL_SOCKET,
460                                         SO_SNDBUF,
461                                         &size,
462                                         &len);
463       if (size > ntohs (msg->size))
464       {
465         /* Buffer is bigger than message:  error, no retry
466          * This should never happen!*/
467         GNUNET_break (0);
468         return;
469       }
470       GNUNET_log (
471         GNUNET_ERROR_TYPE_DEBUG,
472         "Trying to increase socket buffer size from %u to %u for message size %u\n",
473         (unsigned int) size,
474         (unsigned int) ((msg_size / 1000) + 2) * 1000,
475         (unsigned int) msg_size);
476       size = ((msg_size / 1000) + 2) * 1000;
477       if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
478                                                          SOL_SOCKET,
479                                                          SO_SNDBUF,
480                                                          &size,
481                                                          sizeof(size)))
482         goto resend; /* Increased buffer size, retry sending */
483       /* Ok, then just try very modest increase */
484       size = msg_size;
485       if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
486                                                          SOL_SOCKET,
487                                                          SO_SNDBUF,
488                                                          &size,
489                                                          sizeof(size)))
490         goto resend; /* Increased buffer size, retry sending */
491       /* Could not increase buffer size: error, no retry */
492       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
493       return;
494     }
495
496   default:
497     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "send");
498     return;
499   }
500 }
501
502
503 /**
504  * Signature of functions implementing the sending functionality of a
505  * message queue.
506  *
507  * @param mq the message queue
508  * @param msg the message to send
509  * @param impl_state our `struct Queue`
510  */
511 static void
512 mq_send (struct GNUNET_MQ_Handle *mq,
513          const struct GNUNET_MessageHeader *msg,
514          void *impl_state)
515 {
516   struct Queue *queue = impl_state;
517
518   GNUNET_assert (mq == queue->mq);
519   GNUNET_assert (NULL == queue->msg);
520   queue->msg = msg;
521   GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
522   GNUNET_assert (NULL != unix_sock);
523   if (NULL == write_task)
524     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
525                                                  unix_sock,
526                                                  &select_write_cb,
527                                                  NULL);
528 }
529
530
531 /**
532  * Signature of functions implementing the destruction of a message
533  * queue.  Implementations must not free @a mq, but should take care
534  * of @a impl_state.
535  *
536  * @param mq the message queue to destroy
537  * @param impl_state our `struct Queue`
538  */
539 static void
540 mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
541 {
542   struct Queue *queue = impl_state;
543
544   if (mq == queue->mq)
545   {
546     queue->mq = NULL;
547     queue_destroy (queue);
548   }
549 }
550
551
552 /**
553  * Implementation function that cancels the currently sent message.
554  *
555  * @param mq message queue
556  * @param impl_state our `struct Queue`
557  */
558 static void
559 mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
560 {
561   struct Queue *queue = impl_state;
562
563   GNUNET_assert (NULL != queue->msg);
564   queue->msg = NULL;
565   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
566   GNUNET_assert (NULL != write_task);
567   if (NULL == queue_head)
568   {
569     GNUNET_SCHEDULER_cancel (write_task);
570     write_task = NULL;
571   }
572 }
573
574
575 /**
576  * Generic error handler, called with the appropriate
577  * error code and the same closure specified at the creation of
578  * the message queue.
579  * Not every message queue implementation supports an error handler.
580  *
581  * @param cls our `struct Queue`
582  * @param error error code
583  */
584 static void
585 mq_error (void *cls, enum GNUNET_MQ_Error error)
586 {
587   struct Queue *queue = cls;
588
589   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
590               "UNIX MQ error in queue to %s: %d\n",
591               GNUNET_i2s (&queue->target),
592               (int) error);
593   queue_destroy (queue);
594 }
595
596
597 /**
598  * Creates a new outbound queue the transport service will use to send
599  * data to another peer.
600  *
601  * @param peer the target peer
602  * @param cs inbound or outbound queue
603  * @param un the address
604  * @param un_len number of bytes in @a un
605  * @return the queue or NULL of max connections exceeded
606  */
607 static struct Queue *
608 setup_queue (const struct GNUNET_PeerIdentity *target,
609              enum GNUNET_TRANSPORT_ConnectionStatus cs,
610              const struct sockaddr_un *un,
611              socklen_t un_len)
612 {
613   struct Queue *queue;
614
615   queue = GNUNET_new (struct Queue);
616   queue->target = *target;
617   queue->address = GNUNET_memdup (un, un_len);
618   queue->address_len = un_len;
619   (void) GNUNET_CONTAINER_multipeermap_put (
620     queue_map,
621     &queue->target,
622     queue,
623     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
624   GNUNET_STATISTICS_set (stats,
625                          "# queues active",
626                          GNUNET_CONTAINER_multipeermap_size (queue_map),
627                          GNUNET_NO);
628   queue->timeout =
629     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
630   queue->timeout_task =
631     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
632                                   &queue_timeout,
633                                   queue);
634   queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
635                                              &mq_destroy,
636                                              &mq_cancel,
637                                              queue,
638                                              NULL,
639                                              &mq_error,
640                                              queue);
641   {
642     char *foreign_addr;
643
644     if ('\0' == un->sun_path[0])
645       GNUNET_asprintf (&foreign_addr,
646                        "%s-@%s",
647                        COMMUNICATOR_ADDRESS_PREFIX,
648                        &un->sun_path[1]);
649     else
650       GNUNET_asprintf (&foreign_addr,
651                        "%s-%s",
652                        COMMUNICATOR_ADDRESS_PREFIX,
653                        un->sun_path);
654     queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
655                                                       &queue->target,
656                                                       foreign_addr,
657                                                       UNIX_MTU,
658                                                       GNUNET_NT_LOOPBACK,
659                                                       cs,
660                                                       queue->mq);
661     GNUNET_free (foreign_addr);
662   }
663   return queue;
664 }
665
666
667 /**
668  * We have been notified that our socket has something to read. Do the
669  * read and reschedule this function to be called again once more is
670  * available.
671  *
672  * @param cls NULL
673  */
674 static void
675 select_read_cb (void *cls);
676
677
678 /**
679  * Function called when message was successfully passed to
680  * transport service.  Continue read activity.
681  *
682  * @param cls NULL
683  * @param success #GNUNET_OK on success
684  */
685 static void
686 receive_complete_cb (void *cls, int success)
687 {
688   (void) cls;
689   delivering_messages--;
690   if (GNUNET_OK != success)
691     GNUNET_STATISTICS_update (stats,
692                               "# transport transmission failures",
693                               1,
694                               GNUNET_NO);
695   GNUNET_assert (NULL != unix_sock);
696   if ((NULL == read_task) && (delivering_messages < max_queue_length))
697     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
698                                                unix_sock,
699                                                &select_read_cb,
700                                                NULL);
701 }
702
703
704 /**
705  * We have been notified that our socket has something to read. Do the
706  * read and reschedule this function to be called again once more is
707  * available.
708  *
709  * @param cls NULL
710  */
711 static void
712 select_read_cb (void *cls)
713 {
714   char buf[65536] GNUNET_ALIGN;
715   struct Queue *queue;
716   const struct UNIXMessage *msg;
717   struct sockaddr_un un;
718   socklen_t addrlen;
719   ssize_t ret;
720   uint16_t msize;
721
722   GNUNET_assert (NULL != unix_sock);
723   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
724                                              unix_sock,
725                                              &select_read_cb,
726                                              NULL);
727   addrlen = sizeof(un);
728   memset (&un, 0, sizeof(un));
729   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
730                                         buf,
731                                         sizeof(buf),
732                                         (struct sockaddr *) &un,
733                                         &addrlen);
734   if ((-1 == ret) && ((EAGAIN == errno) || (ENOBUFS == errno)))
735     return;
736   if (-1 == ret)
737   {
738     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
739     return;
740   }
741   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
742               "Read %d bytes from socket %s\n",
743               (int) ret,
744               un.sun_path);
745   GNUNET_assert (AF_UNIX == (un.sun_family));
746   msg = (struct UNIXMessage *) buf;
747   msize = ntohs (msg->header.size);
748   if ((msize < sizeof(struct UNIXMessage)) || (msize > ret))
749   {
750     GNUNET_break_op (0);
751     return;
752   }
753   queue = lookup_queue (&msg->sender, &un, addrlen);
754   if (NULL == queue)
755     queue =
756       setup_queue (&msg->sender, GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen);
757   else
758     reschedule_queue_timeout (queue);
759   if (NULL == queue)
760   {
761     GNUNET_log (
762       GNUNET_ERROR_TYPE_ERROR,
763       _ (
764         "Maximum number of UNIX connections exceeded, dropping incoming message\n"));
765     return;
766   }
767
768   {
769     uint16_t offset = 0;
770     uint16_t tsize = msize - sizeof(struct UNIXMessage);
771     const char *msgbuf = (const char *) &msg[1];
772
773     while (offset + sizeof(struct GNUNET_MessageHeader) <= tsize)
774     {
775       const struct GNUNET_MessageHeader *currhdr;
776       struct GNUNET_MessageHeader al_hdr;
777       uint16_t csize;
778
779       currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
780       /* ensure aligned access */
781       memcpy (&al_hdr, currhdr, sizeof(al_hdr));
782       csize = ntohs (al_hdr.size);
783       if ((csize < sizeof(struct GNUNET_MessageHeader)) ||
784           (csize > tsize - offset))
785       {
786         GNUNET_break_op (0);
787         break;
788       }
789       ret = GNUNET_TRANSPORT_communicator_receive (ch,
790                                                    &msg->sender,
791                                                    currhdr,
792                                                    GNUNET_TIME_UNIT_FOREVER_REL,
793                                                    &receive_complete_cb,
794                                                    NULL);
795       if (GNUNET_SYSERR == ret)
796         return;   /* transport not up */
797       if (GNUNET_NO == ret)
798         break;
799       delivering_messages++;
800       offset += csize;
801     }
802   }
803   if (delivering_messages >= max_queue_length)
804   {
805     /* we should try to apply 'back pressure' */
806     GNUNET_SCHEDULER_cancel (read_task);
807     read_task = NULL;
808   }
809 }
810
811
812 /**
813  * Function called by the transport service to initialize a
814  * message queue given address information about another peer.
815  * If and when the communication channel is established, the
816  * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
817  * to notify the service that the channel is now up.  It is
818  * the responsibility of the communicator to manage sane
819  * retries and timeouts for any @a peer/@a address combination
820  * provided by the transport service.  Timeouts and retries
821  * do not need to be signalled to the transport service.
822  *
823  * @param cls closure
824  * @param peer identity of the other peer
825  * @param address where to send the message, human-readable
826  *        communicator-specific format, 0-terminated, UTF-8
827  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
828  */
829 static int
830 mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
831 {
832   struct Queue *queue;
833   const char *path;
834   struct sockaddr_un *un;
835   socklen_t un_len;
836
837   (void) cls;
838   if (0 != strncmp (address,
839                     COMMUNICATOR_ADDRESS_PREFIX "-",
840                     strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
841   {
842     GNUNET_break_op (0);
843     return GNUNET_SYSERR;
844   }
845   path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
846   un = unix_address_to_sockaddr (path, &un_len);
847   queue = lookup_queue (peer, un, un_len);
848   if (NULL != queue)
849   {
850     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
851                 "Address `%s' for %s ignored, queue exists\n",
852                 path,
853                 GNUNET_i2s (peer));
854     GNUNET_free (un);
855     return GNUNET_OK;
856   }
857   queue = setup_queue (peer, GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len);
858   GNUNET_free (un);
859   if (NULL == queue)
860   {
861     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
862                 "Failed to setup queue to %s at `%s'\n",
863                 GNUNET_i2s (peer),
864                 path);
865     return GNUNET_NO;
866   }
867   return GNUNET_OK;
868 }
869
870
871 /**
872  * Iterator over all message queues to clean up.
873  *
874  * @param cls NULL
875  * @param target unused
876  * @param value the queue to destroy
877  * @return #GNUNET_OK to continue to iterate
878  */
879 static int
880 get_queue_delete_it (void *cls,
881                      const struct GNUNET_PeerIdentity *target,
882                      void *value)
883 {
884   struct Queue *queue = value;
885
886   (void) cls;
887   (void) target;
888   queue_destroy (queue);
889   return GNUNET_OK;
890 }
891
892
893 /**
894  * Shutdown the UNIX communicator.
895  *
896  * @param cls NULL (always)
897  */
898 static void
899 do_shutdown (void *cls)
900 {
901   if (NULL != read_task)
902   {
903     GNUNET_SCHEDULER_cancel (read_task);
904     read_task = NULL;
905   }
906   if (NULL != write_task)
907   {
908     GNUNET_SCHEDULER_cancel (write_task);
909     write_task = NULL;
910   }
911   if (NULL != unix_sock)
912   {
913     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (unix_sock));
914     unix_sock = NULL;
915   }
916   GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL);
917   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
918   if (NULL != ai)
919   {
920     GNUNET_TRANSPORT_communicator_address_remove (ai);
921     ai = NULL;
922   }
923   if (NULL != ch)
924   {
925     GNUNET_TRANSPORT_communicator_disconnect (ch);
926     ch = NULL;
927   }
928   if (NULL != stats)
929   {
930     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
931     stats = NULL;
932   }
933 }
934
935
936 /**
937  * Function called when the transport service has received an
938  * acknowledgement for this communicator (!) via a different return
939  * path.
940  *
941  * Not applicable for UNIX.
942  *
943  * @param cls closure
944  * @param sender which peer sent the notification
945  * @param msg payload
946  */
947 static void
948 enc_notify_cb (void *cls,
949                const struct GNUNET_PeerIdentity *sender,
950                const struct GNUNET_MessageHeader *msg)
951 {
952   (void) cls;
953   (void) sender;
954   (void) msg;
955   GNUNET_break_op (0);
956 }
957
958
959 /**
960  * Setup communicator and launch network interactions.
961  *
962  * @param cls NULL (always)
963  * @param args remaining command-line arguments
964  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
965  * @param cfg configuration
966  */
967 static void
968 run (void *cls,
969      char *const *args,
970      const char *cfgfile,
971      const struct GNUNET_CONFIGURATION_Handle *cfg)
972 {
973   char *unix_socket_path;
974   struct sockaddr_un *un;
975   socklen_t un_len;
976   char *my_addr;
977
978   (void) cls;
979
980   if (GNUNET_OK !=
981       GNUNET_CONFIGURATION_get_value_filename (cfg,
982                                                COMMUNICATOR_CONFIG_SECTION,
983                                                "UNIXPATH",
984                                                &unix_socket_path))
985   {
986     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
987                                COMMUNICATOR_CONFIG_SECTION,
988                                "UNIXPATH");
989     return;
990   }
991   if (GNUNET_OK !=
992       GNUNET_CONFIGURATION_get_value_number (cfg,
993                                              COMMUNICATOR_CONFIG_SECTION,
994                                              "MAX_QUEUE_LENGTH",
995                                              &max_queue_length))
996     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
997
998   un = unix_address_to_sockaddr (unix_socket_path, &un_len);
999   if (NULL == un)
1000   {
1001     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1002                 "Failed to setup UNIX domain socket address with path `%s'\n",
1003                 unix_socket_path);
1004     GNUNET_free (unix_socket_path);
1005     return;
1006   }
1007   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_DGRAM, 0);
1008   if (NULL == unix_sock)
1009   {
1010     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
1011     GNUNET_free (un);
1012     GNUNET_free (unix_socket_path);
1013     return;
1014   }
1015   if (('\0' != un->sun_path[0]) &&
1016       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (un->sun_path)))
1017   {
1018     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1019                 _ ("Cannot create path to `%s'\n"),
1020                 un->sun_path);
1021     GNUNET_NETWORK_socket_close (unix_sock);
1022     unix_sock = NULL;
1023     GNUNET_free (un);
1024     GNUNET_free (unix_socket_path);
1025     return;
1026   }
1027   if (GNUNET_OK != GNUNET_NETWORK_socket_bind (unix_sock,
1028                                                (const struct sockaddr *) un,
1029                                                un_len))
1030   {
1031     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "bind", un->sun_path);
1032     GNUNET_NETWORK_socket_close (unix_sock);
1033     unix_sock = NULL;
1034     GNUNET_free (un);
1035     GNUNET_free (unix_socket_path);
1036     return;
1037   }
1038   GNUNET_free (un);
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", unix_socket_path);
1040   stats = GNUNET_STATISTICS_create ("C-UNIX", cfg);
1041   GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
1042   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1043                                              unix_sock,
1044                                              &select_read_cb,
1045                                              NULL);
1046   queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1047   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1048                                               COMMUNICATOR_CONFIG_SECTION,
1049                                               COMMUNICATOR_ADDRESS_PREFIX,
1050                                               GNUNET_TRANSPORT_CC_RELIABLE,
1051                                               &mq_init,
1052                                               NULL,
1053                                               &enc_notify_cb,
1054                                               NULL);
1055   if (NULL == ch)
1056   {
1057     GNUNET_break (0);
1058     GNUNET_SCHEDULER_shutdown ();
1059     GNUNET_free (unix_socket_path);
1060     return;
1061   }
1062   GNUNET_asprintf (&my_addr,
1063                    "%s-%s",
1064                    COMMUNICATOR_ADDRESS_PREFIX,
1065                    unix_socket_path);
1066   GNUNET_free (unix_socket_path);
1067   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1068                                                   my_addr,
1069                                                   GNUNET_NT_LOOPBACK,
1070                                                   GNUNET_TIME_UNIT_FOREVER_REL);
1071   GNUNET_free (my_addr);
1072 }
1073
1074
1075 /**
1076  * The main function for the UNIX communicator.
1077  *
1078  * @param argc number of arguments from the command line
1079  * @param argv command line arguments
1080  * @return 0 ok, 1 on error
1081  */
1082 int
1083 main (int argc, char *const *argv)
1084 {
1085   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1086     GNUNET_GETOPT_OPTION_END
1087   };
1088   int ret;
1089
1090   if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1091     return 2;
1092
1093   ret = (GNUNET_OK ==
1094          GNUNET_PROGRAM_run (argc,
1095                              argv,
1096                              "gnunet-communicator-unix",
1097                              _ ("GNUnet UNIX domain socket communicator"),
1098                              options,
1099                              &run,
1100                              NULL))
1101         ? 0
1102         : 1;
1103   GNUNET_free ((void *) argv);
1104   return ret;
1105 }
1106
1107
1108 #if defined(__linux__) && defined(__GLIBC__)
1109 #include <malloc.h>
1110
1111 /**
1112  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1113  */
1114 void __attribute__ ((constructor))
1115 GNUNET_ARM_memory_init ()
1116 {
1117   mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1118   mallopt (M_TOP_PAD, 1 * 1024);
1119   malloc_trim (0);
1120 }
1121
1122
1123 #endif
1124
1125 /* end of gnunet-communicator-unix.c */