TNG testing: Add ability to open queue
[oweals/gnunet.git] / src / transport / transport-testing2.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/transport-testing2.c
23  * @brief functions related to testing-tng
24  * @author Christian Grothoff
25  * @author Julius Bünger
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_constants.h"
31 #include "transport-testing2.h"
32 #include "gnunet_ats_transport_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_signatures.h"
35 #include "transport.h"
36
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
39
40
41 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
42 {
43   /**
44    * @brief Handle to the configuration
45    */
46   struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * @brief File name of configuration file
50    */
51   char *cfg_filename;
52
53   /**
54    * @brief Handle to the transport service
55    */
56   struct GNUNET_SERVICE_Handle *tsh;
57
58   /**
59    * @brief Task that will be run on shutdown to stop and clean transport
60    * service
61    */
62   struct GNUNET_SCHEDULER_Task *ts_shutdown_task;
63
64   /**
65    * @brief Handle to the client
66    */
67   struct GNUNET_SERVICE_Client *client;
68
69   /**
70    * @brief Handle to the client
71    */
72   struct GNUNET_MQ_Handle *c_mq;
73
74   /**
75    * @brief Process of the communicator
76    */
77   struct GNUNET_OS_Process *c_proc;
78
79   /**
80    * @brief Task that will be run on shutdown to stop and clean communicator
81    */
82   struct GNUNET_SCHEDULER_Task *c_shutdown_task;
83
84   /**
85    * @brief Characteristics of the communicator
86    */
87   enum GNUNET_TRANSPORT_CommunicatorCharacteristics c_characteristics;
88
89   /**
90    * @brief Specifies supported addresses
91    */
92   char *c_addr_prefix;
93
94   /**
95    * @brief Specifies supported addresses
96    */
97   char *c_address;
98
99   /**
100    * @brief Task to request the opening of a view
101    */
102   struct GNUNET_MQ_Envelope *open_queue_env;
103
104   /* Callbacks + Closures */
105   /**
106    * @brief Callback called when a new communicator connects
107    */
108   GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb;
109
110   /**
111    * @brief Callback called when a new communicator connects
112    */
113   GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb;
114
115   /**
116    * @brief Callback called when a new communicator connects
117    */
118   GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb;
119
120   /**
121    * @brief Callback called when a new communicator connects
122    */
123   GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb;
124
125   /**
126    * @brief Closure to the callback
127    */
128   void *cb_cls;
129 };
130
131
132 /**
133  * @brief Check whether incoming msg indicating available communicator is
134  * correct
135  *
136  * @param cls Closure
137  * @param msg Message struct
138  *
139  * @return GNUNET_YES in case message is correct
140  */
141 static int
142 check_communicator_available (void *cls,
143     const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
144 {
145   uint16_t size;
146
147   size = ntohs (msg->header.size) - sizeof (*msg);
148   if (0 == size)
149     return GNUNET_OK; /* receive-only communicator */
150   GNUNET_MQ_check_zero_termination (msg);
151   return GNUNET_OK;
152 }
153
154
155 /**
156  * @brief Handle new communicator
157  *
158  * @param cls Closure
159  * @param msg Message struct
160  */
161 static void
162 handle_communicator_available (void *cls,
163     const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
164 {
165   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
166   uint16_t size;
167
168   size = ntohs (msg->header.size) - sizeof (*msg);
169   if (0 == size)
170     return; /* receive-only communicator */
171   tc_h->c_characteristics = ntohl (msg->cc);
172   tc_h->c_addr_prefix = GNUNET_strdup ((const char *) &msg[1]);
173   if (NULL != tc_h->communicator_available_cb)
174   {
175     LOG (GNUNET_ERROR_TYPE_DEBUG,
176         "calling communicator_available_cb()\n");
177     tc_h->communicator_available_cb (tc_h->cb_cls,
178                                      tc_h,
179                                      tc_h->c_characteristics,
180                                      tc_h->c_addr_prefix);
181   }
182   GNUNET_SERVICE_client_continue (tc_h->client);
183 }
184
185
186 /**
187  * Address of our peer added.  Test message is well-formed.
188  *
189  * @param cls the client
190  * @param aam the send message that was sent
191  * @return #GNUNET_OK if message is well-formed
192  */
193 static int
194 check_add_address (void *cls,
195                    const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
196 {
197   struct TransportClient *tc = cls;
198
199   //if (CT_COMMUNICATOR != tc->type)
200   //{
201   //  GNUNET_break (0);
202   //  return GNUNET_SYSERR;
203   //}
204   GNUNET_MQ_check_zero_termination (msg);
205   return GNUNET_OK;
206 }
207
208
209 static void
210 handle_add_address (void *cls,
211                     const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
212 {
213   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
214   uint16_t size;
215
216   size = ntohs (msg->header.size) - sizeof (*msg);
217   if (0 == size)
218     return; /* receive-only communicator */
219   tc_h->c_address = GNUNET_strdup ((const char *) &msg[1]);
220   if (NULL != tc_h->add_address_cb)
221   {
222     LOG (GNUNET_ERROR_TYPE_DEBUG,
223         "calling communicator_available()\n");
224     tc_h->add_address_cb (tc_h->cb_cls,
225                           tc_h,
226                           tc_h->c_address,
227                           GNUNET_TIME_relative_ntoh (msg->expiration),
228                           msg->aid,
229                           ntohl (msg->nt));
230   }
231   GNUNET_SERVICE_client_continue (tc_h->client);
232 }
233
234
235 static void
236 handle_queue_create_ok (void *cls,
237                         const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
238 {
239   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
240
241   if (NULL != tc_h->queue_create_reply_cb)
242   {
243     tc_h->queue_create_reply_cb (tc_h->cb_cls,
244                             tc_h,
245                             GNUNET_YES);
246   }
247   GNUNET_SERVICE_client_continue (tc_h->client);
248 }
249
250
251 static void
252 handle_queue_create_fail (void *cls,
253                           const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
254 {
255   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
256
257   if (NULL != tc_h->queue_create_reply_cb)
258   {
259     tc_h->queue_create_reply_cb (tc_h->cb_cls,
260                             tc_h,
261                             GNUNET_NO);
262   }
263   GNUNET_SERVICE_client_continue (tc_h->client);
264 }
265
266
267 /**
268  * New queue became available.  Check message.
269  *
270  * @param cls the client
271  * @param aqm the send message that was sent
272  */
273 static int
274 check_add_queue_message (void *cls,
275                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
276 {
277   GNUNET_MQ_check_zero_termination (aqm);
278   return GNUNET_OK;
279 }
280
281
282 /**
283  * @brief Handle new communicator
284  *
285  * @param cls Closure
286  * @param msg Message struct
287  */
288 static void
289 handle_add_queue_message (void *cls,
290                           const struct GNUNET_TRANSPORT_AddQueueMessage *msg)
291 {
292   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
293
294   if (NULL != tc_h->add_queue_cb)
295   {
296     tc_h->add_queue_cb (tc_h->cb_cls,
297                         tc_h);
298   }
299   GNUNET_SERVICE_client_continue (tc_h->client);
300 }
301
302
/**
 * @brief Shutdown task: stop the service.
 *
 * @param cls Closure - the `struct GNUNET_SERVICE_Handle *` to stop
 */
static void
shutdown_service (void *cls)
{
  GNUNET_SERVICE_stop ((struct GNUNET_SERVICE_Handle *) cls);
}
315
316
317 /**
318  * @brief Callback called when new Client (Communicator) connects
319  *
320  * @param cls Closure - TransporCommmunicator Handle
321  * @param client Client
322  * @param mq Messagequeue
323  *
324  * @return TransportCommunicator Handle
325  */
326 static void *
327 connect_cb (void *cls,
328             struct GNUNET_SERVICE_Client *client,
329             struct GNUNET_MQ_Handle *mq)
330 {
331   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
332
333   LOG (GNUNET_ERROR_TYPE_DEBUG,
334       "Client connected.\n");
335   tc_h->client = client;
336   tc_h->c_mq = mq;
337
338   if (NULL != tc_h->open_queue_env)
339   {
340     GNUNET_MQ_send (tc_h->c_mq,
341                     tc_h->open_queue_env);
342     tc_h->open_queue_env = NULL;
343   }
344   return tc_h;
345 }
346
347
348 /**
349  * @brief Callback called when Client disconnects
350  *
351  * @param cls Closure - TransportCommunicator Handle
352  * @param client Client
353  * @param internal_cls TransporCommmunicator Handle
354  */
355 static void
356 disconnect_cb (void *cls,
357                struct GNUNET_SERVICE_Client *client,
358                void *internal_cls)
359 {
360   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
361
362   LOG (GNUNET_ERROR_TYPE_DEBUG,
363       "Client disconnected.\n");
364   tc_h->client = NULL;
365 }
366
367
368 /**
369  * @brief Start the communicator part of the transport service
370  *
371  * @param communicator_available Callback to be called when a new communicator
372  * becomes available
373  * @param cfg Configuration
374  */
375 static void
376 transport_communicator_start (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
377 {
378   struct GNUNET_MQ_MessageHandler mh[] = {
379     GNUNET_MQ_hd_var_size (communicator_available,
380         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
381         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
382         &tc_h),
383     //GNUNET_MQ_hd_var_size (communicator_backchannel,
384     //    GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
385     //    struct GNUNET_TRANSPORT_CommunicatorBackchannel,
386     //    NULL),
387     GNUNET_MQ_hd_var_size (add_address,
388         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
389         struct GNUNET_TRANSPORT_AddAddressMessage,
390         &tc_h),
391     //GNUNET_MQ_hd_fixed_size (del_address,
392     //                         GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
393     //                         struct GNUNET_TRANSPORT_DelAddressMessage,
394     //                         NULL),
395     //GNUNET_MQ_hd_var_size (incoming_msg,
396     //    GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
397     //    struct GNUNET_TRANSPORT_IncomingMessage,
398     //    NULL),
399     GNUNET_MQ_hd_fixed_size (queue_create_ok,
400           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
401           struct GNUNET_TRANSPORT_CreateQueueResponse,
402           tc_h),
403     GNUNET_MQ_hd_fixed_size (queue_create_fail,
404           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
405           struct GNUNET_TRANSPORT_CreateQueueResponse,
406           tc_h),
407     GNUNET_MQ_hd_var_size (add_queue_message,
408         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
409         struct GNUNET_TRANSPORT_AddQueueMessage,
410         NULL),
411     //GNUNET_MQ_hd_fixed_size (del_queue_message,
412     //                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
413     //                         struct GNUNET_TRANSPORT_DelQueueMessage,
414     //                         NULL),
415     //GNUNET_MQ_hd_fixed_size (send_message_ack,
416     //                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
417     //                         struct GNUNET_TRANSPORT_SendMessageToAck,
418     //                         NULL),
419   };
420   struct GNUNET_SERVICE_Handle *h;
421
422   h = GNUNET_SERVICE_start ("transport",
423                             tc_h->cfg,
424                             &connect_cb,
425                             &disconnect_cb,
426                             tc_h,
427                             mh);
428   if (NULL == h)
429     LOG (GNUNET_ERROR_TYPE_ERROR,
430          "Failed starting service!\n");
431   else
432   {
433     LOG (GNUNET_ERROR_TYPE_DEBUG,
434         "Started service\n");
435     /* TODO */ GNUNET_SCHEDULER_add_shutdown (&shutdown_service, h);
436   }
437 }
438
439
440 /**
441  * @brief Task run at shutdown to kill communicator and clean up
442  *
443  * @param cls Closure - Process of communicator
444  */
445 static void
446 shutdown_communicator (void *cls)
447 {
448   struct GNUNET_OS_Process *proc = cls;
449
450   if (GNUNET_OK != GNUNET_OS_process_kill (proc,
451                                            SIGTERM))
452   {
453     LOG (GNUNET_ERROR_TYPE_WARNING,
454         "Error shutting down communicator with SIGERM, trying SIGKILL\n");
455     if (GNUNET_OK != GNUNET_OS_process_kill (proc,
456                                              SIGKILL))
457     {
458       LOG (GNUNET_ERROR_TYPE_ERROR,
459           "Error shutting down communicator with SIGERM and SIGKILL\n");
460     }
461   }
462   GNUNET_OS_process_destroy (proc);
463 }
464
465
466 /**
467  * @brief Start the communicator
468  *
469  * @param cfgname Name of the communicator
470  */
471 static void
472 communicator_start (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
473 {
474   char *binary;
475
476   LOG (GNUNET_ERROR_TYPE_DEBUG,
477       "communicator_start\n");
478   binary = GNUNET_OS_get_libexec_binary_path ("gnunet-communicator-unix");
479   tc_h->c_proc =
480     GNUNET_OS_start_process (GNUNET_YES,
481                              GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
482                              NULL, NULL, NULL,
483                              binary,
484                              "./gnunet-communicator-unix",
485                              "-c",
486                              tc_h->cfg_filename,
487                              NULL);
488   if (NULL == tc_h->c_proc)
489   {
490     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
491                 "Failed to start communicator!");
492     return;
493   }
494   LOG (GNUNET_ERROR_TYPE_DEBUG,
495       "started communicator\n");
496   GNUNET_free (binary);
497   /* TODO */ GNUNET_SCHEDULER_add_shutdown (&shutdown_communicator,
498                                             tc_h->c_proc);
499 }
500
501
502 /**
503  * @brief Start communicator part of transport service and communicator
504  *
505  * @param service_name Name of the service
506  * @param cfg Configuration handle
507  * @param communicator_available_cb Callback that is called when a new
508  * @param add_address_cb Callback that is called when a new
509  * communicator becomes available
510  * @param cb_cls Closure to @a communicator_available_cb and @a
511  *
512  * @return Handle to the communicator duo
513  */
514 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
515 GNUNET_TRANSPORT_TESTING_transport_communicator_service_start
516   (const char *service_name,
517    const char *cfg_filename,
518    GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb,
519    GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb,
520    GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb,
521    GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb,
522    void *cb_cls)
523 {
524   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
525
526   tc_h = GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle);
527   tc_h->cfg_filename = GNUNET_strdup (cfg_filename);
528   tc_h->cfg = GNUNET_CONFIGURATION_create ();
529   if ( (GNUNET_SYSERR ==
530         GNUNET_CONFIGURATION_load (tc_h->cfg,
531                                    cfg_filename)) )
532   {
533     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
534                 _("Malformed configuration file `%s', exit ...\n"),
535                   cfg_filename);
536     return NULL;
537   }
538   tc_h->communicator_available_cb = communicator_available_cb;
539   tc_h->add_address_cb = add_address_cb;
540   tc_h->queue_create_reply_cb = queue_create_reply_cb;
541   tc_h->add_queue_cb = add_queue_cb;
542   tc_h->cb_cls = cb_cls;
543
544   /* Start communicator part of service */
545   transport_communicator_start (tc_h);
546
547   /* Schedule start communicator */
548   communicator_start (tc_h);
549   return tc_h;
550 }
551
552
553 void
554 GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue
555   (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
556    const struct GNUNET_PeerIdentity *peer_id,
557    const char *address)
558 {
559   static uint32_t idgen;
560   char *prefix;
561   struct GNUNET_TRANSPORT_CreateQueue *msg;
562   struct GNUNET_MQ_Envelope *env;
563   size_t alen;
564
565   if (NULL != tc_h->open_queue_env)
566   {
567     // FIXME: handle multiple queue requests
568     return; /* Already waiting for opening of queue */
569   }
570   prefix = GNUNET_HELLO_address_to_prefix (address);
571   if (NULL == prefix)
572   {
573     GNUNET_break (0); /* We got an invalid address!? */
574     return;
575   }
576   alen = strlen (address) + 1;
577   env = GNUNET_MQ_msg_extra (msg,
578                              alen,
579                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
580   msg->request_id = htonl (idgen++);
581   msg->receiver = *peer_id;
582   memcpy (&msg[1],
583           address,
584           alen);
585   if (NULL != tc_h->c_mq)
586   {
587     GNUNET_MQ_send (tc_h->c_mq,
588                     env);
589   }
590   else
591   {
592     tc_h->open_queue_env = env;
593   }
594 }
595
596 //struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission *
597 //GNUNET_TRANSPORT_TESTING_transport_communicator_send
598 //  (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tcq,
599 //   const struct GNUNET_MessageHeader *hdr,
600 //   GNUNET_TRANSPORT_TESTING_SuccessStatus cb, void *cb_cls);
601