57fa22ecf012f878bc9f1bdb9ca9c6060415b112
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file transport/gnunet-communicator-unix.c
21  * @brief Transport plugin using unix domain sockets (!)
22  *        Clearly, can only be used locally on Unix/Linux hosts...
23  *        ONLY INTENDED FOR TESTING!!!
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_nt_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_transport_communication_service.h"
34
35 /**
36  * How many messages do we keep at most in the queue to the
37  * transport service before we start to drop (default,
38  * can be changed via the configuration file).
39  * Should be _below_ the level of the communicator API, as
40  * otherwise we may read messages just to have them dropped
41  * by the communicator API.
42  */
43 #define DEFAULT_MAX_QUEUE_LENGTH 8
44
45 /**
46  * Address prefix used by the communicator.
47  */
48 #define COMMUNICATOR_ADDRESS_PREFIX "unix"
49
50 /**
51  * Configuration section used by the communicator.
52  */
53 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
54
55 /**
56  * Our MTU.
57  */
58 #define UNIX_MTU UINT16_MAX
59
60 GNUNET_NETWORK_STRUCT_BEGIN
61
62 /**
63  * UNIX Message-Packet header.
64  */
65 struct UNIXMessage
66 {
67   /**
68    * Message header.
69    */
70   struct GNUNET_MessageHeader header;
71
72   /**
73    * What is the identity of the sender (GNUNET_hash of public key)
74    */
75   struct GNUNET_PeerIdentity sender;
76
77 };
78
79 GNUNET_NETWORK_STRUCT_END
80
81
82 /**
83  * Handle for a queue.
84  */
85 struct Queue
86 {
87
88   /**
89    * Queues with pending messages (!) are kept in a DLL.
90    */
91   struct Queue *next;
92
93   /**
94    * Queues with pending messages (!) are kept in a DLL.
95    */
96   struct Queue *prev;
97
98   /**
99    * To whom are we talking to.
100    */
101   struct GNUNET_PeerIdentity target;
102
103   /**
104    * Address of the other peer.
105    */
106   struct sockaddr_un *address;
107
108   /**
109    * Length of the address.
110    */
111   socklen_t address_len;
112
113   /**
114    * Message currently scheduled for transmission, non-NULL if and only
115    * if this queue is in the #queue_head DLL.
116    */
117   const struct GNUNET_MessageHeader *msg;
118
119   /**
120    * Message queue we are providing for the #ch.
121    */
122   struct GNUNET_MQ_Handle *mq;
123
124   /**
125    * handle for this queue with the #ch.
126    */
127   struct GNUNET_TRANSPORT_QueueHandle *qh;
128
129   /**
130    * Number of bytes we currently have in our write queue.
131    */
132   unsigned long long bytes_in_queue;
133
134   /**
135    * Timeout for this queue.
136    */
137   struct GNUNET_TIME_Absolute timeout;
138
139   /**
140    * Queue timeout task.
141    */
142   struct GNUNET_SCHEDULER_Task *timeout_task;
143
144 };
145
146
147 /**
148  * ID of read task
149  */
150 static struct GNUNET_SCHEDULER_Task *read_task;
151
152 /**
153  * ID of write task
154  */
155 static struct GNUNET_SCHEDULER_Task *write_task;
156
157 /**
158  * Number of messages we currently have in our queues towards the transport service.
159  */
160 static unsigned long long delivering_messages;
161
162 /**
163  * Maximum queue length before we stop reading towards the transport service.
164  */
165 static unsigned long long max_queue_length;
166
167 /**
168  * For logging statistics.
169  */
170 static struct GNUNET_STATISTICS_Handle *stats;
171
172 /**
173  * Our environment.
174  */
175 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
176
177 /**
178  * Queues (map from peer identity to `struct Queue`)
179  */
180 static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
181
182 /**
183  * Head of queue of messages to transmit.
184  */
185 static struct Queue *queue_head;
186
187 /**
188  * Tail of queue of messages to transmit.
189  */
190 static struct Queue *queue_tail;
191
192 /**
193  * socket that we transmit all data with
194  */
195 static struct GNUNET_NETWORK_Handle *unix_sock;
196
197 /**
198  * Handle to the operation that publishes our address.
199  */
200 static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
201
202
203 /**
204  * Functions with this signature are called whenever we need
205  * to close a queue due to a disconnect or failure to
206  * establish a connection.
207  *
208  * @param queue queue to close down
209  */
210 static void
211 queue_destroy (struct Queue *queue)
212 {
213   struct GNUNET_MQ_Handle *mq;
214
215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
216               "Disconnecting queue for peer `%s'\n",
217               GNUNET_i2s (&queue->target));
218   if (0 != queue->bytes_in_queue)
219   {
220     GNUNET_CONTAINER_DLL_remove (queue_head,
221                                  queue_tail,
222                                  queue);
223     queue->bytes_in_queue = 0;
224   }
225   if (NULL != (mq = queue->mq))
226   {
227     queue->mq = NULL;
228     GNUNET_MQ_destroy (mq);
229   }
230   GNUNET_assert (GNUNET_YES ==
231                  GNUNET_CONTAINER_multipeermap_remove (queue_map,
232                                                        &queue->target,
233                                                        queue));
234   GNUNET_STATISTICS_set (stats,
235                          "# UNIX queues active",
236                          GNUNET_CONTAINER_multipeermap_size (queue_map),
237                          GNUNET_NO);
238   if (NULL != queue->timeout_task)
239   {
240     GNUNET_SCHEDULER_cancel (queue->timeout_task);
241     queue->timeout_task = NULL;
242   }
243   GNUNET_free (queue->address);
244   GNUNET_free (queue);
245 }
246
247
248 /**
249  * Queue was idle for too long, so disconnect it
250  *
251  * @param cls the `struct Queue *` to disconnect
252  */
253 static void
254 queue_timeout (void *cls)
255 {
256   struct Queue *queue = cls;
257   struct GNUNET_TIME_Relative left;
258
259   queue->timeout_task = NULL;
260   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
261   if (0 != left.rel_value_us)
262   {
263     /* not actually our turn yet, but let's at least update
264        the monitor, it may think we're about to die ... */
265     queue->timeout_task
266       = GNUNET_SCHEDULER_add_delayed (left,
267                                       &queue_timeout,
268                                       queue);
269     return;
270   }
271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
272               "Queue %p was idle for %s, disconnecting\n",
273               queue,
274               GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
275                                                       GNUNET_YES));
276   queue_destroy (queue);
277 }
278
279
280 /**
281  * Increment queue timeout due to activity.  We do not immediately
282  * notify the monitor here as that might generate excessive
283  * signalling.
284  *
285  * @param queue queue for which the timeout should be rescheduled
286  */
287 static void
288 reschedule_queue_timeout (struct Queue *queue)
289 {
290   GNUNET_assert (NULL != queue->timeout_task);
291   queue->timeout
292     = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
293 }
294
295
296 /**
297  * Convert unix path to a `struct sockaddr_un *`
298  *
299  * @param unixpath path to convert
300  * @param[out] sock_len set to the length of the address
301  * @param is_abstract is this an abstract @a unixpath
302  * @return converted unix path
303  */
304 static struct sockaddr_un *
305 unix_address_to_sockaddr (const char *unixpath,
306                           socklen_t *sock_len)
307 {
308   struct sockaddr_un *un;
309   size_t slen;
310
311   GNUNET_assert (0 < strlen (unixpath));        /* sanity check */
312   un = GNUNET_new (struct sockaddr_un);
313   un->sun_family = AF_UNIX;
314   slen = strlen (unixpath);
315   if (slen >= sizeof (un->sun_path))
316     slen = sizeof (un->sun_path) - 1;
317   GNUNET_memcpy (un->sun_path,
318                  unixpath,
319                  slen);
320   un->sun_path[slen] = '\0';
321   slen = sizeof (struct sockaddr_un);
322 #if HAVE_SOCKADDR_UN_SUN_LEN
323   un->sun_len = (u_char) slen;
324 #endif
325   (*sock_len) = slen;
326   if ('@' == un->sun_path[0])
327     un->sun_path[0] = '\0';
328   return un;
329 }
330
331
332 /**
333  * Closure to #lookup_queue_it().
334  */
335 struct LookupCtx
336 {
337   /**
338    * Location to store the queue, if found.
339    */
340   struct Queue *res;
341
342   /**
343    * Address we are looking for.
344    */
345   const struct sockaddr_un *un;
346
347   /**
348    * Number of bytes in @a un
349    */
350   socklen_t un_len;
351 };
352
353
354 /**
355  * Function called to find a queue by address.
356  *
357  * @param cls the `struct LookupCtx *`
358  * @param key peer we are looking for (unused)
359  * @param value a queue
360  * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
361  */
362 static int
363 lookup_queue_it (void *cls,
364                  const struct GNUNET_PeerIdentity *key,
365                  void *value)
366 {
367   struct LookupCtx *lctx = cls;
368   struct Queue *queue = value;
369
370   if ( (queue->address_len = lctx->un_len) &&
371        (0 == memcmp (lctx->un,
372                      queue->address,
373                      queue->address_len)) )
374   {
375     lctx->res = queue;
376     return GNUNET_NO;
377   }
378   return GNUNET_YES;
379 }
380
381
382 /**
383  * Find an existing queue by address.
384  *
385  * @param plugin the plugin
386  * @param address the address to find
387  * @return NULL if queue was not found
388  */
389 static struct Queue *
390 lookup_queue (const struct GNUNET_PeerIdentity *peer,
391               const struct sockaddr_un *un,
392               socklen_t un_len)
393 {
394   struct LookupCtx lctx;
395
396   lctx.un = un;
397   lctx.un_len = un_len;
398   GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
399                                               peer,
400                                               &lookup_queue_it,
401                                               &lctx);
402   return lctx.res;
403 }
404
405
406 /**
407  * We have been notified that our socket is ready to write.
408  * Then reschedule this function to be called again once more is available.
409  *
410  * @param cls NULL
411  */
412 static void
413 select_write_cb (void *cls)
414 {
415   struct Queue *queue = queue_tail;
416   const struct GNUNET_MessageHeader *msg = queue->msg;
417   size_t msg_size = ntohs (msg->size);
418   ssize_t sent;
419
420   /* take queue of the ready list */
421   write_task = NULL;
422   GNUNET_CONTAINER_DLL_remove (queue_head,
423                                queue_tail,
424                                queue);
425   if (NULL != queue_head)
426     write_task =
427       GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
428                                       unix_sock,
429                                       &select_write_cb,
430                                       NULL);
431
432   /* send 'msg' */
433   queue->msg = NULL;
434   GNUNET_MQ_impl_send_continue (queue->mq);
435  resend:
436   /* Send the data */
437   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
438                                        queue->msg,
439                                        msg_size,
440                                        (const struct sockaddr *) queue->address,
441                                        queue->address_len);
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "UNIX transmitted message to %s (%d/%u: %s)\n",
444               GNUNET_i2s (&queue->target),
445               (int) sent,
446               (unsigned int) msg_size,
447               (sent < 0) ? STRERROR (errno) : "ok");
448   if (-1 != sent)
449   {
450     GNUNET_STATISTICS_update (stats,
451                               "# bytes sent",
452                               (long long) sent,
453                               GNUNET_NO);
454     reschedule_queue_timeout (queue);
455     return; /* all good */
456   }
457   GNUNET_STATISTICS_update (stats,
458                             "# network transmission failures",
459                             1,
460                             GNUNET_NO);
461   switch (errno)
462   {
463   case EAGAIN:
464   case ENOBUFS:
465     /* We should retry later... */
466     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
467                          "send");
468     return;
469   case EMSGSIZE:
470     {
471       socklen_t size = 0;
472       socklen_t len = sizeof (size);
473
474       GNUNET_NETWORK_socket_getsockopt (unix_sock,
475                                         SOL_SOCKET,
476                                         SO_SNDBUF,
477                                         &size,
478                                         &len);
479       if (size > ntohs (msg->size))
480       {
481         /* Buffer is bigger than message:  error, no retry
482          * This should never happen!*/
483         GNUNET_break (0);
484         return;
485       }
486       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487                   "Trying to increase socket buffer size from %u to %u for message size %u\n",
488                   (unsigned int) size,
489                   (unsigned int) ((msg_size / 1000) + 2) * 1000,
490                   (unsigned int) msg_size);
491       size = ((msg_size / 1000) + 2) * 1000;
492       if (GNUNET_OK ==
493           GNUNET_NETWORK_socket_setsockopt (unix_sock,
494                                             SOL_SOCKET,
495                                             SO_SNDBUF,
496                                             &size,
497                                             sizeof (size)))
498         goto resend; /* Increased buffer size, retry sending */
499       /* Ok, then just try very modest increase */
500       size = msg_size;
501       if (GNUNET_OK ==
502           GNUNET_NETWORK_socket_setsockopt (unix_sock,
503                                             SOL_SOCKET,
504                                             SO_SNDBUF,
505                                             &size,
506                                             sizeof (size)))
507           goto resend; /* Increased buffer size, retry sending */
508       /* Could not increase buffer size: error, no retry */
509       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
510                            "setsockopt");
511       return;
512     }
513   default:
514     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
515                          "send");
516     return;
517   }
518 }
519
520
521 /**
522  * Signature of functions implementing the sending functionality of a
523  * message queue.
524  *
525  * @param mq the message queue
526  * @param msg the message to send
527  * @param impl_state our `struct Queue`
528  */
529 static void
530 mq_send (struct GNUNET_MQ_Handle *mq,
531          const struct GNUNET_MessageHeader *msg,
532          void *impl_state)
533 {
534   struct Queue *queue = impl_state;
535
536   GNUNET_assert (mq == queue->mq);
537   GNUNET_assert (NULL == queue->msg);
538   queue->msg = msg;
539   GNUNET_CONTAINER_DLL_insert (queue_head,
540                                queue_tail,
541                                queue);
542   GNUNET_assert (NULL != unix_sock);
543   if (NULL == write_task)
544     write_task =
545       GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
546                                       unix_sock,
547                                       &select_write_cb,
548                                       NULL);
549 }
550
551
552 /**
553  * Signature of functions implementing the destruction of a message
554  * queue.  Implementations must not free @a mq, but should take care
555  * of @a impl_state.
556  *
557  * @param mq the message queue to destroy
558  * @param impl_state our `struct Queue`
559  */
560 static void
561 mq_destroy (struct GNUNET_MQ_Handle *mq,
562             void *impl_state)
563 {
564   struct Queue *queue = impl_state;
565
566   if (mq == queue->mq)
567   {
568     queue->mq = NULL;
569     queue_destroy (queue);
570   }
571 }
572
573
574 /**
575  * Implementation function that cancels the currently sent message.
576  *
577  * @param mq message queue
578  * @param impl_state our `struct Queue`
579  */
580 static void
581 mq_cancel (struct GNUNET_MQ_Handle *mq,
582            void *impl_state)
583 {
584   struct Queue *queue = impl_state;
585
586   GNUNET_assert (NULL != queue->msg);
587   queue->msg = NULL;
588   GNUNET_CONTAINER_DLL_remove (queue_head,
589                                queue_tail,
590                                queue);
591   GNUNET_assert (NULL != write_task);
592   if (NULL == queue_head)
593   {
594     GNUNET_SCHEDULER_cancel (write_task);
595     write_task = NULL;
596   }
597 }
598
599
600 /**
601  * Generic error handler, called with the appropriate
602  * error code and the same closure specified at the creation of
603  * the message queue.
604  * Not every message queue implementation supports an error handler.
605  *
606  * @param cls our `struct Queue`
607  * @param error error code
608  */
609 static void
610 mq_error (void *cls,
611           enum GNUNET_MQ_Error error)
612 {
613   struct Queue *queue = cls;
614
615   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
616               "UNIX MQ error in queue to %s: %d\n",
617               GNUNET_i2s (&queue->target),
618               (int) error);
619   queue_destroy (queue);
620 }
621
622
623 /**
624  * Creates a new outbound queue the transport service will use to send
625  * data to another peer.
626  *
627  * @param peer the target peer
628  * @param cs inbound or outbound queue
629  * @param un the address
630  * @param un_len number of bytes in @a un
631  * @return the queue or NULL of max connections exceeded
632  */
633 static struct Queue *
634 setup_queue (const struct GNUNET_PeerIdentity *target,
635              enum GNUNET_TRANSPORT_ConnectionStatus cs,
636              const struct sockaddr_un *un,
637              socklen_t un_len)
638 {
639   struct Queue *queue;
640
641   queue = GNUNET_new (struct Queue);
642   queue->target = *target;
643   queue->address = GNUNET_memdup (un,
644                                   un_len);
645   queue->address_len = un_len;
646   (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
647                                             &queue->target,
648                                             queue,
649                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
650   GNUNET_STATISTICS_set (stats,
651                          "# queues active",
652                          GNUNET_CONTAINER_multipeermap_size (queue_map),
653                          GNUNET_NO);
654   queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
655   queue->timeout_task
656     = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
657                                     &queue_timeout,
658                                     queue);
659   queue->mq
660     = GNUNET_MQ_queue_for_callbacks (&mq_send,
661                                      &mq_destroy,
662                                      &mq_cancel,
663                                      queue,
664                                      NULL,
665                                      &mq_error,
666                                      queue);
667   {
668     char *foreign_addr;
669
670     if ('\0' == un->sun_path[0])
671       GNUNET_asprintf (&foreign_addr,
672                        "%s-@%s",
673                        COMMUNICATOR_ADDRESS_PREFIX,
674                        &un->sun_path[1]);
675     else
676       GNUNET_asprintf (&foreign_addr,
677                        "%s-%s",
678                        COMMUNICATOR_ADDRESS_PREFIX,
679                        un->sun_path);
680     queue->qh
681       = GNUNET_TRANSPORT_communicator_mq_add (ch,
682                                               &queue->target,
683                                               foreign_addr,
684                                               UNIX_MTU,
685                                               GNUNET_NT_LOOPBACK,
686                                               0 /* distance */,
687                                               cs,
688                                               queue->mq);
689     GNUNET_free (foreign_addr);
690   }
691   return queue;
692 }
693
694
695 /**
696  * We have been notified that our socket has something to read. Do the
697  * read and reschedule this function to be called again once more is
698  * available.
699  *
700  * @param cls NULL
701  */
702 static void
703 select_read_cb (void *cls);
704
705
706 /**
707  * Function called when message was successfully passed to
708  * transport service.  Continue read activity.
709  *
710  * @param cls NULL
711  * @param success #GNUNET_OK on success
712  */
713 static void
714 receive_complete_cb (void *cls,
715                      int success)
716 {
717   delivering_messages--;
718   if (GNUNET_OK != success)
719     GNUNET_STATISTICS_update (stats,
720                               "# transport transmission failures",
721                               1,
722                               GNUNET_NO);
723   GNUNET_assert (NULL != unix_sock);
724   if ( (NULL == read_task) &&
725        (delivering_messages < max_queue_length) )
726     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
727                                                unix_sock,
728                                                &select_read_cb,
729                                                NULL);
730 }
731
732
733 /**
734  * We have been notified that our socket has something to read. Do the
735  * read and reschedule this function to be called again once more is
736  * available.
737  *
738  * @param cls NULL
739  */
740 static void
741 select_read_cb (void *cls)
742 {
743   char buf[65536] GNUNET_ALIGN;
744   struct Queue *queue;
745   const struct UNIXMessage *msg;
746   struct sockaddr_un un;
747   socklen_t addrlen;
748   ssize_t ret;
749   uint16_t msize;
750
751   GNUNET_assert (NULL != unix_sock);
752   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
753                                              unix_sock,
754                                              &select_read_cb,
755                                              NULL);
756   addrlen = sizeof (un);
757   memset (&un,
758           0,
759           sizeof (un));
760   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
761                                         buf,
762                                         sizeof (buf),
763                                         (struct sockaddr *) &un,
764                                         &addrlen);
765   if ( (-1 == ret) &&
766        ( (EAGAIN == errno) ||
767          (ENOBUFS == errno) ) )
768     return;
769   if (-1 == ret)
770   {
771     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
772                          "recvfrom");
773     return;
774   }
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "Read %d bytes from socket %s\n",
777               (int) ret,
778               un.sun_path);
779   GNUNET_assert (AF_UNIX == (un.sun_family));
780   msg = (struct UNIXMessage *) buf;
781   msize = ntohs (msg->header.size);
782   if ( (msize < sizeof (struct UNIXMessage)) ||
783        (msize > ret) )
784   {
785     GNUNET_break_op (0);
786     return;
787   }
788   queue = lookup_queue (&msg->sender,
789                         &un,
790                         addrlen);
791   if (NULL == queue)
792     queue = setup_queue (&msg->sender,
793                          GNUNET_TRANSPORT_CS_INBOUND,
794                          &un,
795                          addrlen);
796   else
797     reschedule_queue_timeout (queue);
798   if (NULL == queue)
799   {
800     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
801                 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
802     return;
803   }
804
805   {
806     uint16_t offset = 0;
807     uint16_t tsize = msize - sizeof (struct UNIXMessage);
808     const char *msgbuf = (const char *) &msg[1];
809
810     while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
811     {
812       const struct GNUNET_MessageHeader *currhdr;
813       struct GNUNET_MessageHeader al_hdr;
814       uint16_t csize;
815
816       currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
817       /* ensure aligned access */
818       memcpy (&al_hdr,
819               currhdr,
820               sizeof (al_hdr));
821       csize = ntohs (al_hdr.size);
822       if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
823            (csize > tsize - offset))
824       {
825         GNUNET_break_op (0);
826         break;
827       }
828       ret = GNUNET_TRANSPORT_communicator_receive (ch,
829                                                    &msg->sender,
830                                                    currhdr,
831                                                    &receive_complete_cb,
832                                                    NULL);
833       if (GNUNET_SYSERR == ret)
834         return; /* transport not up */
835       if (GNUNET_NO == ret)
836         break;
837       delivering_messages++;
838       offset += csize;
839     }
840   }
841   if (delivering_messages >= max_queue_length)
842   {
843     /* we should try to apply 'back pressure' */
844     GNUNET_SCHEDULER_cancel (read_task);
845     read_task = NULL;
846   }
847 }
848
849
850 /**
851  * Function called by the transport service to initialize a
852  * message queue given address information about another peer.
853  * If and when the communication channel is established, the
854  * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
855  * to notify the service that the channel is now up.  It is
856  * the responsibility of the communicator to manage sane
857  * retries and timeouts for any @a peer/@a address combination
858  * provided by the transport service.  Timeouts and retries
859  * do not need to be signalled to the transport service.
860  *
861  * @param cls closure
862  * @param peer identity of the other peer
863  * @param address where to send the message, human-readable
864  *        communicator-specific format, 0-terminated, UTF-8
865  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
866  */
867 static int
868 mq_init (void *cls,
869          const struct GNUNET_PeerIdentity *peer,
870          const char *address)
871 {
872   struct Queue *queue;
873   const char *path;
874   struct sockaddr_un *un;
875   socklen_t un_len;
876
877   if (0 != strncmp (address,
878                     COMMUNICATOR_ADDRESS_PREFIX "-",
879                     strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
880   {
881     GNUNET_break_op (0);
882     return GNUNET_SYSERR;
883   }
884   path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
885   un = unix_address_to_sockaddr (path,
886                                  &un_len);
887   queue = lookup_queue (peer,
888                         un,
889                         un_len);
890   if (NULL != queue)
891   {
892     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
893                 "Address `%s' for %s ignored, queue exists\n",
894                 path,
895                 GNUNET_i2s (peer));
896     GNUNET_free (un);
897     return GNUNET_OK;
898   }
899   queue = setup_queue (peer,
900                        GNUNET_TRANSPORT_CS_OUTBOUND,
901                        un,
902                        un_len);
903   GNUNET_free (un);
904   if (NULL == queue)
905   {
906     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
907                 "Failed to setup queue to %s at `%s'\n",
908                 GNUNET_i2s (peer),
909                 path);
910     return GNUNET_NO;
911   }
912   return GNUNET_OK;
913 }
914
915
916 /**
917  * Iterator over all message queues to clean up.
918  *
919  * @param cls NULL
920  * @param target unused
921  * @param value the queue to destroy
922  * @return #GNUNET_OK to continue to iterate
923  */
924 static int
925 get_queue_delete_it (void *cls,
926                      const struct GNUNET_PeerIdentity *target,
927                      void *value)
928 {
929   struct Queue *queue = value;
930
931   (void) cls;
932   (void) target;
933   queue_destroy (queue);
934   return GNUNET_OK;
935 }
936
937
938 /**
939  * Shutdown the UNIX communicator.
940  *
941  * @param cls NULL (always)
942  */
943 static void
944 do_shutdown (void *cls)
945 {
946   if (NULL != read_task)
947   {
948     GNUNET_SCHEDULER_cancel (read_task);
949     read_task = NULL;
950   }
951   if (NULL != write_task)
952   {
953     GNUNET_SCHEDULER_cancel (write_task);
954     write_task = NULL;
955   }
956   if (NULL != unix_sock)
957   {
958     GNUNET_break (GNUNET_OK ==
959                   GNUNET_NETWORK_socket_close (unix_sock));
960     unix_sock = NULL;
961   }
962   GNUNET_CONTAINER_multipeermap_iterate (queue_map,
963                                          &get_queue_delete_it,
964                                          NULL);
965   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
966   if (NULL != ai)
967   {
968     GNUNET_TRANSPORT_communicator_address_remove (ai);
969     ai = NULL;
970   }
971   if (NULL != ch)
972   {
973     GNUNET_TRANSPORT_communicator_disconnect (ch);
974     ch = NULL;
975   }
976   if (NULL != stats)
977   {
978     GNUNET_STATISTICS_destroy (stats,
979                                GNUNET_NO);
980     stats = NULL;
981   }
982 }
983
984
985 /**
986  * Function called when the transport service has received an
987  * acknowledgement for this communicator (!) via a different return
988  * path.
989  *
990  * Not applicable for UNIX.
991  *
992  * @param cls closure
993  * @param sender which peer sent the notification
994  * @param msg payload
995  */
996 static void
997 enc_notify_cb (void *cls,
998                const struct GNUNET_PeerIdentity *sender,
999                const struct GNUNET_MessageHeader *msg)
1000 {
1001   (void) cls;
1002   (void) sender;
1003   (void) msg;
1004   GNUNET_break_op (0);
1005 }
1006
1007
1008 /**
1009  * Setup communicator and launch network interactions.
1010  *
1011  * @param cls NULL (always)
1012  * @param args remaining command-line arguments
1013  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1014  * @param cfg configuration
1015  */
1016 static void
1017 run (void *cls,
1018      char *const *args,
1019      const char *cfgfile,
1020      const struct GNUNET_CONFIGURATION_Handle *cfg)
1021 {
1022   char *unix_socket_path;
1023   struct sockaddr_un *un;
1024   socklen_t un_len;
1025   char *my_addr;
1026   (void) cls;
1027
1028   if (GNUNET_OK !=
1029       GNUNET_CONFIGURATION_get_value_filename (cfg,
1030                                                COMMUNICATOR_CONFIG_SECTION,
1031                                                "UNIXPATH",
1032                                                &unix_socket_path))
1033   {
1034     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1035                                COMMUNICATOR_CONFIG_SECTION,
1036                                "UNIXPATH");
1037     return;
1038   }
1039   if (GNUNET_OK !=
1040       GNUNET_CONFIGURATION_get_value_number (cfg,
1041                                              COMMUNICATOR_CONFIG_SECTION,
1042                                              "MAX_QUEUE_LENGTH",
1043                                              &max_queue_length))
1044     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1045
1046   un = unix_address_to_sockaddr (unix_socket_path,
1047                                  &un_len);
1048   if (NULL == un)
1049   {
1050     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1051                 "Failed to setup UNIX domain socket address with path `%s'\n",
1052                 unix_socket_path);
1053     GNUNET_free (unix_socket_path);
1054     return;
1055   }
1056   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1057                                             SOCK_DGRAM,
1058                                             0);
1059   if (NULL == unix_sock)
1060   {
1061     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
1062                          "socket");
1063     GNUNET_free (un);
1064     GNUNET_free (unix_socket_path);
1065     return;
1066   }
1067   if ( ('\0' != un->sun_path[0]) &&
1068        (GNUNET_OK !=
1069         GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1070   {
1071     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1072                 _("Cannot create path to `%s'\n"),
1073                 un->sun_path);
1074     GNUNET_NETWORK_socket_close (unix_sock);
1075     unix_sock = NULL;
1076     GNUNET_free (un);
1077     GNUNET_free (unix_socket_path);
1078     return;
1079   }
1080   if (GNUNET_OK !=
1081       GNUNET_NETWORK_socket_bind (unix_sock,
1082                                   (const struct sockaddr *) un,
1083                                   un_len))
1084   {
1085     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
1086                               "bind",
1087                               un->sun_path);
1088     GNUNET_NETWORK_socket_close (unix_sock);
1089     unix_sock = NULL;
1090     GNUNET_free (un);
1091     GNUNET_free (unix_socket_path);
1092     return;
1093   }
1094   GNUNET_free (un);
1095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096               "Bound to `%s'\n",
1097               unix_socket_path);
1098   stats = GNUNET_STATISTICS_create ("C-UNIX",
1099                                     cfg);
1100   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1101                                  NULL);
1102   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1103                                              unix_sock,
1104                                              &select_read_cb,
1105                                              NULL);
1106   queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1107                                                       GNUNET_NO);
1108   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1109                                               COMMUNICATOR_CONFIG_SECTION,
1110                                               COMMUNICATOR_ADDRESS_PREFIX,
1111                                               GNUNET_TRANSPORT_CC_RELIABLE,
1112                                               &mq_init,
1113                                               NULL,
1114                                               &enc_notify_cb,
1115                                               NULL);
1116   if (NULL == ch)
1117   {
1118     GNUNET_break (0);
1119     GNUNET_SCHEDULER_shutdown ();
1120     GNUNET_free (unix_socket_path);
1121     return;
1122   }
1123   GNUNET_asprintf (&my_addr,
1124                    "%s-%s",
1125                    COMMUNICATOR_ADDRESS_PREFIX,
1126                    unix_socket_path);
1127   GNUNET_free (unix_socket_path);
1128   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1129                                                   my_addr,
1130                                                   GNUNET_NT_LOOPBACK,
1131                                                   GNUNET_TIME_UNIT_FOREVER_REL);
1132   GNUNET_free (my_addr);
1133 }
1134
1135
1136 /**
1137  * The main function for the UNIX communicator.
1138  *
1139  * @param argc number of arguments from the command line
1140  * @param argv command line arguments
1141  * @return 0 ok, 1 on error
1142  */
1143 int
1144 main (int argc,
1145       char *const *argv)
1146 {
1147   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1148     GNUNET_GETOPT_OPTION_END
1149   };
1150   int ret;
1151
1152   if (GNUNET_OK !=
1153       GNUNET_STRINGS_get_utf8_args (argc, argv,
1154                                     &argc, &argv))
1155     return 2;
1156
1157   ret =
1158       (GNUNET_OK ==
1159        GNUNET_PROGRAM_run (argc, argv,
1160                            "gnunet-communicator-unix",
1161                            _("GNUnet UNIX domain socket communicator"),
1162                            options,
1163                            &run,
1164                            NULL)) ? 0 : 1;
1165   GNUNET_free ((void*) argv);
1166   return ret;
1167 }
1168
1169
1170 #if defined(LINUX) && defined(__GLIBC__)
1171 #include <malloc.h>
1172
1173 /**
1174  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1175  */
1176 void __attribute__ ((constructor))
1177 GNUNET_ARM_memory_init ()
1178 {
1179   mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1180   mallopt (M_TOP_PAD, 1 * 1024);
1181   malloc_trim (0);
1182 }
1183 #endif
1184
1185 /* end of gnunet-communicator-unix.c */