fixing tng testcase unix communicator
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2010-2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/gnunet-communicator-unix.c
23  * @brief Transport plugin using unix domain sockets (!)
24  *        Clearly, can only be used locally on Unix/Linux hosts...
25  *        ONLY INTENDED FOR TESTING!!!
26  * @author Christian Grothoff
27  * @author Nathan Evans
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_nt_lib.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_transport_communication_service.h"
36
37 /**
38  * How many messages do we keep at most in the queue to the
39  * transport service before we start to drop (default,
40  * can be changed via the configuration file).
41  * Should be _below_ the level of the communicator API, as
42  * otherwise we may read messages just to have them dropped
43  * by the communicator API.
44  */
45 #define DEFAULT_MAX_QUEUE_LENGTH 8
46
47 /**
48  * Address prefix used by the communicator.
49  */
50 #define COMMUNICATOR_ADDRESS_PREFIX "unix"
51
52 /**
53  * Configuration section used by the communicator.
54  */
55 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
56
57 /**
58  * Our MTU.
59  */
60 #define UNIX_MTU UINT16_MAX
61
62 GNUNET_NETWORK_STRUCT_BEGIN
63
64 /**
65  * UNIX Message-Packet header.
66  */
67 struct UNIXMessage
68 {
69   /**
70    * Message header.
71    */
72   struct GNUNET_MessageHeader header;
73
74   /**
75    * What is the identity of the sender (GNUNET_hash of public key)
76    */
77   struct GNUNET_PeerIdentity sender;
78 };
79
80 GNUNET_NETWORK_STRUCT_END
81
82
83 /**
84  * Handle for a queue.
85  */
86 struct Queue
87 {
88   /**
89    * Queues with pending messages (!) are kept in a DLL.
90    */
91   struct Queue *next;
92
93   /**
94    * Queues with pending messages (!) are kept in a DLL.
95    */
96   struct Queue *prev;
97
98   /**
99    * To whom are we talking to.
100    */
101   struct GNUNET_PeerIdentity target;
102
103   /**
104    * Address of the other peer.
105    */
106   struct sockaddr_un *address;
107
108   /**
109    * Length of the address.
110    */
111   socklen_t address_len;
112
113   /**
114    * Message currently scheduled for transmission, non-NULL if and only
115    * if this queue is in the #queue_head DLL.
116    */
117   struct UNIXMessage *msg;
118
119   /**
120    * Message queue we are providing for the #ch.
121    */
122   struct GNUNET_MQ_Handle *mq;
123
124   /**
125    * handle for this queue with the #ch.
126    */
127   struct GNUNET_TRANSPORT_QueueHandle *qh;
128
129   /**
130    * Number of bytes we currently have in our write queue.
131    */
132   unsigned long long bytes_in_queue;
133
134   /**
135    * Timeout for this queue.
136    */
137   struct GNUNET_TIME_Absolute timeout;
138
139   /**
140    * Queue timeout task.
141    */
142   struct GNUNET_SCHEDULER_Task *timeout_task;
143 };
144
145 /**
146  * My Peer Identity
147  */
148 static struct GNUNET_PeerIdentity my_identity;
149
150 /**
151  * ID of read task
152  */
153 static struct GNUNET_SCHEDULER_Task *read_task;
154
155 /**
156  * ID of write task
157  */
158 static struct GNUNET_SCHEDULER_Task *write_task;
159
160 /**
161  * Number of messages we currently have in our queues towards the transport service.
162  */
163 static unsigned long long delivering_messages;
164
165 /**
166  * Maximum queue length before we stop reading towards the transport service.
167  */
168 static unsigned long long max_queue_length;
169
170 /**
171  * For logging statistics.
172  */
173 static struct GNUNET_STATISTICS_Handle *stats;
174
175 /**
176  * Our environment.
177  */
178 static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
179
180 /**
181  * Queues (map from peer identity to `struct Queue`)
182  */
183 static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
184
185 /**
186  * Head of queue of messages to transmit.
187  */
188 static struct Queue *queue_head;
189
190 /**
191  * Tail of queue of messages to transmit.
192  */
193 static struct Queue *queue_tail;
194
195 /**
196  * socket that we transmit all data with
197  */
198 static struct GNUNET_NETWORK_Handle *unix_sock;
199
200 /**
201  * Handle to the operation that publishes our address.
202  */
203 static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
204
205
206 /**
207  * Functions with this signature are called whenever we need
208  * to close a queue due to a disconnect or failure to
209  * establish a connection.
210  *
211  * @param queue queue to close down
212  */
213 static void
214 queue_destroy (struct Queue *queue)
215 {
216   struct GNUNET_MQ_Handle *mq;
217
218   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
219               "Disconnecting queue for peer `%s'\n",
220               GNUNET_i2s (&queue->target));
221   if (0 != queue->bytes_in_queue)
222   {
223     GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
224     queue->bytes_in_queue = 0;
225   }
226   if (NULL != (mq = queue->mq))
227   {
228     queue->mq = NULL;
229     GNUNET_MQ_destroy (mq);
230   }
231   GNUNET_assert (
232     GNUNET_YES ==
233     GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue));
234   GNUNET_STATISTICS_set (stats,
235                          "# queues active",
236                          GNUNET_CONTAINER_multipeermap_size (queue_map),
237                          GNUNET_NO);
238   if (NULL != queue->timeout_task)
239   {
240     GNUNET_SCHEDULER_cancel (queue->timeout_task);
241     queue->timeout_task = NULL;
242   }
243   GNUNET_free (queue->address);
244   GNUNET_free (queue);
245 }
246
247
248 /**
249  * Queue was idle for too long, so disconnect it
250  *
251  * @param cls the `struct Queue *` to disconnect
252  */
253 static void
254 queue_timeout (void *cls)
255 {
256   struct Queue *queue = cls;
257   struct GNUNET_TIME_Relative left;
258
259   queue->timeout_task = NULL;
260   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
261   if (0 != left.rel_value_us)
262   {
263     /* not actually our turn yet, but let's at least update
264        the monitor, it may think we're about to die ... */
265     queue->timeout_task =
266       GNUNET_SCHEDULER_add_delayed (left, &queue_timeout, queue);
267     return;
268   }
269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
270               "Queue %p was idle for %s, disconnecting\n",
271               queue,
272               GNUNET_STRINGS_relative_time_to_string (
273                 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
274                 GNUNET_YES));
275   queue_destroy (queue);
276 }
277
278
279 /**
280  * Increment queue timeout due to activity.  We do not immediately
281  * notify the monitor here as that might generate excessive
282  * signalling.
283  *
284  * @param queue queue for which the timeout should be rescheduled
285  */
286 static void
287 reschedule_queue_timeout (struct Queue *queue)
288 {
289   GNUNET_assert (NULL != queue->timeout_task);
290   queue->timeout =
291     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
292 }
293
294
295 /**
296  * Convert unix path to a `struct sockaddr_un *`
297  *
298  * @param unixpath path to convert
299  * @param[out] sock_len set to the length of the address
300  * @param is_abstract is this an abstract @a unixpath
301  * @return converted unix path
302  */
303 static struct sockaddr_un *
304 unix_address_to_sockaddr (const char *unixpath, socklen_t *sock_len)
305 {
306   struct sockaddr_un *un;
307   size_t slen;
308
309   GNUNET_assert (0 < strlen (unixpath));   /* sanity check */
310   un = GNUNET_new (struct sockaddr_un);
311   un->sun_family = AF_UNIX;
312   slen = strlen (unixpath);
313   if (slen >= sizeof(un->sun_path))
314     slen = sizeof(un->sun_path) - 1;
315   GNUNET_memcpy (un->sun_path, unixpath, slen);
316   un->sun_path[slen] = '\0';
317   slen = sizeof(struct sockaddr_un);
318 #if HAVE_SOCKADDR_UN_SUN_LEN
319   un->sun_len = (u_char) slen;
320 #endif
321   (*sock_len) = slen;
322   if ('@' == un->sun_path[0])
323     un->sun_path[0] = '\0';
324   return un;
325 }
326
327
328 /**
329  * Closure to #lookup_queue_it().
330  */
331 struct LookupCtx
332 {
333   /**
334    * Location to store the queue, if found.
335    */
336   struct Queue *res;
337
338   /**
339    * Address we are looking for.
340    */
341   const struct sockaddr_un *un;
342
343   /**
344    * Number of bytes in @a un
345    */
346   socklen_t un_len;
347 };
348
349
350 /**
351  * Function called to find a queue by address.
352  *
353  * @param cls the `struct LookupCtx *`
354  * @param key peer we are looking for (unused)
355  * @param value a queue
356  * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
357  */
358 static int
359 lookup_queue_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
360 {
361   struct LookupCtx *lctx = cls;
362   struct Queue *queue = value;
363
364   if ((queue->address_len == lctx->un_len) &&
365       (0 == memcmp (lctx->un, queue->address, queue->address_len)))
366   {
367     lctx->res = queue;
368     return GNUNET_NO;
369   }
370   return GNUNET_YES;
371 }
372
373
374 /**
375  * Find an existing queue by address.
376  *
377  * @param plugin the plugin
378  * @param address the address to find
379  * @return NULL if queue was not found
380  */
381 static struct Queue *
382 lookup_queue (const struct GNUNET_PeerIdentity *peer,
383               const struct sockaddr_un *un,
384               socklen_t un_len)
385 {
386   struct LookupCtx lctx;
387
388   lctx.un = un;
389   lctx.un_len = un_len;
390   lctx.res = NULL;
391   GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
392                                               peer,
393                                               &lookup_queue_it,
394                                               &lctx);
395   return lctx.res;
396 }
397
398
399 /**
400  * We have been notified that our socket is ready to write.
401  * Then reschedule this function to be called again once more is available.
402  *
403  * @param cls NULL
404  */
405 static void
406 select_write_cb (void *cls)
407 {
408   struct Queue *queue = queue_tail;
409   const struct GNUNET_MessageHeader *msg = &queue->msg->header;
410   size_t msg_size = ntohs (msg->size);
411   ssize_t sent;
412
413   /* take queue of the ready list */
414   write_task = NULL;
415   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
416   if (NULL != queue_head)
417     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
418                                                  unix_sock,
419                                                  &select_write_cb,
420                                                  NULL);
421
422   /* send 'msg' */
423   queue->msg = NULL;
424   GNUNET_MQ_impl_send_continue (queue->mq);
425 resend:
426   /* Send the data */
427   sent = GNUNET_NETWORK_socket_sendto (unix_sock,
428                                        msg,
429                                        msg_size,
430                                        (const struct sockaddr *) queue->address,
431                                        queue->address_len);
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "UNIX transmitted message to %s (%d/%u: %s)\n",
434               GNUNET_i2s (&queue->target),
435               (int) sent,
436               (unsigned int) msg_size,
437               (sent < 0) ? strerror (errno) : "ok");
438   if (-1 != sent)
439   {
440     GNUNET_STATISTICS_update (stats,
441                               "# bytes sent",
442                               (long long) sent,
443                               GNUNET_NO);
444     reschedule_queue_timeout (queue);
445     return;   /* all good */
446   }
447   GNUNET_STATISTICS_update (stats,
448                             "# network transmission failures",
449                             1,
450                             GNUNET_NO);
451   switch (errno)
452   {
453   case EAGAIN:
454   case ENOBUFS:
455     /* We should retry later... */
456     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
457     return;
458
459   case EMSGSIZE: {
460       socklen_t size = 0;
461       socklen_t len = sizeof(size);
462
463       GNUNET_NETWORK_socket_getsockopt (unix_sock,
464                                         SOL_SOCKET,
465                                         SO_SNDBUF,
466                                         &size,
467                                         &len);
468       if (size > ntohs (msg->size))
469       {
470         /* Buffer is bigger than message:  error, no retry
471          * This should never happen!*/
472         GNUNET_break (0);
473         return;
474       }
475       GNUNET_log (
476         GNUNET_ERROR_TYPE_DEBUG,
477         "Trying to increase socket buffer size from %u to %u for message size %u\n",
478         (unsigned int) size,
479         (unsigned int) ((msg_size / 1000) + 2) * 1000,
480         (unsigned int) msg_size);
481       size = ((msg_size / 1000) + 2) * 1000;
482       if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
483                                                          SOL_SOCKET,
484                                                          SO_SNDBUF,
485                                                          &size,
486                                                          sizeof(size)))
487         goto resend; /* Increased buffer size, retry sending */
488       /* Ok, then just try very modest increase */
489       size = msg_size;
490       if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
491                                                          SOL_SOCKET,
492                                                          SO_SNDBUF,
493                                                          &size,
494                                                          sizeof(size)))
495         goto resend; /* Increased buffer size, retry sending */
496       /* Could not increase buffer size: error, no retry */
497       GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
498       return;
499     }
500
501   default:
502     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "send");
503     return;
504   }
505 }
506
507
508 /**
509  * Signature of functions implementing the sending functionality of a
510  * message queue.
511  *
512  * @param mq the message queue
513  * @param msg the message to send
514  * @param impl_state our `struct Queue`
515  */
516 static void
517 mq_send (struct GNUNET_MQ_Handle *mq,
518          const struct GNUNET_MessageHeader *msg,
519          void *impl_state)
520 {
521   struct Queue *queue = impl_state;
522   size_t msize = ntohs (msg->size);
523
524   GNUNET_assert (mq == queue->mq);
525   GNUNET_assert (NULL == queue->msg);
526   //Convert to UNIXMessage
527   queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage));
528   queue->msg->header.size = htons(msize + sizeof (struct UNIXMessage));
529   queue->msg->sender = my_identity;
530   memcpy (&queue->msg[1], msg, msize);
531   GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
532   GNUNET_assert (NULL != unix_sock);
533   if (NULL == write_task)
534     write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
535                                                  unix_sock,
536                                                  &select_write_cb,
537                                                  NULL);
538 }
539
540
541 /**
542  * Signature of functions implementing the destruction of a message
543  * queue.  Implementations must not free @a mq, but should take care
544  * of @a impl_state.
545  *
546  * @param mq the message queue to destroy
547  * @param impl_state our `struct Queue`
548  */
549 static void
550 mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
551 {
552   struct Queue *queue = impl_state;
553
554   if (mq == queue->mq)
555   {
556     queue->mq = NULL;
557     queue_destroy (queue);
558   }
559 }
560
561
562 /**
563  * Implementation function that cancels the currently sent message.
564  *
565  * @param mq message queue
566  * @param impl_state our `struct Queue`
567  */
568 static void
569 mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
570 {
571   struct Queue *queue = impl_state;
572
573   GNUNET_assert (NULL != queue->msg);
574   queue->msg = NULL;
575   GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
576   GNUNET_assert (NULL != write_task);
577   if (NULL == queue_head)
578   {
579     GNUNET_SCHEDULER_cancel (write_task);
580     write_task = NULL;
581   }
582 }
583
584
585 /**
586  * Generic error handler, called with the appropriate
587  * error code and the same closure specified at the creation of
588  * the message queue.
589  * Not every message queue implementation supports an error handler.
590  *
591  * @param cls our `struct Queue`
592  * @param error error code
593  */
594 static void
595 mq_error (void *cls, enum GNUNET_MQ_Error error)
596 {
597   struct Queue *queue = cls;
598
599   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
600               "UNIX MQ error in queue to %s: %d\n",
601               GNUNET_i2s (&queue->target),
602               (int) error);
603   queue_destroy (queue);
604 }
605
606
607 /**
608  * Creates a new outbound queue the transport service will use to send
609  * data to another peer.
610  *
611  * @param peer the target peer
612  * @param cs inbound or outbound queue
613  * @param un the address
614  * @param un_len number of bytes in @a un
615  * @return the queue or NULL of max connections exceeded
616  */
617 static struct Queue *
618 setup_queue (const struct GNUNET_PeerIdentity *target,
619              enum GNUNET_TRANSPORT_ConnectionStatus cs,
620              const struct sockaddr_un *un,
621              socklen_t un_len)
622 {
623   struct Queue *queue;
624
625   queue = GNUNET_new (struct Queue);
626   queue->target = *target;
627   queue->address = GNUNET_memdup (un, un_len);
628   queue->address_len = un_len;
629   (void) GNUNET_CONTAINER_multipeermap_put (
630     queue_map,
631     &queue->target,
632     queue,
633     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
634   GNUNET_STATISTICS_set (stats,
635                          "# queues active",
636                          GNUNET_CONTAINER_multipeermap_size (queue_map),
637                          GNUNET_NO);
638   queue->timeout =
639     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
640   queue->timeout_task =
641     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
642                                   &queue_timeout,
643                                   queue);
644   queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
645                                              &mq_destroy,
646                                              &mq_cancel,
647                                              queue,
648                                              NULL,
649                                              &mq_error,
650                                              queue);
651   {
652     char *foreign_addr;
653
654     if ('\0' == un->sun_path[0])
655       GNUNET_asprintf (&foreign_addr,
656                        "%s-@%s",
657                        COMMUNICATOR_ADDRESS_PREFIX,
658                        &un->sun_path[1]);
659     else
660       GNUNET_asprintf (&foreign_addr,
661                        "%s-%s",
662                        COMMUNICATOR_ADDRESS_PREFIX,
663                        un->sun_path);
664     queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
665                                                       &queue->target,
666                                                       foreign_addr,
667                                                       UNIX_MTU,
668                                                       GNUNET_NT_LOOPBACK,
669                                                       cs,
670                                                       queue->mq);
671     GNUNET_free (foreign_addr);
672   }
673   return queue;
674 }
675
676
677 /**
678  * We have been notified that our socket has something to read. Do the
679  * read and reschedule this function to be called again once more is
680  * available.
681  *
682  * @param cls NULL
683  */
684 static void
685 select_read_cb (void *cls);
686
687
688 /**
689  * Function called when message was successfully passed to
690  * transport service.  Continue read activity.
691  *
692  * @param cls NULL
693  * @param success #GNUNET_OK on success
694  */
695 static void
696 receive_complete_cb (void *cls, int success)
697 {
698   (void) cls;
699   delivering_messages--;
700   if (GNUNET_OK != success)
701     GNUNET_STATISTICS_update (stats,
702                               "# transport transmission failures",
703                               1,
704                               GNUNET_NO);
705   GNUNET_assert (NULL != unix_sock);
706   if ((NULL == read_task) && (delivering_messages < max_queue_length))
707     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
708                                                unix_sock,
709                                                &select_read_cb,
710                                                NULL);
711 }
712
713
714 /**
715  * We have been notified that our socket has something to read. Do the
716  * read and reschedule this function to be called again once more is
717  * available.
718  *
719  * @param cls NULL
720  */
721 static void
722 select_read_cb (void *cls)
723 {
724   char buf[65536] GNUNET_ALIGN;
725   struct Queue *queue;
726   const struct UNIXMessage *msg;
727   struct sockaddr_un un;
728   socklen_t addrlen;
729   ssize_t ret;
730   uint16_t msize;
731
732   GNUNET_assert (NULL != unix_sock);
733   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
734                                              unix_sock,
735                                              &select_read_cb,
736                                              NULL);
737   addrlen = sizeof(un);
738   memset (&un, 0, sizeof(un));
739   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
740                                         buf,
741                                         sizeof(buf),
742                                         (struct sockaddr *) &un,
743                                         &addrlen);
744   if ((-1 == ret) && ((EAGAIN == errno) || (ENOBUFS == errno)))
745     return;
746   if (-1 == ret)
747   {
748     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
749     return;
750   }
751   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
752               "Read %d bytes from socket %s\n",
753               (int) ret,
754               un.sun_path);
755   GNUNET_assert (AF_UNIX == (un.sun_family));
756   msg = (struct UNIXMessage *) buf;
757   msize = ntohs (msg->header.size);
758   if ((msize < sizeof(struct UNIXMessage)) || (msize > ret))
759   {
760     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
761                 "Wrong message size: %d bytes\n",
762                 msize);
763     GNUNET_break_op (0);
764     return;
765   }
766   queue = lookup_queue (&msg->sender, &un, addrlen);
767   if (NULL == queue)
768     queue =
769       setup_queue (&msg->sender, GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen);
770   else
771     reschedule_queue_timeout (queue);
772   if (NULL == queue)
773   {
774     GNUNET_log (
775       GNUNET_ERROR_TYPE_ERROR,
776       _ (
777         "Maximum number of UNIX connections exceeded, dropping incoming message\n"));
778     return;
779   }
780
781   {
782     uint16_t offset = 0;
783     uint16_t tsize = msize - sizeof(struct UNIXMessage);
784     const char *msgbuf = (const char *) &msg[1];
785
786     while (offset + sizeof(struct GNUNET_MessageHeader) <= tsize)
787     {
788       const struct GNUNET_MessageHeader *currhdr;
789       struct GNUNET_MessageHeader al_hdr;
790       uint16_t csize;
791
792       currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
793       /* ensure aligned access */
794       memcpy (&al_hdr, currhdr, sizeof(al_hdr));
795       csize = ntohs (al_hdr.size);
796       if ((csize < sizeof(struct GNUNET_MessageHeader)) ||
797           (csize > tsize - offset))
798       {
799         GNUNET_break_op (0);
800         break;
801       }
802       ret = GNUNET_TRANSPORT_communicator_receive (ch,
803                                                    &msg->sender,
804                                                    currhdr,
805                                                    GNUNET_TIME_UNIT_FOREVER_REL,
806                                                    &receive_complete_cb,
807                                                    NULL);
808       if (GNUNET_SYSERR == ret)
809         return;   /* transport not up */
810       if (GNUNET_NO == ret)
811         break;
812       delivering_messages++;
813       offset += csize;
814     }
815   }
816   if (delivering_messages >= max_queue_length)
817   {
818     /* we should try to apply 'back pressure' */
819     GNUNET_SCHEDULER_cancel (read_task);
820     read_task = NULL;
821   }
822 }
823
824
825 /**
826  * Function called by the transport service to initialize a
827  * message queue given address information about another peer.
828  * If and when the communication channel is established, the
829  * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
830  * to notify the service that the channel is now up.  It is
831  * the responsibility of the communicator to manage sane
832  * retries and timeouts for any @a peer/@a address combination
833  * provided by the transport service.  Timeouts and retries
834  * do not need to be signalled to the transport service.
835  *
836  * @param cls closure
837  * @param peer identity of the other peer
838  * @param address where to send the message, human-readable
839  *        communicator-specific format, 0-terminated, UTF-8
840  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
841  */
842 static int
843 mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
844 {
845   struct Queue *queue;
846   const char *path;
847   struct sockaddr_un *un;
848   socklen_t un_len;
849
850   (void) cls;
851   if (0 != strncmp (address,
852                     COMMUNICATOR_ADDRESS_PREFIX "-",
853                     strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
854   {
855     GNUNET_break_op (0);
856     return GNUNET_SYSERR;
857   }
858   path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
859   un = unix_address_to_sockaddr (path, &un_len);
860   queue = lookup_queue (peer, un, un_len);
861   if (NULL != queue)
862   {
863     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
864                 "Address `%s' for %s ignored, queue exists\n",
865                 path,
866                 GNUNET_i2s (peer));
867     GNUNET_free (un);
868     return GNUNET_OK;
869   }
870   queue = setup_queue (peer, GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len);
871   GNUNET_free (un);
872   if (NULL == queue)
873   {
874     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
875                 "Failed to setup queue to %s at `%s'\n",
876                 GNUNET_i2s (peer),
877                 path);
878     return GNUNET_NO;
879   }
880   return GNUNET_OK;
881 }
882
883
884 /**
885  * Iterator over all message queues to clean up.
886  *
887  * @param cls NULL
888  * @param target unused
889  * @param value the queue to destroy
890  * @return #GNUNET_OK to continue to iterate
891  */
892 static int
893 get_queue_delete_it (void *cls,
894                      const struct GNUNET_PeerIdentity *target,
895                      void *value)
896 {
897   struct Queue *queue = value;
898
899   (void) cls;
900   (void) target;
901   queue_destroy (queue);
902   return GNUNET_OK;
903 }
904
905
906 /**
907  * Shutdown the UNIX communicator.
908  *
909  * @param cls NULL (always)
910  */
911 static void
912 do_shutdown (void *cls)
913 {
914   if (NULL != read_task)
915   {
916     GNUNET_SCHEDULER_cancel (read_task);
917     read_task = NULL;
918   }
919   if (NULL != write_task)
920   {
921     GNUNET_SCHEDULER_cancel (write_task);
922     write_task = NULL;
923   }
924   if (NULL != unix_sock)
925   {
926     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (unix_sock));
927     unix_sock = NULL;
928   }
929   GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL);
930   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
931   if (NULL != ai)
932   {
933     GNUNET_TRANSPORT_communicator_address_remove (ai);
934     ai = NULL;
935   }
936   if (NULL != ch)
937   {
938     GNUNET_TRANSPORT_communicator_disconnect (ch);
939     ch = NULL;
940   }
941   if (NULL != stats)
942   {
943     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
944     stats = NULL;
945   }
946 }
947
948
949 /**
950  * Function called when the transport service has received an
951  * acknowledgement for this communicator (!) via a different return
952  * path.
953  *
954  * Not applicable for UNIX.
955  *
956  * @param cls closure
957  * @param sender which peer sent the notification
958  * @param msg payload
959  */
960 static void
961 enc_notify_cb (void *cls,
962                const struct GNUNET_PeerIdentity *sender,
963                const struct GNUNET_MessageHeader *msg)
964 {
965   (void) cls;
966   (void) sender;
967   (void) msg;
968   GNUNET_break_op (0);
969 }
970
971
972 /**
973  * Setup communicator and launch network interactions.
974  *
975  * @param cls NULL (always)
976  * @param args remaining command-line arguments
977  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
978  * @param cfg configuration
979  */
980 static void
981 run (void *cls,
982      char *const *args,
983      const char *cfgfile,
984      const struct GNUNET_CONFIGURATION_Handle *cfg)
985 {
986   char *unix_socket_path;
987   struct sockaddr_un *un;
988   socklen_t un_len;
989   char *my_addr;
990   struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
991
992   (void) cls;
993
994   my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
995   if (NULL == my_private_key)
996   {
997     GNUNET_log (
998       GNUNET_ERROR_TYPE_ERROR,
999       _ (
1000         "UNIX communicator is lacking key configuration settings. Exiting.\n"));
1001     GNUNET_SCHEDULER_shutdown ();
1002     return;
1003   }
1004   GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, &my_identity.public_key);
1005
1006   if (GNUNET_OK !=
1007       GNUNET_CONFIGURATION_get_value_filename (cfg,
1008                                                COMMUNICATOR_CONFIG_SECTION,
1009                                                "UNIXPATH",
1010                                                &unix_socket_path))
1011   {
1012     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1013                                COMMUNICATOR_CONFIG_SECTION,
1014                                "UNIXPATH");
1015     return;
1016   }
1017   if (GNUNET_OK !=
1018       GNUNET_CONFIGURATION_get_value_number (cfg,
1019                                              COMMUNICATOR_CONFIG_SECTION,
1020                                              "MAX_QUEUE_LENGTH",
1021                                              &max_queue_length))
1022     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1023
1024   un = unix_address_to_sockaddr (unix_socket_path, &un_len);
1025   if (NULL == un)
1026   {
1027     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1028                 "Failed to setup UNIX domain socket address with path `%s'\n",
1029                 unix_socket_path);
1030     GNUNET_free (unix_socket_path);
1031     return;
1032   }
1033   unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_DGRAM, 0);
1034   if (NULL == unix_sock)
1035   {
1036     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
1037     GNUNET_free (un);
1038     GNUNET_free (unix_socket_path);
1039     return;
1040   }
1041   if (('\0' != un->sun_path[0]) &&
1042       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (un->sun_path)))
1043   {
1044     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1045                 _ ("Cannot create path to `%s'\n"),
1046                 un->sun_path);
1047     GNUNET_NETWORK_socket_close (unix_sock);
1048     unix_sock = NULL;
1049     GNUNET_free (un);
1050     GNUNET_free (unix_socket_path);
1051     return;
1052   }
1053   if (GNUNET_OK != GNUNET_NETWORK_socket_bind (unix_sock,
1054                                                (const struct sockaddr *) un,
1055                                                un_len))
1056   {
1057     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "bind", un->sun_path);
1058     GNUNET_NETWORK_socket_close (unix_sock);
1059     unix_sock = NULL;
1060     GNUNET_free (un);
1061     GNUNET_free (unix_socket_path);
1062     return;
1063   }
1064   GNUNET_free (un);
1065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", unix_socket_path);
1066   stats = GNUNET_STATISTICS_create ("C-UNIX", cfg);
1067   GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
1068   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1069                                              unix_sock,
1070                                              &select_read_cb,
1071                                              NULL);
1072   queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1073   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1074                                               COMMUNICATOR_CONFIG_SECTION,
1075                                               COMMUNICATOR_ADDRESS_PREFIX,
1076                                               GNUNET_TRANSPORT_CC_RELIABLE,
1077                                               &mq_init,
1078                                               NULL,
1079                                               &enc_notify_cb,
1080                                               NULL);
1081   if (NULL == ch)
1082   {
1083     GNUNET_break (0);
1084     GNUNET_SCHEDULER_shutdown ();
1085     GNUNET_free (unix_socket_path);
1086     return;
1087   }
1088   GNUNET_asprintf (&my_addr,
1089                    "%s-%s",
1090                    COMMUNICATOR_ADDRESS_PREFIX,
1091                    unix_socket_path);
1092   GNUNET_free (unix_socket_path);
1093   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1094                                                   my_addr,
1095                                                   GNUNET_NT_LOOPBACK,
1096                                                   GNUNET_TIME_UNIT_FOREVER_REL);
1097   GNUNET_free (my_addr);
1098 }
1099
1100
1101 /**
1102  * The main function for the UNIX communicator.
1103  *
1104  * @param argc number of arguments from the command line
1105  * @param argv command line arguments
1106  * @return 0 ok, 1 on error
1107  */
1108 int
1109 main (int argc, char *const *argv)
1110 {
1111   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
1112     GNUNET_GETOPT_OPTION_END
1113   };
1114   int ret;
1115
1116   if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
1117     return 2;
1118
1119   ret = (GNUNET_OK ==
1120          GNUNET_PROGRAM_run (argc,
1121                              argv,
1122                              "gnunet-communicator-unix",
1123                              _ ("GNUnet UNIX domain socket communicator"),
1124                              options,
1125                              &run,
1126                              NULL))
1127         ? 0
1128         : 1;
1129   GNUNET_free ((void *) argv);
1130   return ret;
1131 }
1132
1133
1134 #if defined(__linux__) && defined(__GLIBC__)
1135 #include <malloc.h>
1136
1137 /**
1138  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1139  */
1140 void __attribute__ ((constructor))
1141 GNUNET_ARM_memory_init ()
1142 {
1143   mallopt (M_TRIM_THRESHOLD, 4 * 1024);
1144   mallopt (M_TOP_PAD, 1 * 1024);
1145   malloc_trim (0);
1146 }
1147
1148
1149 #endif
1150
1151 /* end of gnunet-communicator-unix.c */