tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/gnunet-communicator-unix.c
23  * @brief Transport plugin using unix domain sockets (!)
24  *        Clearly, can only be used locally on Unix/Linux hosts...
25  *        ONLY INTENDED FOR TESTING!!!
26  * @author Christian Grothoff
27  * @author Nathan Evans
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_nt_lib.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_transport_communication_service.h"
36
37 /**
38  * How many messages do we keep at most in the queue to the
39  * transport service before we start to drop (default,
40  * can be changed via the configuration file).
41  * Should be _below_ the level of the communicator API, as
42  * otherwise we may read messages just to have them dropped
43  * by the communicator API.
44  */
45 #define DEFAULT_MAX_QUEUE_LENGTH 8
46
47 /**
48  * Address prefix used by the communicator.
49  */
50 #define COMMUNICATOR_ADDRESS_PREFIX "unix"
51
52 /**
53  * Configuration section used by the communicator.
54  */
55 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
56
57 /**
58  * Our MTU.
59  */
60 #define UNIX_MTU UINT16_MAX
61
62 GNUNET_NETWORK_STRUCT_BEGIN
63
64 /**
65  * UNIX Message-Packet header.
66  */
67 struct UNIXMessage
68 {
69   /**
70    * Message header.
71    */
72   struct GNUNET_MessageHeader header;
73
74   /**
75    * What is the identity of the sender (GNUNET_hash of public key)
76    */
77   struct GNUNET_PeerIdentity sender;
78
79 };
80
81 GNUNET_NETWORK_STRUCT_END
82
83
84 /**
85  * Handle for a queue.
86  */
87 struct Queue
88 {
89
90   /**
91    * Queues with pending messages (!) are kept in a DLL.
92    */
93   struct Queue *next;
94
95   /**
96    * Queues with pending messages (!) are kept in a DLL.
97    */
98   struct Queue *prev;
99
100   /**
101    * To whom are we talking to.
102    */
103   struct GNUNET_PeerIdentity target;
104
105   /**
106    * Address of the other peer.
107    */
108   struct sockaddr_un *address;
109
110   /**
111    * Length of the address.
112    */
113   socklen_t address_len;
114
115   /**
116    * Message currently scheduled for transmission, non-NULL if and only
117    * if this queue is in the #queue_head DLL.
118    */
119   const struct GNUNET_MessageHeader *msg;
120
121   /**
122    * Message queue we are providing for the #ch.
123    */
124   struct GNUNET_MQ_Handle *mq;
125
126   /**
127    * handle for this queue with the #ch.
128    */
129   struct GNUNET_TRANSPORT_QueueHandle *qh;
130
131   /**
132    * Number of bytes we currently have in our write queue.
133    */
134   unsigned long long bytes_in_queue;
135
136   /**
137    * Timeout for this queue.
138    */
139   struct GNUNET_TIME_Absolute timeout;
140
141   /**
142    * Queue timeout task.
143    */
144   struct GNUNET_SCHEDULER_Task *timeout_task;
145
146 };
147
148
149 /**
150  * ID of read task
151  */
152 static struct GNUNET_SCHEDULER_Task *read_task;
153
154 /**
155  * ID of write task
156  */
157 static struct GNUNET_SCHEDULER_Task *write_task;
158
159 /**
160  * Number of messages we currently have in our queues towards the transport service.
161  */
162 static unsigned long long delivering_messages;
163
164 /**
165  * Maximum queue length before we stop reading towards the transport service.
166  */
167 static unsigned long long max_queue_length;
168
169 /**
170  * For logging statistics.
171  */
172 static struct GNUNET_STATISTICS_Handle *stats;
173
174 /**
175  * Our environment.
176  */
177 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
178
179 /**
180  * Queues (map from peer identity to `struct Queue`)
181  */
182 static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
183
184 /**
185  * Head of queue of messages to transmit.
186  */
187 static struct Queue *queue_head;
188
189 /**
190  * Tail of queue of messages to transmit.
191  */
192 static struct Queue *queue_tail;
193
194 /**
195  * socket that we transmit all data with
196  */
197 static struct GNUNET_NETWORK_Handle *unix_sock;
198
199 /**
200  * Handle to the operation that publishes our address.
201  */
202 static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
203
204
205 /**
206  * Functions with this signature are called whenever we need
207  * to close a queue due to a disconnect or failure to
208  * establish a connection.
209  *
210  * @param queue queue to close down
211  */
212 static void
213 queue_destroy (struct Queue *queue)
214 {
215   struct GNUNET_MQ_Handle *mq;
216
217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
218               "Disconnecting queue for peer `%s'\n",
219               GNUNET_i2s (&queue->target));
220   if (0 != queue->bytes_in_queue)
221   {
222     GNUNET_CONTAINER_DLL_remove (queue_head,
223                                  queue_tail,
224                                  queue);
225     queue->bytes_in_queue = 0;
226   }
227   if (NULL != (mq = queue->mq))
228   {
229     queue->mq = NULL;
230     GNUNET_MQ_destroy (mq);
231   }
232   GNUNET_assert (GNUNET_YES ==
233                  GNUNET_CONTAINER_multipeermap_remove (queue_map,
234                                                        &queue->target,
235                                                        queue));
236   GNUNET_STATISTICS_set (stats,
237                          "# queues active",
238                          GNUNET_CONTAINER_multipeermap_size (queue_map),
239                          GNUNET_NO);
240   if (NULL != queue->timeout_task)
241   {
242     GNUNET_SCHEDULER_cancel (queue->timeout_task);
243     queue->timeout_task = NULL;
244   }
245   GNUNET_free (queue->address);
246   GNUNET_free (queue);
247 }
248
249
250 /**
251  * Queue was idle for too long, so disconnect it
252  *
253  * @param cls the `struct Queue *` to disconnect
254  */
255 static void
256 queue_timeout (void *cls)
257 {
258   struct Queue *queue = cls;
259   struct GNUNET_TIME_Relative left;
260
261   queue->timeout_task = NULL;
262   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
263   if (0 != left.rel_value_us)
264   {
265     /* not actually our turn yet, but let's at least update
266        the monitor, it may think we're about to die ... */
267     queue->timeout_task
268       = GNUNET_SCHEDULER_add_delayed (left,
269                                       &queue_timeout,
270                                       queue);
271     return;
272   }
273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
274               "Queue %p was idle for %s, disconnecting\n",
275               queue,
276               GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
277                                                       GNUNET_YES));
278   queue_destroy (queue);
279 }
280
281
282 /**
283  * Increment queue timeout due to activity.  We do not immediately
284  * notify the monitor here as that might generate excessive
285  * signalling.
286  *
287  * @param queue queue for which the timeout should be rescheduled
288  */
289 static void
290 reschedule_queue_timeout (struct Queue *queue)
291 {
292   GNUNET_assert (NULL != queue->timeout_task);
293   queue->timeout
294     = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
295 }
296
297
298 /**
299  * Convert unix path to a `struct sockaddr_un *`
300  *
301  * @param unixpath path to convert
302  * @param[out] sock_len set to the length of the address
303  * @param is_abstract is this an abstract @a unixpath
304  * @return converted unix path
305  */
306 static struct sockaddr_un *
307 unix_address_to_sockaddr (const char *unixpath,
308                           socklen_t *sock_len)
309 {
310   struct sockaddr_un *un;
311   size_t slen;
312
313   GNUNET_assert (0 < strlen (unixpath));        /* sanity check */
314   un = GNUNET_new (struct sockaddr_un);
315   un->sun_family = AF_UNIX;
316   slen = strlen (unixpath);
317   if (slen >= sizeof (un->sun_path))
318     slen = sizeof (un->sun_path) - 1;
319   GNUNET_memcpy (un->sun_path,
320                  unixpath,
321                  slen);
322   un->sun_path[slen] = '\0';
323   slen = sizeof (struct sockaddr_un);
324 #if HAVE_SOCKADDR_UN_SUN_LEN
325   un->sun_len = (u_char) slen;
326 #endif
327   (*sock_len) = slen;
328   if ('@' == un->sun_path[0])
329     un->sun_path[0] = '\0';
330   return un;
331 }
332
333
334 /**
335  * Closure to #lookup_queue_it().
336  */
337 struct LookupCtx
338 {
339   /**
340    * Location to store the queue, if found.
341    */
342   struct Queue *res;
343
344   /**
345    * Address we are looking for.
346    */
347   const struct sockaddr_un *un;
348
349   /**
350    * Number of bytes in @a un
351    */
352   socklen_t un_len;
353 };
354
355
356 /**
357  * Function called to find a queue by address.
358  *
359  * @param cls the `struct LookupCtx *`
360  * @param key peer we are looking for (unused)
361  * @param value a queue
362  * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
363  */
364 static int
365 lookup_queue_it (void *cls,
366                  const struct GNUNET_PeerIdentity *key,
367                  void *value)
368 {
369   struct LookupCtx *lctx = cls;
370   struct Queue *queue = value;
371
372   if ( (queue->address_len = lctx->un_len) &&
373        (0 == memcmp (lctx->un,
374                      queue->address,
375                      queue->address_len)) )
376   {
377     lctx->res = queue;
378     return GNUNET_NO;
379   }
380   return GNUNET_YES;
381 }
382
383
384 /**
385  * Find an existing queue by address.
386  *
387  * @param plugin the plugin
388  * @param address the address to find
389  * @return NULL if queue was not found
390  */
391 static struct Queue *
392 lookup_queue (const struct GNUNET_PeerIdentity *peer,
393               const struct sockaddr_un *un,
394               socklen_t un_len)
395 {
396   struct LookupCtx lctx;
397
398   lctx.un = un;
399   lctx.un_len = un_len;
400   GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
401                                               peer,
402                                               &lookup_queue_it,
403                                               &lctx);
404   return lctx.res;
405 }
406
407
408 /**
409  * We have been notified that our socket is ready to write.
410  * Then reschedule this function to be called again once more is available.
411  *
412  * @param cls NULL
413  */
414 static void
415 select_write_cb (void *cls)
416 {
417   struct Queue *queue = queue_tail;
418   const struct GNUNET_MessageHeader *msg = queue->msg;
419   size_t msg_size = ntohs (msg->size);
420   ssize_t sent;
421
422   /* take queue of the ready list */
423   write_task = NULL;
424   GNUNET_CONTAINER_DLL_remove (queue_head,
425                                queue_tail,
426                                queue);
427   if (NULL != queue_head)
428     write_task =
429       GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
430                                       unix_sock,
431                                       &select_write_cb,
432                                       NULL);
433
434   /* send 'msg' */
435   queue->msg = NULL;
436   GNUNET_MQ_impl_send_continue (queue->mq);
437  resend:
438   /* Send the data */
439   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
440                                        queue->msg,
441                                        msg_size,
442                                        (const struct sockaddr *) queue->address,
443                                        queue->address_len);
444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445               "UNIX transmitted message to %s (%d/%u: %s)\n",
446               GNUNET_i2s (&queue->target),
447               (int) sent,
448               (unsigned int) msg_size,
449               (sent < 0) ? STRERROR (errno) : "ok");
450   if (-1 != sent)
451   {
452     GNUNET_STATISTICS_update (stats,
453                               "# bytes sent",
454                               (long long) sent,
455                               GNUNET_NO);
456     reschedule_queue_timeout (queue);
457     return; /* all good */
458   }
459   GNUNET_STATISTICS_update (stats,
460                             "# network transmission failures",
461                             1,
462                             GNUNET_NO);
463   switch (errno)
464   {
465   case EAGAIN:
466   case ENOBUFS:
467     /* We should retry later... */
468     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
469                          "send");
470     return;
471   case EMSGSIZE:
472     {
473       socklen_t size = 0;
474       socklen_t len = sizeof (size);
475
476       GNUNET_NETWORK_socket_getsockopt (unix_sock,
477                                         SOL_SOCKET,
478                                         SO_SNDBUF,
479                                         &size,
480                                         &len);
481       if (size > ntohs (msg->size))
482       {
483         /* Buffer is bigger than message:  error, no retry
484          * This should never happen!*/
485         GNUNET_break (0);
486         return;
487       }
488       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489                   "Trying to increase socket buffer size from %u to %u for message size %u\n",
490                   (unsigned int) size,
491                   (unsigned int) ((msg_size / 1000) + 2) * 1000,
492                   (unsigned int) msg_size);
493       size = ((msg_size / 1000) + 2) * 1000;
494       if (GNUNET_OK ==
495           GNUNET_NETWORK_socket_setsockopt (unix_sock,
496                                             SOL_SOCKET,
497                                             SO_SNDBUF,
498                                             &size,
499                                             sizeof (size)))
500         goto resend; /* Increased buffer size, retry sending */
501       /* Ok, then just try very modest increase */
502       size = msg_size;
503       if (GNUNET_OK ==
504           GNUNET_NETWORK_socket_setsockopt (unix_sock,
505                                             SOL_SOCKET,
506                                             SO_SNDBUF,
507                                             &size,
508                                             sizeof (size)))
509           goto resend; /* Increased buffer size, retry sending */
510       /* Could not increase buffer size: error, no retry */
511       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
512                            "setsockopt");
513       return;
514     }
515   default:
516     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
517                          "send");
518     return;
519   }
520 }
521
522
523 /**
524  * Signature of functions implementing the sending functionality of a
525  * message queue.
526  *
527  * @param mq the message queue
528  * @param msg the message to send
529  * @param impl_state our `struct Queue`
530  */
531 static void
532 mq_send (struct GNUNET_MQ_Handle *mq,
533          const struct GNUNET_MessageHeader *msg,
534          void *impl_state)
535 {
536   struct Queue *queue = impl_state;
537
538   GNUNET_assert (mq == queue->mq);
539   GNUNET_assert (NULL == queue->msg);
540   queue->msg = msg;
541   GNUNET_CONTAINER_DLL_insert (queue_head,
542                                queue_tail,
543                                queue);
544   GNUNET_assert (NULL != unix_sock);
545   if (NULL == write_task)
546     write_task =
547       GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
548                                       unix_sock,
549                                       &select_write_cb,
550                                       NULL);
551 }
552
553
554 /**
555  * Signature of functions implementing the destruction of a message
556  * queue.  Implementations must not free @a mq, but should take care
557  * of @a impl_state.
558  *
559  * @param mq the message queue to destroy
560  * @param impl_state our `struct Queue`
561  */
562 static void
563 mq_destroy (struct GNUNET_MQ_Handle *mq,
564             void *impl_state)
565 {
566   struct Queue *queue = impl_state;
567
568   if (mq == queue->mq)
569   {
570     queue->mq = NULL;
571     queue_destroy (queue);
572   }
573 }
574
575
576 /**
577  * Implementation function that cancels the currently sent message.
578  *
579  * @param mq message queue
580  * @param impl_state our `struct Queue`
581  */
582 static void
583 mq_cancel (struct GNUNET_MQ_Handle *mq,
584            void *impl_state)
585 {
586   struct Queue *queue = impl_state;
587
588   GNUNET_assert (NULL != queue->msg);
589   queue->msg = NULL;
590   GNUNET_CONTAINER_DLL_remove (queue_head,
591                                queue_tail,
592                                queue);
593   GNUNET_assert (NULL != write_task);
594   if (NULL == queue_head)
595   {
596     GNUNET_SCHEDULER_cancel (write_task);
597     write_task = NULL;
598   }
599 }
600
601
602 /**
603  * Generic error handler, called with the appropriate
604  * error code and the same closure specified at the creation of
605  * the message queue.
606  * Not every message queue implementation supports an error handler.
607  *
608  * @param cls our `struct Queue`
609  * @param error error code
610  */
611 static void
612 mq_error (void *cls,
613           enum GNUNET_MQ_Error error)
614 {
615   struct Queue *queue = cls;
616
617   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
618               "UNIX MQ error in queue to %s: %d\n",
619               GNUNET_i2s (&queue->target),
620               (int) error);
621   queue_destroy (queue);
622 }
623
624
625 /**
626  * Creates a new outbound queue the transport service will use to send
627  * data to another peer.
628  *
629  * @param peer the target peer
630  * @param cs inbound or outbound queue
631  * @param un the address
632  * @param un_len number of bytes in @a un
633  * @return the queue or NULL of max connections exceeded
634  */
635 static struct Queue *
636 setup_queue (const struct GNUNET_PeerIdentity *target,
637              enum GNUNET_TRANSPORT_ConnectionStatus cs,
638              const struct sockaddr_un *un,
639              socklen_t un_len)
640 {
641   struct Queue *queue;
642
643   queue = GNUNET_new (struct Queue);
644   queue->target = *target;
645   queue->address = GNUNET_memdup (un,
646                                   un_len);
647   queue->address_len = un_len;
648   (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
649                                             &queue->target,
650                                             queue,
651                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
652   GNUNET_STATISTICS_set (stats,
653                          "# queues active",
654                          GNUNET_CONTAINER_multipeermap_size (queue_map),
655                          GNUNET_NO);
656   queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
657   queue->timeout_task
658     = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
659                                     &queue_timeout,
660                                     queue);
661   queue->mq
662     = GNUNET_MQ_queue_for_callbacks (&mq_send,
663                                      &mq_destroy,
664                                      &mq_cancel,
665                                      queue,
666                                      NULL,
667                                      &mq_error,
668                                      queue);
669   {
670     char *foreign_addr;
671
672     if ('\0' == un->sun_path[0])
673       GNUNET_asprintf (&foreign_addr,
674                        "%s-@%s",
675                        COMMUNICATOR_ADDRESS_PREFIX,
676                        &un->sun_path[1]);
677     else
678       GNUNET_asprintf (&foreign_addr,
679                        "%s-%s",
680                        COMMUNICATOR_ADDRESS_PREFIX,
681                        un->sun_path);
682     queue->qh
683       = GNUNET_TRANSPORT_communicator_mq_add (ch,
684                                               &queue->target,
685                                               foreign_addr,
686                                               UNIX_MTU,
687                                               GNUNET_NT_LOOPBACK,
688                                               cs,
689                                               queue->mq);
690     GNUNET_free (foreign_addr);
691   }
692   return queue;
693 }
694
695
696 /**
697  * We have been notified that our socket has something to read. Do the
698  * read and reschedule this function to be called again once more is
699  * available.
700  *
701  * @param cls NULL
702  */
703 static void
704 select_read_cb (void *cls);
705
706
707 /**
708  * Function called when message was successfully passed to
709  * transport service.  Continue read activity.
710  *
711  * @param cls NULL
712  * @param success #GNUNET_OK on success
713  */
714 static void
715 receive_complete_cb (void *cls,
716                      int success)
717 {
718   delivering_messages--;
719   if (GNUNET_OK != success)
720     GNUNET_STATISTICS_update (stats,
721                               "# transport transmission failures",
722                               1,
723                               GNUNET_NO);
724   GNUNET_assert (NULL != unix_sock);
725   if ( (NULL == read_task) &&
726        (delivering_messages < max_queue_length) )
727     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
728                                                unix_sock,
729                                                &select_read_cb,
730                                                NULL);
731 }
732
733
734 /**
735  * We have been notified that our socket has something to read. Do the
736  * read and reschedule this function to be called again once more is
737  * available.
738  *
739  * @param cls NULL
740  */
741 static void
742 select_read_cb (void *cls)
743 {
744   char buf[65536] GNUNET_ALIGN;
745   struct Queue *queue;
746   const struct UNIXMessage *msg;
747   struct sockaddr_un un;
748   socklen_t addrlen;
749   ssize_t ret;
750   uint16_t msize;
751
752   GNUNET_assert (NULL != unix_sock);
753   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
754                                              unix_sock,
755                                              &select_read_cb,
756                                              NULL);
757   addrlen = sizeof (un);
758   memset (&un,
759           0,
760           sizeof (un));
761   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
762                                         buf,
763                                         sizeof (buf),
764                                         (struct sockaddr *) &un,
765                                         &addrlen);
766   if ( (-1 == ret) &&
767        ( (EAGAIN == errno) ||
768          (ENOBUFS == errno) ) )
769     return;
770   if (-1 == ret)
771   {
772     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
773                          "recvfrom");
774     return;
775   }
776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777               "Read %d bytes from socket %s\n",
778               (int) ret,
779               un.sun_path);
780   GNUNET_assert (AF_UNIX == (un.sun_family));
781   msg = (struct UNIXMessage *) buf;
782   msize = ntohs (msg->header.size);
783   if ( (msize < sizeof (struct UNIXMessage)) ||
784        (msize > ret) )
785   {
786     GNUNET_break_op (0);
787     return;
788   }
789   queue = lookup_queue (&msg->sender,
790                         &un,
791                         addrlen);
792   if (NULL == queue)
793     queue = setup_queue (&msg->sender,
794                          GNUNET_TRANSPORT_CS_INBOUND,
795                          &un,
796                          addrlen);
797   else
798     reschedule_queue_timeout (queue);
799   if (NULL == queue)
800   {
801     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
802                 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
803     return;
804   }
805
806   {
807     uint16_t offset = 0;
808     uint16_t tsize = msize - sizeof (struct UNIXMessage);
809     const char *msgbuf = (const char *) &msg[1];
810
811     while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
812     {
813       const struct GNUNET_MessageHeader *currhdr;
814       struct GNUNET_MessageHeader al_hdr;
815       uint16_t csize;
816
817       currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
818       /* ensure aligned access */
819       memcpy (&al_hdr,
820               currhdr,
821               sizeof (al_hdr));
822       csize = ntohs (al_hdr.size);
823       if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
824            (csize > tsize - offset))
825       {
826         GNUNET_break_op (0);
827         break;
828       }
829       ret = GNUNET_TRANSPORT_communicator_receive (ch,
830                                                    &msg->sender,
831                                                    currhdr,
832                                                    GNUNET_TIME_UNIT_FOREVER_REL,
833                                                    &receive_complete_cb,
834                                                    NULL);
835       if (GNUNET_SYSERR == ret)
836         return; /* transport not up */
837       if (GNUNET_NO == ret)
838         break;
839       delivering_messages++;
840       offset += csize;
841     }
842   }
843   if (delivering_messages >= max_queue_length)
844   {
845     /* we should try to apply 'back pressure' */
846     GNUNET_SCHEDULER_cancel (read_task);
847     read_task = NULL;
848   }
849 }
850
851
852 /**
853  * Function called by the transport service to initialize a
854  * message queue given address information about another peer.
855  * If and when the communication channel is established, the
856  * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
857  * to notify the service that the channel is now up.  It is
858  * the responsibility of the communicator to manage sane
859  * retries and timeouts for any @a peer/@a address combination
860  * provided by the transport service.  Timeouts and retries
861  * do not need to be signalled to the transport service.
862  *
863  * @param cls closure
864  * @param peer identity of the other peer
865  * @param address where to send the message, human-readable
866  *        communicator-specific format, 0-terminated, UTF-8
867  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
868  */
869 static int
870 mq_init (void *cls,
871          const struct GNUNET_PeerIdentity *peer,
872          const char *address)
873 {
874   struct Queue *queue;
875   const char *path;
876   struct sockaddr_un *un;
877   socklen_t un_len;
878
879   if (0 != strncmp (address,
880                     COMMUNICATOR_ADDRESS_PREFIX "-",
881                     strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
882   {
883     GNUNET_break_op (0);
884     return GNUNET_SYSERR;
885   }
886   path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
887   un = unix_address_to_sockaddr (path,
888                                  &un_len);
889   queue = lookup_queue (peer,
890                         un,
891                         un_len);
892   if (NULL != queue)
893   {
894     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
895                 "Address `%s' for %s ignored, queue exists\n",
896                 path,
897                 GNUNET_i2s (peer));
898     GNUNET_free (un);
899     return GNUNET_OK;
900   }
901   queue = setup_queue (peer,
902                        GNUNET_TRANSPORT_CS_OUTBOUND,
903                        un,
904                        un_len);
905   GNUNET_free (un);
906   if (NULL == queue)
907   {
908     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
909                 "Failed to setup queue to %s at `%s'\n",
910                 GNUNET_i2s (peer),
911                 path);
912     return GNUNET_NO;
913   }
914   return GNUNET_OK;
915 }
916
917
918 /**
919  * Iterator over all message queues to clean up.
920  *
921  * @param cls NULL
922  * @param target unused
923  * @param value the queue to destroy
924  * @return #GNUNET_OK to continue to iterate
925  */
926 static int
927 get_queue_delete_it (void *cls,
928                      const struct GNUNET_PeerIdentity *target,
929                      void *value)
930 {
931   struct Queue *queue = value;
932
933   (void) cls;
934   (void) target;
935   queue_destroy (queue);
936   return GNUNET_OK;
937 }
938
939
940 /**
941  * Shutdown the UNIX communicator.
942  *
943  * @param cls NULL (always)
944  */
945 static void
946 do_shutdown (void *cls)
947 {
948   if (NULL != read_task)
949   {
950     GNUNET_SCHEDULER_cancel (read_task);
951     read_task = NULL;
952   }
953   if (NULL != write_task)
954   {
955     GNUNET_SCHEDULER_cancel (write_task);
956     write_task = NULL;
957   }
958   if (NULL != unix_sock)
959   {
960     GNUNET_break (GNUNET_OK ==
961                   GNUNET_NETWORK_socket_close (unix_sock));
962     unix_sock = NULL;
963   }
964   GNUNET_CONTAINER_multipeermap_iterate (queue_map,
965                                          &get_queue_delete_it,
966                                          NULL);
967   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
968   if (NULL != ai)
969   {
970     GNUNET_TRANSPORT_communicator_address_remove (ai);
971     ai = NULL;
972   }
973   if (NULL != ch)
974   {
975     GNUNET_TRANSPORT_communicator_disconnect (ch);
976     ch = NULL;
977   }
978   if (NULL != stats)
979   {
980     GNUNET_STATISTICS_destroy (stats,
981                                GNUNET_NO);
982     stats = NULL;
983   }
984 }
985
986
987 /**
988  * Function called when the transport service has received an
989  * acknowledgement for this communicator (!) via a different return
990  * path.
991  *
992  * Not applicable for UNIX.
993  *
994  * @param cls closure
995  * @param sender which peer sent the notification
996  * @param msg payload
997  */
998 static void
999 enc_notify_cb (void *cls,
1000                const struct GNUNET_PeerIdentity *sender,
1001                const struct GNUNET_MessageHeader *msg)
1002 {
1003   (void) cls;
1004   (void) sender;
1005   (void) msg;
1006   GNUNET_break_op (0);
1007 }
1008
1009
1010 /**
1011  * Setup communicator and launch network interactions.
1012  *
1013  * @param cls NULL (always)
1014  * @param args remaining command-line arguments
1015  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1016  * @param cfg configuration
1017  */
1018 static void
1019 run (void *cls,
1020      char *const *args,
1021      const char *cfgfile,
1022      const struct GNUNET_CONFIGURATION_Handle *cfg)
1023 {
1024   char *unix_socket_path;
1025   struct sockaddr_un *un;
1026   socklen_t un_len;
1027   char *my_addr;
1028   (void) cls;
1029
1030   if (GNUNET_OK !=
1031       GNUNET_CONFIGURATION_get_value_filename (cfg,
1032                                                COMMUNICATOR_CONFIG_SECTION,
1033                                                "UNIXPATH",
1034                                                &unix_socket_path))
1035   {
1036     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1037                                COMMUNICATOR_CONFIG_SECTION,
1038                                "UNIXPATH");
1039     return;
1040   }
1041   if (GNUNET_OK !=
1042       GNUNET_CONFIGURATION_get_value_number (cfg,
1043                                              COMMUNICATOR_CONFIG_SECTION,
1044                                              "MAX_QUEUE_LENGTH",
1045                                              &max_queue_length))
1046     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1047
1048   un = unix_address_to_sockaddr (unix_socket_path,
1049                                  &un_len);
1050   if (NULL == un)
1051   {
1052     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1053                 "Failed to setup UNIX domain socket address with path `%s'\n",
1054                 unix_socket_path);
1055     GNUNET_free (unix_socket_path);
1056     return;
1057   }
1058   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1059                                             SOCK_DGRAM,
1060                                             0);
1061   if (NULL == unix_sock)
1062   {
1063     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1064                          "socket");
1065     GNUNET_free (un);
1066     GNUNET_free (unix_socket_path);
1067     return;
1068   }
1069   if ( ('\0' != un->sun_path[0]) &&
1070        (GNUNET_OK !=
1071         GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1072   {
1073     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1074                 _("Cannot create path to `%s'\n"),
1075                 un->sun_path);
1076     GNUNET_NETWORK_socket_close (unix_sock);
1077     unix_sock = NULL;
1078     GNUNET_free (un);
1079     GNUNET_free (unix_socket_path);
1080     return;
1081   }
1082   if (GNUNET_OK !=
1083       GNUNET_NETWORK_socket_bind (unix_sock,
1084                                   (const struct sockaddr *) un,
1085                                   un_len))
1086   {
1087     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
1088                               "bind",
1089                               un->sun_path);
1090     GNUNET_NETWORK_socket_close (unix_sock);
1091     unix_sock = NULL;
1092     GNUNET_free (un);
1093     GNUNET_free (unix_socket_path);
1094     return;
1095   }
1096   GNUNET_free (un);
1097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098               "Bound to `%s'\n",
1099               unix_socket_path);
1100   stats = GNUNET_STATISTICS_create ("C-UNIX",
1101                                     cfg);
1102   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1103                                  NULL);
1104   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1105                                              unix_sock,
1106                                              &select_read_cb,
1107                                              NULL);
1108   queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1109                                                       GNUNET_NO);
1110   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1111                                               COMMUNICATOR_CONFIG_SECTION,
1112                                               COMMUNICATOR_ADDRESS_PREFIX,
1113                                               GNUNET_TRANSPORT_CC_RELIABLE,
1114                                               &mq_init,
1115                                               NULL,
1116                                               &enc_notify_cb,
1117                                               NULL);
1118   if (NULL == ch)
1119   {
1120     GNUNET_break (0);
1121     GNUNET_SCHEDULER_shutdown ();
1122     GNUNET_free (unix_socket_path);
1123     return;
1124   }
1125   GNUNET_asprintf (&my_addr,
1126                    "%s-%s",
1127                    COMMUNICATOR_ADDRESS_PREFIX,
1128                    unix_socket_path);
1129   GNUNET_free (unix_socket_path);
1130   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1131                                                   my_addr,
1132                                                   GNUNET_NT_LOOPBACK,
1133                                                   GNUNET_TIME_UNIT_FOREVER_REL);
1134   GNUNET_free (my_addr);
1135 }
1136
1137
1138 /**
1139  * The main function for the UNIX communicator.
1140  *
1141  * @param argc number of arguments from the command line
1142  * @param argv command line arguments
1143  * @return 0 ok, 1 on error
1144  */
1145 int
1146 main (int argc,
1147       char *const *argv)
1148 {
1149   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1150     GNUNET_GETOPT_OPTION_END
1151   };
1152   int ret;
1153
1154   if (GNUNET_OK !=
1155       GNUNET_STRINGS_get_utf8_args (argc, argv,
1156                                     &argc, &argv))
1157     return 2;
1158
1159   ret =
1160       (GNUNET_OK ==
1161        GNUNET_PROGRAM_run (argc, argv,
1162                            "gnunet-communicator-unix",
1163                            _("GNUnet UNIX domain socket communicator"),
1164                            options,
1165                            &run,
1166                            NULL)) ? 0 : 1;
1167   GNUNET_free ((void*) argv);
1168   return ret;
1169 }
1170
1171
1172 #if defined(LINUX) && defined(__GLIBC__)
1173 #include <malloc.h>
1174
1175 /**
1176  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1177  */
1178 void __attribute__ ((constructor))
1179 GNUNET_ARM_memory_init ()
1180 {
1181   mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1182   mallopt (M_TOP_PAD, 1 * 1024);
1183   malloc_trim (0);
1184 }
1185 #endif
1186
1187 /* end of gnunet-communicator-unix.c */