tng: UNIX communicator fixes. Test fixes
[oweals/gnunet.git] / src / transport / transport-testing2.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/transport-testing2.c
23  * @brief functions related to testing-tng
24  * @author Christian Grothoff
25  * @author Julius Bünger
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_constants.h"
31 #include "transport-testing2.h"
32 #include "gnunet_ats_transport_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_signatures.h"
35 #include "transport.h"
36 #include <inttypes.h>
37
38 #define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
39
40 struct MyClient
41 {
42   struct MyClient *prev;
43   struct MyClient *next;
44   /**
45    * @brief Handle to the client
46    */
47   struct GNUNET_SERVICE_Client *client;
48
49   /**
50    * @brief Handle to the client
51    */
52   struct GNUNET_MQ_Handle *c_mq;
53
54   /**
55    * The TCH
56    */
57   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc;
58
59 };
60
61 /**
62  * @brief Handle to a transport communicator
63  */
64 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
65 {
66   /**
67    * Clients
68    */
69   struct MyClient *client_head;
70   struct MyClient *client_tail;
71
72   /**
73   * @brief Handle to the client
74   */
75   struct GNUNET_MQ_Handle *c_mq;
76
77   /**
78     * @brief Handle to the configuration
79     */
80   struct GNUNET_CONFIGURATION_Handle *cfg;
81
82   /**
83    * @brief File name of configuration file
84    */
85   char *cfg_filename;
86
87   struct GNUNET_PeerIdentity peer_id;
88
89   /**
90    * @brief Handle to the transport service
91    */
92   struct GNUNET_SERVICE_Handle *tsh;
93
94   /**
95    * @brief Task that will be run on shutdown to stop and clean transport
96    * service
97    */
98   struct GNUNET_SCHEDULER_Task *ts_shutdown_task;
99
100
101   /**
102    * @brief Process of the communicator
103    */
104   struct GNUNET_OS_Process *c_proc;
105
106   /**
107    * NAT process
108    */
109   struct GNUNET_OS_Process *nat_proc;
110
111   /**
112    * @brief Task that will be run on shutdown to stop and clean communicator
113    */
114   struct GNUNET_SCHEDULER_Task *c_shutdown_task;
115
116   /**
117    * @brief Characteristics of the communicator
118    */
119   enum GNUNET_TRANSPORT_CommunicatorCharacteristics c_characteristics;
120
121   /**
122    * @brief Specifies supported addresses
123    */
124   char *c_addr_prefix;
125
126   /**
127    * @brief Specifies supported addresses
128    */
129   char *c_address;
130
131   /**
132    * @brief Head of the DLL of queues associated with this communicator
133    */
134   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue_head;
135
136   /**
137    * @brief Tail of the DLL of queues associated with this communicator
138    */
139   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue_tail;
140
141   /* Callbacks + Closures */
142   /**
143    * @brief Callback called when a new communicator connects
144    */
145   GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback
146     communicator_available_cb;
147
148   /**
149    * @brief Callback called when a new communicator connects
150    */
151   GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb;
152
153   /**
154    * @brief Callback called when a new communicator connects
155    */
156   GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb;
157
158   /**
159    * @brief Callback called when a new communicator connects
160    */
161   GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb;
162
163   /**
164    * @brief Callback called when a new communicator connects
165    */
166   GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_msg_cb;
167
168   /**
169    * @brief Backchannel callback
170    */
171   GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb;
172
173   /**
174    * Our service handle
175    */
176   struct GNUNET_SERVICE_Handle *sh;
177
178   /**
179    * @brief Closure to the callback
180    */
181   void *cb_cls;
182
183   /**
184    * Backchannel supported
185    */
186   int bc_enabled;
187 };
188
189
190 /**
191  * @brief Queue of a communicator and some context
192  */
193 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
194 {
195   /**
196    * @brief Handle to the TransportCommunicator
197    */
198   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
199
200   /**
201    * @brief Envelope to a message that requests the opening of the queue.
202    *
203    * If the client already requests queue(s), but the communicator is not yet
204    * connected, we cannot send the request to open the queue. Save it until the
205    * communicator becomes available and send it then.
206    */
207   struct GNUNET_MQ_Envelope *open_queue_env;
208
209   /**
210    * @brief Peer ID of the peer on the other side of the queue
211    */
212   struct GNUNET_PeerIdentity peer_id;
213
214   /**
215    * @brief Queue ID
216    */
217   uint32_t qid;
218
219   /**
220    * @brief Current message id
221    */
222   uint64_t mid;
223
224   /**
225    * An `enum GNUNET_NetworkType` in NBO.
226    */
227   uint32_t nt;
228
229   /**
230    * Maximum transmission unit.  UINT32_MAX for unlimited.
231    */
232   uint32_t mtu;
233
234   /**
235    * Queue length.  UINT64_MAX for unlimited.
236    */
237   uint64_t q_len;
238
239   /**
240    * Queue prio
241    */
242   uint32_t priority;
243
244   /**
245    * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
246    */
247   uint32_t cs;
248
249   /**
250    * @brief Next element inside a DLL
251    */
252   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *next;
253
254   /**
255    * @brief Previous element inside a DLL
256    */
257   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *prev;
258 };
259
260
261 /**
262  * @brief Handle/Context to a single transmission
263  */
264 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission
265 {
266 };
267
268
269 /**
270  * @brief Check whether incoming msg indicating available communicator is
271  * correct
272  *
273  * @param cls Closure
274  * @param msg Message struct
275  *
276  * @return GNUNET_YES in case message is correct
277  */
278 static int
279 check_communicator_available (
280   void *cls,
281   const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
282 {
283   uint16_t size;
284
285   size = ntohs (msg->header.size) - sizeof(*msg);
286   if (0 == size)
287     return GNUNET_OK; /* receive-only communicator */
288   GNUNET_MQ_check_zero_termination (msg);
289   return GNUNET_OK;
290 }
291
292
293 /**
294  * @brief Handle new communicator
295  *
296  * Store characteristics of communicator, call respective client callback.
297  *
298  * @param cls Closure - communicator handle
299  * @param msg Message struct
300  */
301 static void
302 handle_communicator_available (
303   void *cls,
304   const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
305 {
306   struct MyClient *client = cls;
307   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
308     client->tc;
309   uint16_t size;
310   tc_h->c_mq = client->c_mq;
311
312   size = ntohs (msg->header.size) - sizeof(*msg);
313   if (0 == size)
314   {
315     GNUNET_SERVICE_client_continue (client->client);
316     return; /* receive-only communicator */
317   }
318   tc_h->c_characteristics = ntohl (msg->cc);
319   tc_h->c_addr_prefix = GNUNET_strdup ((const char *) &msg[1]);
320   if (NULL != tc_h->communicator_available_cb)
321   {
322     LOG (GNUNET_ERROR_TYPE_DEBUG, "calling communicator_available_cb()\n");
323     tc_h->communicator_available_cb (tc_h->cb_cls,
324                                      tc_h,
325                                      tc_h->c_characteristics,
326                                      tc_h->c_addr_prefix);
327   }
328   GNUNET_SERVICE_client_continue (client->client);
329   LOG (GNUNET_ERROR_TYPE_DEBUG, "finished communicator_available_cb()\n");
330
331 }
332
333
334 /**
335  * Incoming message.  Test message is well-formed.
336  *
337  * @param cls the client
338  * @param msg the send message that was sent
339  * @return #GNUNET_OK if message is well-formed
340  */
341 static int
342 check_communicator_backchannel (void *cls,
343                                 const struct
344                                 GNUNET_TRANSPORT_CommunicatorBackchannel *msg)
345 {
346   // struct TransportClient *tc = cls;
347
348   // if (CT_COMMUNICATOR != tc->type)
349   // {
350   //  GNUNET_break (0);
351   //  return GNUNET_SYSERR;
352   // }
353   // GNUNET_MQ_check_boxed_message (msg);
354   return GNUNET_OK;
355 }
356
357
358 /**
359  * @brief Receive an incoming message.
360  *
361  * Pass the message to the client.
362  *
363  * @param cls Closure - communicator handle
364  * @param msg Message
365  */
366 static void
367 handle_communicator_backchannel (void *cls,
368                                  const struct
369                                  GNUNET_TRANSPORT_CommunicatorBackchannel *
370                                  bc_msg)
371 {
372   struct MyClient *client = cls;
373   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
374     client->tc;
375   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *other_tc_h;
376   struct GNUNET_MessageHeader *msg;
377   msg = (struct GNUNET_MessageHeader *) &bc_msg[1];
378   uint16_t isize = ntohs (msg->size);
379   const char *target_communicator = ((const char *) msg) + isize;
380   struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
381   struct GNUNET_MQ_Envelope *env;
382
383   LOG (GNUNET_ERROR_TYPE_DEBUG,
384        "Received backchannel message\n");
385   if (tc_h->bc_enabled != GNUNET_YES)
386   {
387     GNUNET_SERVICE_client_continue (client->client);
388     return;
389   }
390   /* Find client providing this communicator */
391   /* Finally, deliver backchannel message to communicator */
392   LOG (GNUNET_ERROR_TYPE_DEBUG,
393        "Delivering backchannel message of type %u to %s\n",
394        ntohs (msg->type),
395        target_communicator);
396   other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
397                                         GNUNET_PeerIdentity*) &bc_msg->pid);
398   env = GNUNET_MQ_msg_extra (
399     cbi,
400     isize,
401     GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
402   cbi->pid = tc_h->peer_id;
403   memcpy (&cbi[1], msg, isize);
404
405
406   GNUNET_MQ_send (other_tc_h->c_mq, env);
407   GNUNET_SERVICE_client_continue (client->client);
408 }
409
410
411 /**
412  * Address of our peer added.  Test message is well-formed.
413  *
414  * @param cls the client
415  * @param aam the send message that was sent
416  * @return #GNUNET_OK if message is well-formed
417  */
418 static int
419 check_add_address (void *cls,
420                    const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
421 {
422   // if (CT_COMMUNICATOR != tc->type)
423   // {
424   //  GNUNET_break (0);
425   //  return GNUNET_SYSERR;
426   // }
427   GNUNET_MQ_check_zero_termination (msg);
428   return GNUNET_OK;
429 }
430
431
432 /**
433  * @brief The communicator informs about an address.
434  *
435  * Store address and call client callback.
436  *
437  * @param cls Closure - communicator handle
438  * @param msg Message
439  */
440 static void
441 handle_add_address (void *cls,
442                     const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
443 {
444   struct MyClient *client = cls;
445   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
446     client->tc;
447   uint16_t size;
448   size = ntohs (msg->header.size) - sizeof(*msg);
449   if (0 == size)
450     return; /* receive-only communicator */
451   LOG (GNUNET_ERROR_TYPE_DEBUG, "received add address cb %u\n", size);
452   tc_h->c_address = GNUNET_strdup ((const char *) &msg[1]);
453   if (NULL != tc_h->add_address_cb)
454   {
455     LOG (GNUNET_ERROR_TYPE_DEBUG, "calling add_address_cb()\n");
456     tc_h->add_address_cb (tc_h->cb_cls,
457                           tc_h,
458                           tc_h->c_address,
459                           GNUNET_TIME_relative_ntoh (msg->expiration),
460                           msg->aid,
461                           ntohl (msg->nt));
462   }
463   GNUNET_SERVICE_client_continue (client->client);
464 }
465
466
467 /**
468  * Incoming message.  Test message is well-formed.
469  *
470  * @param cls the client
471  * @param msg the send message that was sent
472  * @return #GNUNET_OK if message is well-formed
473  */
474 static int
475 check_incoming_msg (void *cls,
476                     const struct GNUNET_TRANSPORT_IncomingMessage *msg)
477 {
478   // struct TransportClient *tc = cls;
479
480   // if (CT_COMMUNICATOR != tc->type)
481   // {
482   //  GNUNET_break (0);
483   //  return GNUNET_SYSERR;
484   // }
485   GNUNET_MQ_check_boxed_message (msg);
486   return GNUNET_OK;
487 }
488
489
490 /**
491  * @brief Receive an incoming message.
492  *
493  * Pass the message to the client.
494  *
495  * @param cls Closure - communicator handle
496  * @param msg Message
497  */
498 static void
499 handle_incoming_msg (void *cls,
500                      const struct GNUNET_TRANSPORT_IncomingMessage *inc_msg)
501 {
502   struct MyClient *client = cls;
503   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
504     client->tc;
505   struct GNUNET_MessageHeader *msg;
506   msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
507   size_t payload_len = ntohs (msg->size) - sizeof (struct
508                                                    GNUNET_MessageHeader);
509   if (NULL != tc_h->incoming_msg_cb)
510   {
511     tc_h->incoming_msg_cb (tc_h->cb_cls,
512                            tc_h,
513                            (char*) &msg[1],
514                            payload_len);
515   }
516   else
517   {
518     LOG (GNUNET_ERROR_TYPE_WARNING,
519          "Incoming message from communicator but no handler!\n");
520   }
521   if (GNUNET_YES == ntohl (inc_msg->fc_on))
522   {
523     /* send ACK when done to communicator for flow control! */
524     struct GNUNET_MQ_Envelope *env;
525     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
526
527     env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
528     GNUNET_assert (NULL != env);
529     ack->reserved = htonl (0);
530     ack->fc_id = inc_msg->fc_id;
531     ack->sender = inc_msg->sender;
532     GNUNET_MQ_send (tc_h->c_mq, env);
533   }
534
535   GNUNET_SERVICE_client_continue (client->client);
536 }
537
538
539 /**
540  * @brief Communicator informs that it tries to establish requested queue
541  *
542  * @param cls Closure - communicator handle
543  * @param msg Message
544  */
545 static void
546 handle_queue_create_ok (void *cls,
547                         const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
548 {
549   struct MyClient *client = cls;
550   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
551     client->tc;
552
553   if (NULL != tc_h->queue_create_reply_cb)
554   {
555     tc_h->queue_create_reply_cb (tc_h->cb_cls, tc_h, GNUNET_YES);
556   }
557   GNUNET_SERVICE_client_continue (client->client);
558 }
559
560
561 /**
562  * @brief Communicator informs that it wont try establishing requested queue.
563  *
564  * It will not do so probably because the address is bougus (see comment to
565  * #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL)
566  *
567  * @param cls Closure - communicator handle
568  * @param msg Message
569  */
570 static void
571 handle_queue_create_fail (
572   void *cls,
573   const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
574 {
575   struct MyClient *client = cls;
576   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
577     client->tc;
578
579   if (NULL != tc_h->queue_create_reply_cb)
580   {
581     tc_h->queue_create_reply_cb (tc_h->cb_cls, tc_h, GNUNET_NO);
582   }
583   GNUNET_SERVICE_client_continue (client->client);
584 }
585
586
587 /**
588  * New queue became available.  Check message.
589  *
590  * @param cls the client
591  * @param aqm the send message that was sent
592  */
593 static int
594 check_add_queue_message (void *cls,
595                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
596 {
597   GNUNET_MQ_check_zero_termination (aqm);
598   return GNUNET_OK;
599 }
600
601
602 /**
603  * @brief Handle new queue
604  *
605  * Store context and call client callback.
606  *
607  * @param cls Closure - communicator handle
608  * @param msg Message struct
609  */
610 static void
611 handle_add_queue_message (void *cls,
612                           const struct GNUNET_TRANSPORT_AddQueueMessage *msg)
613 {
614   struct MyClient *client = cls;
615   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
616     client->tc;
617   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
618
619   LOG (GNUNET_ERROR_TYPE_DEBUG,
620        "Got queue with ID %u\n", msg->qid);
621   for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
622   {
623     if (tc_queue->qid == msg->qid)
624       break;
625   }
626   if (NULL == tc_queue)
627   {
628     tc_queue =
629       GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
630     tc_queue->tc_h = tc_h;
631     tc_queue->qid = msg->qid;
632     tc_queue->peer_id = msg->receiver;
633     GNUNET_CONTAINER_DLL_insert (tc_h->queue_head, tc_h->queue_tail, tc_queue);
634   }
635   GNUNET_assert (tc_queue->qid == msg->qid);
636   GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
637   tc_queue->nt = msg->nt;
638   tc_queue->mtu = ntohl (msg->mtu);
639   tc_queue->cs = msg->cs;
640   tc_queue->priority = ntohl (msg->priority);
641   tc_queue->q_len = GNUNET_ntohll (msg->q_len);
642   if (NULL != tc_h->add_queue_cb)
643   {
644     tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
645   }
646   GNUNET_SERVICE_client_continue (client->client);
647 }
648
649
650 /**
651  * @brief Handle new queue
652  *
653  * Store context and call client callback.
654  *
655  * @param cls Closure - communicator handle
656  * @param msg Message struct
657  */
658 static void
659 handle_update_queue_message (void *cls,
660                              const struct
661                              GNUNET_TRANSPORT_UpdateQueueMessage *msg)
662 {
663   struct MyClient *client = cls;
664   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
665     client->tc;
666   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
667
668   LOG (GNUNET_ERROR_TYPE_DEBUG,
669        "Received queue update message for %u with q_len %"PRIu64"\n",
670        msg->qid, GNUNET_ntohll(msg->q_len));
671   tc_queue = tc_h->queue_head;
672   if (NULL != tc_queue)
673   {
674     while (tc_queue->qid != msg->qid)
675     {
676       tc_queue = tc_queue->next;
677     }
678   }
679   GNUNET_assert (tc_queue->qid == msg->qid);
680   GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
681   tc_queue->nt = msg->nt;
682   tc_queue->mtu = ntohl (msg->mtu);
683   tc_queue->cs = msg->cs;
684   tc_queue->priority = ntohl (msg->priority);
685   tc_queue->q_len += GNUNET_ntohll (msg->q_len);
686   GNUNET_SERVICE_client_continue (client->client);
687 }
688
689
690 /**
691  * @brief Shut down the service
692  *
693  * @param cls Closure - Handle to the service
694  */
695 static void
696 shutdown_service (void *cls)
697 {
698   struct GNUNET_SERVICE_Handle *h = cls;
699
700   GNUNET_SERVICE_stop (h);
701 }
702
703
704 /**
705  * @brief Callback called when new Client (Communicator) connects
706  *
707  * @param cls Closure - TransporCommmunicator Handle
708  * @param client Client
709  * @param mq Messagequeue
710  *
711  * @return TransportCommunicator Handle
712  */
713 static void *
714 connect_cb (void *cls,
715             struct GNUNET_SERVICE_Client *client,
716             struct GNUNET_MQ_Handle *mq)
717 {
718   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
719   struct MyClient *new_c;
720
721   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected to %p.\n",
722        client, tc_h);
723   new_c = GNUNET_new (struct MyClient);
724   new_c->client = client;
725   new_c->c_mq = mq;
726   new_c->tc = tc_h;
727   GNUNET_CONTAINER_DLL_insert (tc_h->client_head,
728                                tc_h->client_tail,
729                                new_c);
730
731   if (NULL == tc_h->queue_head)
732     return new_c;
733   /* Iterate over queues. They are yet to be opened. Request opening. */
734   for (struct
735        GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_iter =
736          tc_h->queue_head;
737        NULL != tc_queue_iter;
738        tc_queue_iter = tc_queue_iter->next)
739   {
740     if (NULL == tc_queue_iter->open_queue_env)
741       continue;
742     /* Send the previously created mq envelope to request the creation of the
743      * queue. */
744     GNUNET_MQ_send (tc_h->c_mq,
745                     tc_queue_iter->open_queue_env);
746     tc_queue_iter->open_queue_env = NULL;
747   }
748   return new_c;
749 }
750
751
752 /**
753  * @brief Callback called when Client disconnects
754  *
755  * @param cls Closure - TransportCommunicator Handle
756  * @param client Client
757  * @param internal_cls TransporCommmunicator Handle
758  */
759 static void
760 disconnect_cb (void *cls,
761                struct GNUNET_SERVICE_Client *client,
762                void *internal_cls)
763 {
764   struct MyClient *cl = cls;
765   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
766
767   for (cl = tc_h->client_head; NULL != cl; cl = cl->next)
768   {
769     if (cl->client != client)
770       continue;
771     GNUNET_CONTAINER_DLL_remove (tc_h->client_head,
772                                  tc_h->client_tail,
773                                  cl);
774     if (cl->c_mq == tc_h->c_mq)
775       tc_h->c_mq = NULL;
776     GNUNET_free (cl);
777     break;
778   }
779   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected.\n");
780 }
781
782
783 /**
784  * Message was transmitted.  Process the request.
785  *
786  * @param cls the client
787  * @param sma the send message that was sent
788  */
789 static void
790 handle_send_message_ack (void *cls,
791                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
792 {
793   struct MyClient *client = cls;
794   GNUNET_SERVICE_client_continue (client->client);
795   // NOP
796 }
797
798
799 /**
800  * @brief Start the communicator part of the transport service
801  *
802  * @param communicator_available Callback to be called when a new communicator
803  * becomes available
804  * @param cfg Configuration
805  */
806 static void
807 transport_communicator_start (
808   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
809 {
810   struct GNUNET_MQ_MessageHandler mh[] = {
811     GNUNET_MQ_hd_var_size (communicator_available,
812                            GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
813                            struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
814                            tc_h),
815     GNUNET_MQ_hd_var_size (communicator_backchannel,
816                            GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
817                            struct GNUNET_TRANSPORT_CommunicatorBackchannel,
818                            tc_h),
819     GNUNET_MQ_hd_var_size (add_address,
820                            GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
821                            struct GNUNET_TRANSPORT_AddAddressMessage,
822                            tc_h),
823     // GNUNET_MQ_hd_fixed_size (del_address,
824     //                         GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
825     //                         struct GNUNET_TRANSPORT_DelAddressMessage,
826     //                         NULL),
827     GNUNET_MQ_hd_var_size (incoming_msg,
828                            GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
829                            struct GNUNET_TRANSPORT_IncomingMessage,
830                            tc_h),
831     GNUNET_MQ_hd_fixed_size (queue_create_ok,
832                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
833                              struct GNUNET_TRANSPORT_CreateQueueResponse,
834                              tc_h),
835     GNUNET_MQ_hd_fixed_size (queue_create_fail,
836                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
837                              struct GNUNET_TRANSPORT_CreateQueueResponse,
838                              tc_h),
839     GNUNET_MQ_hd_var_size (add_queue_message,
840                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
841                            struct GNUNET_TRANSPORT_AddQueueMessage,
842                            tc_h),
843     GNUNET_MQ_hd_fixed_size (update_queue_message,
844                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
845                              struct GNUNET_TRANSPORT_UpdateQueueMessage,
846                              tc_h),
847     // GNUNET_MQ_hd_fixed_size (del_queue_message,
848     //                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
849     //                         struct GNUNET_TRANSPORT_DelQueueMessage,
850     //                         NULL),
851     GNUNET_MQ_hd_fixed_size (send_message_ack,
852                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
853                              struct GNUNET_TRANSPORT_SendMessageToAck,
854                              tc_h),
855     GNUNET_MQ_handler_end ()
856   };
857
858
859   tc_h->sh = GNUNET_SERVICE_start ("transport",
860                                    tc_h->cfg,
861                                    &connect_cb,
862                                    &disconnect_cb,
863                                    tc_h,
864                                    mh);
865   GNUNET_assert (NULL != tc_h->sh);
866 }
867
868
869 /**
870  * @brief Task run at shutdown to kill communicator and clean up
871  *
872  * @param cls Closure - Process of communicator
873  */
874 static void
875 shutdown_process (struct GNUNET_OS_Process *proc)
876 {
877   if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
878   {
879     LOG (GNUNET_ERROR_TYPE_WARNING,
880          "Error shutting down communicator with SIGERM, trying SIGKILL\n");
881     if (0 != GNUNET_OS_process_kill (proc, SIGKILL))
882     {
883       LOG (GNUNET_ERROR_TYPE_ERROR,
884            "Error shutting down communicator with SIGERM and SIGKILL\n");
885     }
886   }
887   GNUNET_OS_process_destroy (proc);
888 }
889
890
891 static void
892 shutdown_communicator (void *cls)
893 {
894   struct GNUNET_OS_Process *proc = cls;
895   shutdown_process (proc);
896 }
897
898
899 /**
900  * @brief Start the communicator
901  *
902  * @param cfgname Name of the communicator
903  */
904 static void
905 communicator_start (
906   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
907   const char *binary_name)
908 {
909   char *binary;
910
911   LOG (GNUNET_ERROR_TYPE_DEBUG, "communicator_start\n");
912   binary = GNUNET_OS_get_libexec_binary_path (binary_name);
913   tc_h->c_proc = GNUNET_OS_start_process (GNUNET_YES,
914                                           GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
915                                           NULL,
916                                           NULL,
917                                           NULL,
918                                           binary,
919                                           binary_name,
920                                           "-c",
921                                           tc_h->cfg_filename,
922                                           NULL);
923   if (NULL == tc_h->c_proc)
924   {
925     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start communicator!");
926     return;
927   }
928   LOG (GNUNET_ERROR_TYPE_INFO, "started communicator\n");
929   GNUNET_free (binary);
930 }
931
932
933 /**
934  * @brief Task run at shutdown to kill communicator and clean up
935  *
936  * @param cls Closure - Process of communicator
937  */
938 static void
939 shutdown_nat (void *cls)
940 {
941   struct GNUNET_OS_Process *proc = cls;
942   shutdown_process (proc);
943 }
944
945
946 /**
947  * @brief Start NAT
948  *
949  */
950 static void
951 nat_start (
952   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
953 {
954   char *binary;
955
956   LOG (GNUNET_ERROR_TYPE_DEBUG, "nat_start\n");
957   binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-nat");
958   tc_h->nat_proc = GNUNET_OS_start_process (GNUNET_YES,
959                                             GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
960                                             NULL,
961                                             NULL,
962                                             NULL,
963                                             binary,
964                                             "gnunet-service-nat",
965                                             "-c",
966                                             tc_h->cfg_filename,
967                                             NULL);
968   if (NULL == tc_h->nat_proc)
969   {
970     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start NAT!");
971     return;
972   }
973   LOG (GNUNET_ERROR_TYPE_INFO, "started NAT\n");
974   GNUNET_free (binary);
975 }
976
977
978 /**
979  * @brief Start communicator part of transport service and communicator
980  *
981  * @param service_name Name of the service
982  * @param cfg Configuration handle
983  * @param communicator_available_cb Callback that is called when a new
984  * @param add_address_cb Callback that is called when a new
985  * communicator becomes available
986  * @param cb_cls Closure to @a communicator_available_cb and @a
987  *
988  * @return Handle to the communicator duo
989  */
990 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
991 GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
992   const char *service_name,
993   const char *binary_name,
994   const char *cfg_filename,
995   const struct GNUNET_PeerIdentity *peer_id,
996   GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback
997   communicator_available_cb,
998   GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb,
999   GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb,
1000   GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb,
1001   GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_message_cb,
1002   GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb,
1003   void *cb_cls)
1004 {
1005   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
1006
1007   LOG (GNUNET_ERROR_TYPE_DEBUG,
1008        "Starting new transport/communicator combo with config %s\n",
1009        cfg_filename);
1010   tc_h =
1011     GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle);
1012   tc_h->cfg_filename = GNUNET_strdup (cfg_filename);
1013   tc_h->cfg = GNUNET_CONFIGURATION_create ();
1014   if ((GNUNET_SYSERR == GNUNET_CONFIGURATION_load (tc_h->cfg, cfg_filename)))
1015   {
1016     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1017                 _ ("Malformed configuration file `%s', exit ...\n"),
1018                 cfg_filename);
1019     GNUNET_free (tc_h->cfg_filename);
1020     GNUNET_CONFIGURATION_destroy (tc_h->cfg);
1021     GNUNET_free (tc_h);
1022     return NULL;
1023   }
1024   tc_h->bc_enabled = GNUNET_CONFIGURATION_get_value_yesno (tc_h->cfg,
1025                                                            "communicator-test",
1026                                                            "BACKCHANNEL_ENABLED");
1027   tc_h->communicator_available_cb = communicator_available_cb;
1028   tc_h->add_address_cb = add_address_cb;
1029   tc_h->queue_create_reply_cb = queue_create_reply_cb;
1030   tc_h->add_queue_cb = add_queue_cb;
1031   tc_h->incoming_msg_cb = incoming_message_cb;
1032   tc_h->bc_cb = bc_cb;
1033   tc_h->peer_id = *peer_id;
1034   tc_h->cb_cls = cb_cls;
1035
1036   /* Start communicator part of service */
1037   transport_communicator_start (tc_h);
1038   /* Start NAT */
1039   nat_start (tc_h);
1040   /* Schedule start communicator */
1041   communicator_start (tc_h,
1042                       binary_name);
1043   return tc_h;
1044 }
1045
1046
1047 void
1048 GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (
1049   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
1050 {
1051   shutdown_communicator (tc_h->c_proc);
1052   shutdown_service (tc_h->sh);
1053   shutdown_nat (tc_h->nat_proc);
1054   GNUNET_CONFIGURATION_destroy (tc_h->cfg);
1055   GNUNET_free (tc_h);
1056 }
1057
1058
1059 /**
1060  * @brief Instruct communicator to open a queue
1061  *
1062  * @param tc_h Handle to communicator which shall open queue
1063  * @param peer_id Towards which peer
1064  * @param address For which address
1065  */
1066 void
1067 GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
1068   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1069   const struct GNUNET_PeerIdentity *peer_id,
1070   const char *address)
1071 {
1072   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1073   static uint32_t idgen;
1074   char *prefix;
1075   struct GNUNET_TRANSPORT_CreateQueue *msg;
1076   struct GNUNET_MQ_Envelope *env;
1077   size_t alen;
1078
1079   tc_queue =
1080     GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
1081   tc_queue->tc_h = tc_h;
1082   prefix = GNUNET_HELLO_address_to_prefix (address);
1083   if (NULL == prefix)
1084   {
1085     GNUNET_break (0);  /* We got an invalid address!? */
1086     GNUNET_free (tc_queue);
1087     return;
1088   }
1089   GNUNET_free (prefix);
1090   alen = strlen (address) + 1;
1091   env =
1092     GNUNET_MQ_msg_extra (msg, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
1093   msg->request_id = htonl (idgen++);
1094   tc_queue->qid = msg->request_id;
1095   msg->receiver = *peer_id;
1096   tc_queue->peer_id = *peer_id;
1097   memcpy (&msg[1], address, alen);
1098   if (NULL != tc_h->c_mq)
1099   {
1100     GNUNET_MQ_send (tc_h->c_mq, env);
1101   }
1102   else
1103   {
1104     tc_queue->open_queue_env = env;
1105   }
1106   GNUNET_CONTAINER_DLL_insert (tc_h->queue_head, tc_h->queue_tail, tc_queue);
1107 }
1108
1109
1110 /**
1111  * @brief Instruct communicator to send data
1112  *
1113  * @param tc_queue The queue to use for sending
1114  * @param cont function to call when done sending
1115  * @param cont_cls closure for @a cont
1116  * @param payload Data to send
1117  * @param payload_size Size of the @a payload
1118  */
1119 void
1120 GNUNET_TRANSPORT_TESTING_transport_communicator_send
1121   (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1122   GNUNET_SCHEDULER_TaskCallback cont,
1123   void *cont_cls,
1124   const void *payload,
1125   size_t payload_size)
1126 {
1127   struct GNUNET_MessageHeader *mh;
1128   struct GNUNET_TRANSPORT_SendMessageTo *msg;
1129   struct GNUNET_MQ_Envelope *env;
1130   size_t inbox_size;
1131   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1132   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
1133
1134   tc_queue = NULL;
1135   for (tc_queue_tmp = tc_h->queue_head;
1136        NULL != tc_queue_tmp;
1137        tc_queue_tmp = tc_queue_tmp->next)
1138   {
1139     if (tc_queue_tmp->q_len <= 0)
1140       continue;
1141     if (NULL == tc_queue)
1142     {
1143       LOG (GNUNET_ERROR_TYPE_DEBUG,
1144            "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1145            tc_queue_tmp->priority,
1146            tc_queue_tmp->q_len,
1147            tc_queue_tmp->mtu);
1148       tc_queue = tc_queue_tmp;
1149       continue;
1150     }
1151     if (tc_queue->priority < tc_queue_tmp->priority)
1152     {
1153       LOG (GNUNET_ERROR_TYPE_DEBUG,
1154            "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1155            tc_queue_tmp->priority,
1156            tc_queue_tmp->q_len,
1157            tc_queue_tmp->mtu);
1158       tc_queue = tc_queue_tmp;
1159     }
1160   }
1161   GNUNET_assert (NULL != tc_queue);
1162   if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
1163     tc_queue->q_len--;
1164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165               "Sending message\n");
1166   inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
1167   env = GNUNET_MQ_msg_extra (msg,
1168                              inbox_size,
1169                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
1170   GNUNET_assert (NULL != env);
1171   msg->qid = htonl (tc_queue->qid);
1172   msg->mid = tc_queue->mid++;
1173   msg->receiver = tc_queue->peer_id;
1174   mh = (struct GNUNET_MessageHeader *) &msg[1];
1175   mh->size = htons (inbox_size);
1176   mh->type = GNUNET_MESSAGE_TYPE_DUMMY;
1177   memcpy (&mh[1],
1178           payload,
1179           payload_size);
1180   if (NULL != cont)
1181     GNUNET_MQ_notify_sent (env,
1182                            cont,
1183                            cont_cls);
1184   GNUNET_MQ_send (tc_queue->tc_h->c_mq,
1185                   env);
1186 }