tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_server.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40 /**
41  * After how long do we termiante idle connections?
42  */
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46 /**
47  * A message in the queue to be written to the cadet.
48  */
49 struct WriteQueueItem
50 {
51   /**
52    * Kept in a DLL.
53    */
54   struct WriteQueueItem *next;
55
56   /**
57    * Kept in a DLL.
58    */
59   struct WriteQueueItem *prev;
60
61   /**
62    * Number of bytes of payload, allocated at the end of this struct.
63    */
64   size_t msize;
65 };
66
67
68 /**
69  * Information we keep around for each active cadeting client.
70  */
71 struct CadetClient
72 {
73   /**
74    * DLL
75    */
76   struct CadetClient *next;
77
78   /**
79    * DLL
80    */
81   struct CadetClient *prev;
82
83   /**
84    * Channel for communication.
85    */
86   struct GNUNET_CADET_Channel *channel;
87
88   /**
89    * Head of write queue.
90    */
91   struct WriteQueueItem *wqi_head;
92
93   /**
94    * Tail of write queue.
95    */
96   struct WriteQueueItem *wqi_tail;
97
98   /**
99    * Current active request to the datastore, if we have one pending.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *qe;
102
103   /**
104    * Task that is scheduled to asynchronously terminate the connection.
105    */
106   struct GNUNET_SCHEDULER_Task * terminate_task;
107
108   /**
109    * Task that is scheduled to terminate idle connections.
110    */
111   struct GNUNET_SCHEDULER_Task * timeout_task;
112
113   /**
114    * Size of the last write that was initiated.
115    */
116   size_t reply_size;
117
118 };
119
120
121 /**
122  * Listen port for incoming requests.
123  */
124 static struct GNUNET_CADET_Port *cadet_port;
125
126 /**
127  * Head of DLL of cadet clients.
128  */
129 static struct CadetClient *sc_head;
130
131 /**
132  * Tail of DLL of cadet clients.
133  */
134 static struct CadetClient *sc_tail;
135
136 /**
137  * Number of active cadet clients in the 'sc_*'-DLL.
138  */
139 static unsigned int sc_count;
140
141 /**
142  * Maximum allowed number of cadet clients.
143  */
144 static unsigned long long sc_count_max;
145
146
147
148 /**
149  * Task run to asynchronously terminate the cadet due to timeout.
150  *
151  * @param cls the 'struct CadetClient'
152  */
153 static void
154 timeout_cadet_task (void *cls)
155 {
156   struct CadetClient *sc = cls;
157   struct GNUNET_CADET_Channel *tun;
158
159   sc->timeout_task = NULL;
160   tun = sc->channel;
161   sc->channel = NULL;
162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
163               "Timeout for inactive cadet client %p\n",
164               sc);
165   GNUNET_CADET_channel_destroy (tun);
166 }
167
168
169 /**
170  * Reset the timeout for the cadet client (due to activity).
171  *
172  * @param sc client handle to reset timeout for
173  */
174 static void
175 refresh_timeout_task (struct CadetClient *sc)
176 {
177   if (NULL != sc->timeout_task)
178     GNUNET_SCHEDULER_cancel (sc->timeout_task);
179   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
180                                                    &timeout_cadet_task,
181                                                    sc);
182 }
183
184
185 /**
186  * Check if we are done with the write queue, and if so tell CADET
187  * that we are ready to read more.
188  *
189  * @param cls where to process the write queue
190  */
191 static void
192 continue_writing (void *cls)
193 {
194   struct CadetClient *sc = cls;
195   struct GNUNET_MQ_Handle *mq;
196
197   mq = GNUNET_CADET_get_mq (sc->channel);
198   if (0 != GNUNET_MQ_get_length (mq))
199   {
200     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201                 "Write pending, waiting for it to complete\n");
202     return;
203   }
204   refresh_timeout_task (sc);
205   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
206               "Finished processing cadet request from client %p, ready to receive the next one\n",
207               sc);
208   GNUNET_CADET_receive_done (sc->channel);
209 }
210
211
212 /**
213  * Process a datum that was stored in the datastore.
214  *
215  * @param cls closure with the `struct CadetClient` which sent the query
216  * @param key key for the content
217  * @param size number of bytes in @a data
218  * @param data content stored
219  * @param type type of the content
220  * @param priority priority of the content
221  * @param anonymity anonymity-level for the content
222  * @param replication replication-level for the content
223  * @param expiration expiration time for the content
224  * @param uid unique identifier for the datum;
225  *        maybe 0 if no unique identifier is available
226  */
227 static void
228 handle_datastore_reply (void *cls,
229                         const struct GNUNET_HashCode *key,
230                         size_t size,
231                         const void *data,
232                         enum GNUNET_BLOCK_Type type,
233                         uint32_t priority,
234                         uint32_t anonymity,
235                         uint32_t replication,
236                         struct GNUNET_TIME_Absolute expiration,
237                         uint64_t uid)
238 {
239   struct CadetClient *sc = cls;
240   size_t msize = size + sizeof (struct CadetReplyMessage);
241   struct GNUNET_MQ_Envelope *env;
242   struct CadetReplyMessage *srm;
243
244   sc->qe = NULL;
245   if (NULL == data)
246   {
247     /* no result, this should not really happen, as for
248        non-anonymous routing only peers that HAVE the
249        answers should be queried; OTOH, this is not a
250        hard error as we might have had the answer in the
251        past and the user might have unindexed it. Hence
252        we log at level "INFO" for now. */
253     if (NULL == key)
254     {
255       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
256                   "Have no answer and the query was NULL\n");
257     }
258     else
259     {
260       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
261                   "Have no answer for query `%s'\n",
262                   GNUNET_h2s (key));
263     }
264     GNUNET_STATISTICS_update (GSF_stats,
265                               gettext_noop ("# queries received via CADET not answered"),
266                               1,
267                               GNUNET_NO);
268     continue_writing (sc);
269     return;
270   }
271   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
272   {
273     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
274                 "Performing on-demand encoding for query %s\n",
275                 GNUNET_h2s (key));
276     if (GNUNET_OK !=
277         GNUNET_FS_handle_on_demand_block (key,
278                                     size,
279                                     data,
280                                     type,
281                                     priority,
282                                     anonymity,
283                                     replication,
284                                     expiration,
285                                     uid,
286                                     &handle_datastore_reply,
287                                     sc))
288     {
289       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290                   "On-demand encoding request failed\n");
291       continue_writing (sc);
292     }
293     return;
294   }
295   if (msize > GNUNET_MAX_MESSAGE_SIZE)
296   {
297     GNUNET_break (0);
298     continue_writing (sc);
299     return;
300   }
301   GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
303               "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
304               (unsigned int) size,
305               (unsigned int) type,
306               GNUNET_h2s (key),
307               sc);
308   env = GNUNET_MQ_msg_extra (srm,
309                              size,
310                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
311   srm->type = htonl (type);
312   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
313   GNUNET_memcpy (&srm[1],
314                  data,
315                  size);
316   GNUNET_MQ_notify_sent (env,
317                          &continue_writing,
318                          sc);
319   GNUNET_STATISTICS_update (GSF_stats,
320                             gettext_noop ("# Blocks transferred via cadet"),
321                             1,
322                             GNUNET_NO);
323   GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
324                   env);
325 }
326
327
328 /**
329  * Functions with this signature are called whenever a
330  * complete query message is received.
331  *
332  * @param cls closure with the `struct CadetClient`
333  * @param sqm the actual message
334  */
335 static void
336 handle_request (void *cls,
337                 const struct CadetQueryMessage *sqm)
338 {
339   struct CadetClient *sc = cls;
340
341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342               "Received query for `%s' via cadet from client %p\n",
343               GNUNET_h2s (&sqm->query),
344               sc);
345   GNUNET_STATISTICS_update (GSF_stats,
346                             gettext_noop ("# queries received via cadet"),
347                             1,
348                             GNUNET_NO);
349   refresh_timeout_task (sc);
350   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
351                                      0 /* next_uid */,
352                                      false /* random */,
353                                      &sqm->query,
354                                      ntohl (sqm->type),
355                                      0 /* priority */,
356                                      GSF_datastore_queue_size,
357                                      &handle_datastore_reply,
358                                      sc);
359   if (NULL == sc->qe)
360   {
361     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362                 "Queueing request with datastore failed (queue full?)\n");
363     continue_writing (sc);
364   }
365 }
366
367
368 /**
369  * Functions of this type are called upon new cadet connection from other peers.
370  *
371  * @param cls the closure from GNUNET_CADET_connect
372  * @param channel the channel representing the cadet
373  * @param initiator the identity of the peer who wants to establish a cadet
374  *            with us; NULL on binding error
375  * @return initial channel context (our `struct CadetClient`)
376  */
377 static void *
378 connect_cb (void *cls,
379             struct GNUNET_CADET_Channel *channel,
380             const struct GNUNET_PeerIdentity *initiator)
381 {
382   struct CadetClient *sc;
383
384   GNUNET_assert (NULL != channel);
385   if (sc_count >= sc_count_max)
386   {
387     GNUNET_STATISTICS_update (GSF_stats,
388                               gettext_noop ("# cadet client connections rejected"),
389                               1,
390                               GNUNET_NO);
391     GNUNET_CADET_channel_destroy (channel);
392     return NULL;
393   }
394   GNUNET_STATISTICS_update (GSF_stats,
395                             gettext_noop ("# cadet connections active"),
396                             1,
397                             GNUNET_NO);
398   sc = GNUNET_new (struct CadetClient);
399   sc->channel = channel;
400   GNUNET_CONTAINER_DLL_insert (sc_head,
401                                sc_tail,
402                                sc);
403   sc_count++;
404   refresh_timeout_task (sc);
405   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406               "Accepting inbound cadet connection from `%s' as client %p\n",
407               GNUNET_i2s (initiator),
408               sc);
409   return sc;
410 }
411
412
413 /**
414  * Function called by cadet when a client disconnects.
415  * Cleans up our `struct CadetClient` of that channel.
416  *
417  * @param cls  our `struct CadetClient`
418  * @param channel channel of the disconnecting client
419  * @param channel_ctx
420  */
421 static void
422 disconnect_cb (void *cls,
423                const struct GNUNET_CADET_Channel *channel)
424 {
425   struct CadetClient *sc = cls;
426   struct WriteQueueItem *wqi;
427
428   if (NULL == sc)
429     return;
430   sc->channel = NULL;
431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432               "Terminating cadet connection with client %p\n",
433               sc);
434   GNUNET_STATISTICS_update (GSF_stats,
435                             gettext_noop ("# cadet connections active"), -1,
436                             GNUNET_NO);
437   if (NULL != sc->terminate_task)
438     GNUNET_SCHEDULER_cancel (sc->terminate_task);
439   if (NULL != sc->timeout_task)
440     GNUNET_SCHEDULER_cancel (sc->timeout_task);
441   if (NULL != sc->qe)
442     GNUNET_DATASTORE_cancel (sc->qe);
443   while (NULL != (wqi = sc->wqi_head))
444   {
445     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
446                                  sc->wqi_tail,
447                                  wqi);
448     GNUNET_free (wqi);
449   }
450   GNUNET_CONTAINER_DLL_remove (sc_head,
451                                sc_tail,
452                                sc);
453   sc_count--;
454   GNUNET_free (sc);
455 }
456
457
458 /**
459  * Function called whenever an MQ-channel's transmission window size changes.
460  *
461  * The first callback in an outgoing channel will be with a non-zero value
462  * and will mean the channel is connected to the destination.
463  *
464  * For an incoming channel it will be called immediately after the
465  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
466  *
467  * @param cls Channel closure.
468  * @param channel Connection to the other end (henceforth invalid).
469  * @param window_size New window size. If the is more messages than buffer size
470  *                    this value will be negative..
471  */
472 static void
473 window_change_cb (void *cls,
474                   const struct GNUNET_CADET_Channel *channel,
475                   int window_size)
476 {
477   /* FIXME: could do flow control here... */
478 }
479
480
481 /**
482  * Initialize subsystem for non-anonymous file-sharing.
483  */
484 void
485 GSF_cadet_start_server ()
486 {
487   struct GNUNET_MQ_MessageHandler handlers[] = {
488     GNUNET_MQ_hd_fixed_size (request,
489                              GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
490                              struct CadetQueryMessage,
491                              NULL),
492     GNUNET_MQ_handler_end ()
493   };
494   struct GNUNET_HashCode port;
495
496   if (GNUNET_YES !=
497       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
498                                              "fs",
499                                              "MAX_CADET_CLIENTS",
500                                              &sc_count_max))
501     return;
502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503               "Initializing cadet FS server with a limit of %llu connections\n",
504               sc_count_max);
505   cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
506   cadet_handle = GNUNET_CADET_connect (GSF_cfg);
507   GNUNET_assert (NULL != cadet_handle);
508   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
509                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
510                       &port);
511   cadet_port = GNUNET_CADET_open_port (cadet_handle,
512                                        &port,
513                                        &connect_cb,
514                                        NULL,
515                                        &window_change_cb,
516                                        &disconnect_cb,
517                                        handlers);
518 }
519
520
521 /**
522  * Shutdown subsystem for non-anonymous file-sharing.
523  */
524 void
525 GSF_cadet_stop_server ()
526 {
527   GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
528                                          &GSF_cadet_release_clients,
529                                          NULL);
530   GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
531   cadet_map = NULL;
532   if (NULL != cadet_port)
533   {
534     GNUNET_CADET_close_port (cadet_port);
535     cadet_port = NULL;
536   }
537   if (NULL != cadet_handle)
538   {
539     GNUNET_CADET_disconnect (cadet_handle);
540     cadet_handle = NULL;
541   }
542   GNUNET_assert (NULL == sc_head);
543   GNUNET_assert (0 == sc_count);
544 }
545
546 /* end of gnunet-service-fs_cadet.c */