fix warnings
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/gnunet-communicator-unix.c
23  * @brief Transport plugin using unix domain sockets (!)
24  *        Clearly, can only be used locally on Unix/Linux hosts...
25  *        ONLY INTENDED FOR TESTING!!!
26  * @author Christian Grothoff
27  * @author Nathan Evans
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_nt_lib.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_transport_communication_service.h"
36
37 /**
38  * How many messages do we keep at most in the queue to the
39  * transport service before we start to drop (default,
40  * can be changed via the configuration file).
41  * Should be _below_ the level of the communicator API, as
42  * otherwise we may read messages just to have them dropped
43  * by the communicator API.
44  */
45 #define DEFAULT_MAX_QUEUE_LENGTH 8000
46
47 /**
48  * Address prefix used by the communicator.
49  */
50 #define COMMUNICATOR_ADDRESS_PREFIX "unix"
51
52 /**
53  * Configuration section used by the communicator.
54  */
55 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
56
57 /**
58  * Our MTU.
59  */
60 #define UNIX_MTU UINT16_MAX
61
62 GNUNET_NETWORK_STRUCT_BEGIN
63
64 /**
65  * UNIX Message-Packet header.
66  */
67 struct UNIXMessage
68 {
69   /**
70    * Message header.
71    */
72   struct GNUNET_MessageHeader header;
73
74   /**
75    * What is the identity of the sender (GNUNET_hash of public key)
76    */
77   struct GNUNET_PeerIdentity sender;
78 };
79
80 GNUNET_NETWORK_STRUCT_END
81
82
83 /**
84  * Handle for a queue.
85  */
86 struct Queue
87 {
88   /**
89    * Queues with pending messages (!) are kept in a DLL.
90    */
91   struct Queue *next;
92
93   /**
94    * Queues with pending messages (!) are kept in a DLL.
95    */
96   struct Queue *prev;
97
98   /**
99    * To whom are we talking to.
100    */
101   struct GNUNET_PeerIdentity target;
102
103   /**
104    * Address of the other peer.
105    */
106   struct sockaddr_un *address;
107
108   /**
109    * Length of the address.
110    */
111   socklen_t address_len;
112
113   /**
114    * Message currently scheduled for transmission, non-NULL if and only
115    * if this queue is in the #queue_head DLL.
116    */
117   struct UNIXMessage *msg;
118
119   /**
120    * Message queue we are providing for the #ch.
121    */
122   struct GNUNET_MQ_Handle *mq;
123
124   /**
125    * handle for this queue with the #ch.
126    */
127   struct GNUNET_TRANSPORT_QueueHandle *qh;
128
129   /**
130    * Number of bytes we currently have in our write queue.
131    */
132   unsigned long long bytes_in_queue;
133
134   /**
135    * Timeout for this queue.
136    */
137   struct GNUNET_TIME_Absolute timeout;
138
139   /**
140    * Queue timeout task.
141    */
142   struct GNUNET_SCHEDULER_Task *timeout_task;
143 };
144
145 /**
146  * My Peer Identity
147  */
148 static struct GNUNET_PeerIdentity my_identity;
149
150 /**
151  * ID of read task
152  */
153 static struct GNUNET_SCHEDULER_Task *read_task;
154
155 /**
156  * ID of write task
157  */
158 static struct GNUNET_SCHEDULER_Task *write_task;
159
160 /**
161  * Number of messages we currently have in our queues towards the transport service.
162  */
163 static unsigned long long delivering_messages;
164
165 /**
166  * Maximum queue length before we stop reading towards the transport service.
167  */
168 static unsigned long long max_queue_length;
169
170 /**
171  * For logging statistics.
172  */
173 static struct GNUNET_STATISTICS_Handle *stats;
174
175 /**
176  * Our environment.
177  */
178 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
179
180 /**
181  * Queues (map from peer identity to `struct Queue`)
182  */
183 static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
184
185 /**
186  * Head of queue of messages to transmit.
187  */
188 static struct Queue *queue_head;
189
190 /**
191  * Tail of queue of messages to transmit.
192  */
193 static struct Queue *queue_tail;
194
195 /**
196  * socket that we transmit all data with
197  */
198 static struct GNUNET_NETWORK_Handle *unix_sock;
199
200 /**
201  * Handle to the operation that publishes our address.
202  */
203 static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
204
205
206 /**
207  * Functions with this signature are called whenever we need
208  * to close a queue due to a disconnect or failure to
209  * establish a connection.
210  *
211  * @param queue queue to close down
212  */
213 static void
214 queue_destroy (struct Queue *queue)
215 {
216   struct GNUNET_MQ_Handle *mq;
217
218   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
219               "Disconnecting queue for peer `%s'\n",
220               GNUNET_i2s (&queue->target));
221   if (0 != queue->bytes_in_queue)
222   {
223     GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
224     queue->bytes_in_queue = 0;
225   }
226   if (NULL != (mq = queue->mq))
227   {
228     queue->mq = NULL;
229     GNUNET_MQ_destroy (mq);
230   }
231   GNUNET_assert (
232     GNUNET_YES ==
233     GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue));
234   GNUNET_STATISTICS_set (stats,
235                          "# queues active",
236                          GNUNET_CONTAINER_multipeermap_size (queue_map),
237                          GNUNET_NO);
238   if (NULL != queue->timeout_task)
239   {
240     GNUNET_SCHEDULER_cancel (queue->timeout_task);
241     queue->timeout_task = NULL;
242   }
243   GNUNET_free (queue->address);
244   GNUNET_free (queue);
245 }
246
247
248 /**
249  * Queue was idle for too long, so disconnect it
250  *
251  * @param cls the `struct Queue *` to disconnect
252  */
253 static void
254 queue_timeout (void *cls)
255 {
256   struct Queue *queue = cls;
257   struct GNUNET_TIME_Relative left;
258
259   queue->timeout_task = NULL;
260   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
261   if (0 != left.rel_value_us)
262   {
263     /* not actually our turn yet, but let's at least update
264        the monitor, it may think we're about to die ... */
265     queue->timeout_task =
266       GNUNET_SCHEDULER_add_delayed (left, &queue_timeout, queue);
267     return;
268   }
269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
270               "Queue %p was idle for %s, disconnecting\n",
271               queue,
272               GNUNET_STRINGS_relative_time_to_string (
273                 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
274                 GNUNET_YES));
275   queue_destroy (queue);
276 }
277
278
279 /**
280  * Increment queue timeout due to activity.  We do not immediately
281  * notify the monitor here as that might generate excessive
282  * signalling.
283  *
284  * @param queue queue for which the timeout should be rescheduled
285  */
286 static void
287 reschedule_queue_timeout (struct Queue *queue)
288 {
289   GNUNET_assert (NULL != queue->timeout_task);
290   queue->timeout =
291     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
292 }
293
294
295 /**
296  * Convert unix path to a `struct sockaddr_un *`
297  *
298  * @param unixpath path to convert
299  * @param[out] sock_len set to the length of the address
300  * @param is_abstract is this an abstract @a unixpath
301  * @return converted unix path
302  */
303 static struct sockaddr_un *
304 unix_address_to_sockaddr (const char *unixpath, socklen_t *sock_len)
305 {
306   struct sockaddr_un *un;
307   size_t slen;
308
309   GNUNET_assert (0 < strlen (unixpath));   /* sanity check */
310   un = GNUNET_new (struct sockaddr_un);
311   un->sun_family = AF_UNIX;
312   slen = strlen (unixpath);
313   if (slen >= sizeof(un->sun_path))
314     slen = sizeof(un->sun_path) - 1;
315   GNUNET_memcpy (un->sun_path, unixpath, slen);
316   un->sun_path[slen] = '\0';
317   slen = sizeof(struct sockaddr_un);
318 #if HAVE_SOCKADDR_UN_SUN_LEN
319   un->sun_len = (u_char) slen;
320 #endif
321   (*sock_len) = slen;
322   if ('@' == un->sun_path[0])
323     un->sun_path[0] = '\0';
324   return un;
325 }
326
327
328 /**
329  * Closure to #lookup_queue_it().
330  */
331 struct LookupCtx
332 {
333   /**
334    * Location to store the queue, if found.
335    */
336   struct Queue *res;
337
338   /**
339    * Address we are looking for.
340    */
341   const struct sockaddr_un *un;
342
343   /**
344    * Number of bytes in @a un
345    */
346   socklen_t un_len;
347 };
348
349
350 /**
351  * Function called to find a queue by address.
352  *
353  * @param cls the `struct LookupCtx *`
354  * @param key peer we are looking for (unused)
355  * @param value a queue
356  * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
357  */
358 static int
359 lookup_queue_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
360 {
361   struct LookupCtx *lctx = cls;
362   struct Queue *queue = value;
363
364   if ((queue->address_len == lctx->un_len) &&
365       (0 == memcmp (lctx->un, queue->address, queue->address_len)))
366   {
367     lctx->res = queue;
368     return GNUNET_NO;
369   }
370   return GNUNET_YES;
371 }
372
373
374 /**
375  * Find an existing queue by address.
376  *
377  * @param plugin the plugin
378  * @param address the address to find
379  * @return NULL if queue was not found
380  */
381 static struct Queue *
382 lookup_queue (const struct GNUNET_PeerIdentity *peer,
383               const struct sockaddr_un *un,
384               socklen_t un_len)
385 {
386   struct LookupCtx lctx;
387
388   lctx.un = un;
389   lctx.un_len = un_len;
390   lctx.res = NULL;
391   GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
392                                               peer,
393                                               &lookup_queue_it,
394                                               &lctx);
395   return lctx.res;
396 }
397
398
399 /**
400  * We have been notified that our socket is ready to write.
401  * Then reschedule this function to be called again once more is available.
402  *
403  * @param cls NULL
404  */
405 static void
406 select_write_cb (void *cls)
407 {
408   struct Queue *queue = queue_tail;
409   const struct GNUNET_MessageHeader *msg = &queue->msg->header;
410   size_t msg_size = ntohs (msg->size);
411   ssize_t sent;
412
413   /* take queue of the ready list */
414   write_task = NULL;
415 resend:
416   /* Send the data */
417   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
418                                        msg,
419                                        msg_size,
420                                        (const struct sockaddr *) queue->address,
421                                        queue->address_len);
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "UNIX transmitted message to %s (%d/%u: %s)\n",
424               GNUNET_i2s (&queue->target),
425               (int) sent,
426               (unsigned int) msg_size,
427               (sent < 0) ? strerror (errno) : "ok");
428   if (-1 != sent)
429   {
430     GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
431     if (NULL != queue_head)
432       write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
433                                                    unix_sock,
434                                                    &select_write_cb,
435                                                    NULL);
436
437     /* send 'msg' */
438     GNUNET_free (queue->msg);
439     queue->msg = NULL;
440     GNUNET_MQ_impl_send_continue (queue->mq);
441     GNUNET_STATISTICS_update (stats,
442                               "# bytes sent",
443                               (long long) sent,
444                               GNUNET_NO);
445     reschedule_queue_timeout (queue);
446     return;   /* all good */
447   }
448   GNUNET_STATISTICS_update (stats,
449                             "# network transmission failures",
450                             1,
451                             GNUNET_NO);
452   write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
453                                                unix_sock,
454                                                &select_write_cb,
455                                                NULL);
456   switch (errno)
457   {
458   case EAGAIN:
459   case ENOBUFS:
460     /* We should retry later... */
461     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
462     return;
463
464   case EMSGSIZE: {
465       socklen_t size = 0;
466       socklen_t len = sizeof(size);
467
468       GNUNET_NETWORK_socket_getsockopt (unix_sock,
469                                         SOL_SOCKET,
470                                         SO_SNDBUF,
471                                         &size,
472                                         &len);
473       if (size > ntohs (msg->size))
474       {
475         /* Buffer is bigger than message:  error, no retry
476          * This should never happen!*/
477         GNUNET_break (0);
478         return;
479       }
480       GNUNET_log (
481         GNUNET_ERROR_TYPE_WARNING,
482         "Trying to increase socket buffer size from %u to %u for message size %u\n",
483         (unsigned int) size,
484         (unsigned int) ((msg_size / 1000) + 2) * 1000,
485         (unsigned int) msg_size);
486       size = ((msg_size / 1000) + 2) * 1000;
487       if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
488                                                          SOL_SOCKET,
489                                                          SO_SNDBUF,
490                                                          &size,
491                                                          sizeof(size)))
492         goto resend; /* Increased buffer size, retry sending */
493       /* Ok, then just try very modest increase */
494       size = msg_size;
495       if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
496                                                          SOL_SOCKET,
497                                                          SO_SNDBUF,
498                                                          &size,
499                                                          sizeof(size)))
500         goto resend; /* Increased buffer size, retry sending */
501       /* Could not increase buffer size: error, no retry */
502       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
503       return;
504     }
505
506   default:
507     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "send");
508     return;
509   }
510 }
511
512
513 /**
514  * Signature of functions implementing the sending functionality of a
515  * message queue.
516  *
517  * @param mq the message queue
518  * @param msg the message to send
519  * @param impl_state our `struct Queue`
520  */
521 static void
522 mq_send (struct GNUNET_MQ_Handle *mq,
523          const struct GNUNET_MessageHeader *msg,
524          void *impl_state)
525 {
526   struct Queue *queue = impl_state;
527   size_t msize = ntohs (msg->size);
528
529   GNUNET_assert (mq == queue->mq);
530   GNUNET_assert (NULL == queue->msg);
531   // Convert to UNIXMessage
532   queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage));
533   queue->msg->header.size = htons (msize + sizeof (struct UNIXMessage));
534   queue->msg->sender = my_identity;
535   memcpy (&queue->msg[1], msg, msize);
536   GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
537   GNUNET_assert (NULL != unix_sock);
538   if (NULL == write_task)
539     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
540                                                  unix_sock,
541                                                  &select_write_cb,
542                                                  NULL);
543 }
544
545
546 /**
547  * Signature of functions implementing the destruction of a message
548  * queue.  Implementations must not free @a mq, but should take care
549  * of @a impl_state.
550  *
551  * @param mq the message queue to destroy
552  * @param impl_state our `struct Queue`
553  */
554 static void
555 mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
556 {
557   struct Queue *queue = impl_state;
558
559   if (mq == queue->mq)
560   {
561     queue->mq = NULL;
562     queue_destroy (queue);
563   }
564 }
565
566
567 /**
568  * Implementation function that cancels the currently sent message.
569  *
570  * @param mq message queue
571  * @param impl_state our `struct Queue`
572  */
573 static void
574 mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
575 {
576   struct Queue *queue = impl_state;
577
578   GNUNET_assert (NULL != queue->msg);
579   queue->msg = NULL;
580   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
581   GNUNET_assert (NULL != write_task);
582   if (NULL == queue_head)
583   {
584     GNUNET_SCHEDULER_cancel (write_task);
585     write_task = NULL;
586   }
587 }
588
589
590 /**
591  * Generic error handler, called with the appropriate
592  * error code and the same closure specified at the creation of
593  * the message queue.
594  * Not every message queue implementation supports an error handler.
595  *
596  * @param cls our `struct Queue`
597  * @param error error code
598  */
599 static void
600 mq_error (void *cls, enum GNUNET_MQ_Error error)
601 {
602   struct Queue *queue = cls;
603
604   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
605               "UNIX MQ error in queue to %s: %d\n",
606               GNUNET_i2s (&queue->target),
607               (int) error);
608   queue_destroy (queue);
609 }
610
611
612 /**
613  * Creates a new outbound queue the transport service will use to send
614  * data to another peer.
615  *
616  * @param peer the target peer
617  * @param cs inbound or outbound queue
618  * @param un the address
619  * @param un_len number of bytes in @a un
620  * @return the queue or NULL of max connections exceeded
621  */
622 static struct Queue *
623 setup_queue (const struct GNUNET_PeerIdentity *target,
624              enum GNUNET_TRANSPORT_ConnectionStatus cs,
625              const struct sockaddr_un *un,
626              socklen_t un_len)
627 {
628   struct Queue *queue;
629
630   queue = GNUNET_new (struct Queue);
631   queue->target = *target;
632   queue->address = GNUNET_memdup (un, un_len);
633   queue->address_len = un_len;
634   (void) GNUNET_CONTAINER_multipeermap_put (
635     queue_map,
636     &queue->target,
637     queue,
638     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
639   GNUNET_STATISTICS_set (stats,
640                          "# queues active",
641                          GNUNET_CONTAINER_multipeermap_size (queue_map),
642                          GNUNET_NO);
643   queue->timeout =
644     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
645   queue->timeout_task =
646     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
647                                   &queue_timeout,
648                                   queue);
649   queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
650                                              &mq_destroy,
651                                              &mq_cancel,
652                                              queue,
653                                              NULL,
654                                              &mq_error,
655                                              queue);
656   {
657     char *foreign_addr;
658
659     if ('\0' == un->sun_path[0])
660       GNUNET_asprintf (&foreign_addr,
661                        "%s-@%s",
662                        COMMUNICATOR_ADDRESS_PREFIX,
663                        &un->sun_path[1]);
664     else
665       GNUNET_asprintf (&foreign_addr,
666                        "%s-%s",
667                        COMMUNICATOR_ADDRESS_PREFIX,
668                        un->sun_path);
669     queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
670                                                       &queue->target,
671                                                       foreign_addr,
672                                                       UNIX_MTU,
673                                                       GNUNET_NT_LOOPBACK,
674                                                       cs,
675                                                       queue->mq);
676     GNUNET_free (foreign_addr);
677   }
678   return queue;
679 }
680
681
682 /**
683  * We have been notified that our socket has something to read. Do the
684  * read and reschedule this function to be called again once more is
685  * available.
686  *
687  * @param cls NULL
688  */
689 static void
690 select_read_cb (void *cls);
691
692
693 /**
694  * Function called when message was successfully passed to
695  * transport service.  Continue read activity.
696  *
697  * @param cls NULL
698  * @param success #GNUNET_OK on success
699  */
700 static void
701 receive_complete_cb (void *cls, int success)
702 {
703   (void) cls;
704   delivering_messages--;
705   if (GNUNET_OK != success)
706     GNUNET_STATISTICS_update (stats,
707                               "# transport transmission failures",
708                               1,
709                               GNUNET_NO);
710   if ((NULL == read_task) && (delivering_messages < max_queue_length) &&
711       (NULL != unix_sock))
712     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
713                                                unix_sock,
714                                                &select_read_cb,
715                                                NULL);
716 }
717
718
719 /**
720  * We have been notified that our socket has something to read. Do the
721  * read and reschedule this function to be called again once more is
722  * available.
723  *
724  * @param cls NULL
725  */
726 static void
727 select_read_cb (void *cls)
728 {
729   char buf[65536] GNUNET_ALIGN;
730   struct Queue *queue;
731   const struct UNIXMessage *msg;
732   struct sockaddr_un un;
733   socklen_t addrlen;
734   ssize_t ret;
735   uint16_t msize;
736
737   GNUNET_assert (NULL != unix_sock);
738   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
739                                              unix_sock,
740                                              &select_read_cb,
741                                              NULL);
742   addrlen = sizeof(un);
743   memset (&un, 0, sizeof(un));
744   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
745                                         buf,
746                                         sizeof(buf),
747                                         (struct sockaddr *) &un,
748                                         &addrlen);
749   if ((-1 == ret) && ((EAGAIN == errno) || (ENOBUFS == errno)))
750     return;
751   if (-1 == ret)
752   {
753     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
754     return;
755   }
756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757               "Read %d bytes from socket %s\n",
758               (int) ret,
759               un.sun_path);
760   GNUNET_assert (AF_UNIX == (un.sun_family));
761   msg = (struct UNIXMessage *) buf;
762   msize = ntohs (msg->header.size);
763   if ((msize < sizeof(struct UNIXMessage)) || (msize > ret))
764   {
765     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
766                 "Wrong message size: %d bytes\n",
767                 msize);
768     GNUNET_break_op (0);
769     return;
770   }
771   queue = lookup_queue (&msg->sender, &un, addrlen);
772   if (NULL == queue)
773     queue =
774       setup_queue (&msg->sender, GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen);
775   else
776     reschedule_queue_timeout (queue);
777   if (NULL == queue)
778   {
779     GNUNET_log (
780       GNUNET_ERROR_TYPE_ERROR,
781       _ (
782         "Maximum number of UNIX connections exceeded, dropping incoming message\n"));
783     return;
784   }
785
786   {
787     uint16_t tsize = msize - sizeof(struct UNIXMessage);
788
789     const struct GNUNET_MessageHeader *currhdr;
790     struct GNUNET_MessageHeader al_hdr;
791
792     currhdr = (const struct GNUNET_MessageHeader *) &msg[1];
793     /* ensure aligned access */
794     memcpy (&al_hdr, currhdr, sizeof(al_hdr));
795     if ((tsize < sizeof(struct GNUNET_MessageHeader)) ||
796         (tsize != ntohs(al_hdr.size)))
797     {
798       GNUNET_break_op (0);
799       return;
800     }
801     ret = GNUNET_TRANSPORT_communicator_receive (ch,
802                                                  &msg->sender,
803                                                  currhdr,
804                                                  GNUNET_TIME_UNIT_FOREVER_REL,
805                                                  &receive_complete_cb,
806                                                  NULL);
807     if (GNUNET_SYSERR == ret)
808     {
809       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
810                   "Transport not up!\n");
811       return;   /* transport not up */
812     }
813     if (GNUNET_NO == ret)
814     {
815       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
816                   "Error sending message to transport\n");
817       return;
818     }
819     delivering_messages++;
820   }
821   if (delivering_messages >= max_queue_length)
822   {
823     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
824                 "Back pressure %llu\n", delivering_messages);
825
826     /* we should try to apply 'back pressure' */
827     GNUNET_SCHEDULER_cancel (read_task);
828     read_task = NULL;
829   }
830 }
831
832
833 /**
834  * Function called by the transport service to initialize a
835  * message queue given address information about another peer.
836  * If and when the communication channel is established, the
837  * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
838  * to notify the service that the channel is now up.  It is
839  * the responsibility of the communicator to manage sane
840  * retries and timeouts for any @a peer/@a address combination
841  * provided by the transport service.  Timeouts and retries
842  * do not need to be signalled to the transport service.
843  *
844  * @param cls closure
845  * @param peer identity of the other peer
846  * @param address where to send the message, human-readable
847  *        communicator-specific format, 0-terminated, UTF-8
848  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
849  */
850 static int
851 mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
852 {
853   struct Queue *queue;
854   const char *path;
855   struct sockaddr_un *un;
856   socklen_t un_len;
857
858   (void) cls;
859   if (0 != strncmp (address,
860                     COMMUNICATOR_ADDRESS_PREFIX "-",
861                     strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
862   {
863     GNUNET_break_op (0);
864     return GNUNET_SYSERR;
865   }
866   path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
867   un = unix_address_to_sockaddr (path, &un_len);
868   queue = lookup_queue (peer, un, un_len);
869   if (NULL != queue)
870   {
871     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
872                 "Address `%s' for %s ignored, queue exists\n",
873                 path,
874                 GNUNET_i2s (peer));
875     GNUNET_free (un);
876     return GNUNET_OK;
877   }
878   queue = setup_queue (peer, GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len);
879   GNUNET_free (un);
880   if (NULL == queue)
881   {
882     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
883                 "Failed to setup queue to %s at `%s'\n",
884                 GNUNET_i2s (peer),
885                 path);
886     return GNUNET_NO;
887   }
888   return GNUNET_OK;
889 }
890
891
892 /**
893  * Iterator over all message queues to clean up.
894  *
895  * @param cls NULL
896  * @param target unused
897  * @param value the queue to destroy
898  * @return #GNUNET_OK to continue to iterate
899  */
900 static int
901 get_queue_delete_it (void *cls,
902                      const struct GNUNET_PeerIdentity *target,
903                      void *value)
904 {
905   struct Queue *queue = value;
906
907   (void) cls;
908   (void) target;
909   queue_destroy (queue);
910   return GNUNET_OK;
911 }
912
913
914 /**
915  * Shutdown the UNIX communicator.
916  *
917  * @param cls NULL (always)
918  */
919 static void
920 do_shutdown (void *cls)
921 {
922   if (NULL != read_task)
923   {
924     GNUNET_SCHEDULER_cancel (read_task);
925     read_task = NULL;
926   }
927   if (NULL != write_task)
928   {
929     GNUNET_SCHEDULER_cancel (write_task);
930     write_task = NULL;
931   }
932   if (NULL != unix_sock)
933   {
934     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (unix_sock));
935     unix_sock = NULL;
936   }
937   GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL);
938   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
939   if (NULL != ai)
940   {
941     GNUNET_TRANSPORT_communicator_address_remove (ai);
942     ai = NULL;
943   }
944   if (NULL != ch)
945   {
946     GNUNET_TRANSPORT_communicator_disconnect (ch);
947     ch = NULL;
948   }
949   if (NULL != stats)
950   {
951     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
952     stats = NULL;
953   }
954 }
955
956
957 /**
958  * Function called when the transport service has received an
959  * acknowledgement for this communicator (!) via a different return
960  * path.
961  *
962  * Not applicable for UNIX.
963  *
964  * @param cls closure
965  * @param sender which peer sent the notification
966  * @param msg payload
967  */
968 static void
969 enc_notify_cb (void *cls,
970                const struct GNUNET_PeerIdentity *sender,
971                const struct GNUNET_MessageHeader *msg)
972 {
973   (void) cls;
974   (void) sender;
975   (void) msg;
976   GNUNET_break_op (0);
977 }
978
979
980 /**
981  * Setup communicator and launch network interactions.
982  *
983  * @param cls NULL (always)
984  * @param args remaining command-line arguments
985  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
986  * @param cfg configuration
987  */
988 static void
989 run (void *cls,
990      char *const *args,
991      const char *cfgfile,
992      const struct GNUNET_CONFIGURATION_Handle *cfg)
993 {
994   char *unix_socket_path;
995   struct sockaddr_un *un;
996   socklen_t un_len;
997   char *my_addr;
998   struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
999
1000   (void) cls;
1001   delivering_messages = 0;
1002
1003   my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1004   if (NULL == my_private_key)
1005   {
1006     GNUNET_log (
1007       GNUNET_ERROR_TYPE_ERROR,
1008       _ (
1009         "UNIX communicator is lacking key configuration settings. Exiting.\n"));
1010     GNUNET_SCHEDULER_shutdown ();
1011     return;
1012   }
1013   GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, &my_identity.public_key);
1014
1015   if (GNUNET_OK !=
1016       GNUNET_CONFIGURATION_get_value_filename (cfg,
1017                                                COMMUNICATOR_CONFIG_SECTION,
1018                                                "UNIXPATH",
1019                                                &unix_socket_path))
1020   {
1021     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1022                                COMMUNICATOR_CONFIG_SECTION,
1023                                "UNIXPATH");
1024     return;
1025   }
1026   if (GNUNET_OK !=
1027       GNUNET_CONFIGURATION_get_value_number (cfg,
1028                                              COMMUNICATOR_CONFIG_SECTION,
1029                                              "MAX_QUEUE_LENGTH",
1030                                              &max_queue_length))
1031     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1032
1033   un = unix_address_to_sockaddr (unix_socket_path, &un_len);
1034   if (NULL == un)
1035   {
1036     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1037                 "Failed to setup UNIX domain socket address with path `%s'\n",
1038                 unix_socket_path);
1039     GNUNET_free (unix_socket_path);
1040     return;
1041   }
1042   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_DGRAM, 0);
1043   if (NULL == unix_sock)
1044   {
1045     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
1046     GNUNET_free (un);
1047     GNUNET_free (unix_socket_path);
1048     return;
1049   }
1050   if (('\0' != un->sun_path[0]) &&
1051       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (un->sun_path)))
1052   {
1053     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1054                 _ ("Cannot create path to `%s'\n"),
1055                 un->sun_path);
1056     GNUNET_NETWORK_socket_close (unix_sock);
1057     unix_sock = NULL;
1058     GNUNET_free (un);
1059     GNUNET_free (unix_socket_path);
1060     return;
1061   }
1062   if (GNUNET_OK != GNUNET_NETWORK_socket_bind (unix_sock,
1063                                                (const struct sockaddr *) un,
1064                                                un_len))
1065   {
1066     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "bind", un->sun_path);
1067     GNUNET_NETWORK_socket_close (unix_sock);
1068     unix_sock = NULL;
1069     GNUNET_free (un);
1070     GNUNET_free (unix_socket_path);
1071     return;
1072   }
1073   GNUNET_free (un);
1074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", unix_socket_path);
1075   stats = GNUNET_STATISTICS_create ("C-UNIX", cfg);
1076   GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
1077   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1078                                              unix_sock,
1079                                              &select_read_cb,
1080                                              NULL);
1081   queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1082   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1083                                               COMMUNICATOR_CONFIG_SECTION,
1084                                               COMMUNICATOR_ADDRESS_PREFIX,
1085                                               GNUNET_TRANSPORT_CC_RELIABLE,
1086                                               &mq_init,
1087                                               NULL,
1088                                               &enc_notify_cb,
1089                                               NULL);
1090   if (NULL == ch)
1091   {
1092     GNUNET_break (0);
1093     GNUNET_SCHEDULER_shutdown ();
1094     GNUNET_free (unix_socket_path);
1095     return;
1096   }
1097   GNUNET_asprintf (&my_addr,
1098                    "%s-%s",
1099                    COMMUNICATOR_ADDRESS_PREFIX,
1100                    unix_socket_path);
1101   GNUNET_free (unix_socket_path);
1102   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1103                                                   my_addr,
1104                                                   GNUNET_NT_LOOPBACK,
1105                                                   GNUNET_TIME_UNIT_FOREVER_REL);
1106   GNUNET_free (my_addr);
1107 }
1108
1109
1110 /**
1111  * The main function for the UNIX communicator.
1112  *
1113  * @param argc number of arguments from the command line
1114  * @param argv command line arguments
1115  * @return 0 ok, 1 on error
1116  */
1117 int
1118 main (int argc, char *const *argv)
1119 {
1120   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1121     GNUNET_GETOPT_OPTION_END
1122   };
1123   int ret;
1124
1125   if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1126     return 2;
1127
1128   ret = (GNUNET_OK ==
1129          GNUNET_PROGRAM_run (argc,
1130                              argv,
1131                              "gnunet-communicator-unix",
1132                              _ ("GNUnet UNIX domain socket communicator"),
1133                              options,
1134                              &run,
1135                              NULL))
1136         ? 0
1137         : 1;
1138   GNUNET_free ((void *) argv);
1139   return ret;
1140 }
1141
1142
1143 #if defined(__linux__) && defined(__GLIBC__)
1144 #include <malloc.h>
1145
1146 /**
1147  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1148  */
1149 void __attribute__ ((constructor))
1150 GNUNET_ARM_memory_init ()
1151 {
1152   mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1153   mallopt (M_TOP_PAD, 1 * 1024);
1154   malloc_trim (0);
1155 }
1156
1157
1158 #endif
1159
1160 /* end of gnunet-communicator-unix.c */