2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
17 * @file fs/gnunet-service-fs_cadet_server.c
18 * @brief non-anonymous file-transfer
19 * @author Christian Grothoff
22 * - PORT is set to old application type, unsure if we should keep
23 * it that way (fine for now)
26 #include "gnunet_constants.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet-service-fs.h"
32 #include "gnunet-service-fs_indexing.h"
33 #include "gnunet-service-fs_cadet.h"
36 * After how long do we terminate idle connections?
38 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
42 * A message in the queue to be written to the cadet.
49 struct WriteQueueItem *next;
54 struct WriteQueueItem *prev;
57 * Number of bytes of payload, allocated at the end of this struct.
64 * Information we keep around for each active cadeting client.
71 struct CadetClient *next;
76 struct CadetClient *prev;
79 * Channel for communication.
81 struct GNUNET_CADET_Channel *channel;
84 * Head of write queue.
86 struct WriteQueueItem *wqi_head;
89 * Tail of write queue.
91 struct WriteQueueItem *wqi_tail;
94 * Current active request to the datastore, if we have one pending.
96 struct GNUNET_DATASTORE_QueueEntry *qe;
99 * Task that is scheduled to asynchronously terminate the connection.
101 struct GNUNET_SCHEDULER_Task * terminate_task;
104 * Task that is scheduled to terminate idle connections.
106 struct GNUNET_SCHEDULER_Task * timeout_task;
109 * Size of the last write that was initiated.
117 * Listen port for incoming requests.
119 static struct GNUNET_CADET_Port *cadet_port;
122 * Head of DLL of cadet clients.
124 static struct CadetClient *sc_head;
127 * Tail of DLL of cadet clients.
129 static struct CadetClient *sc_tail;
132 * Number of active cadet clients in the 'sc_*'-DLL.
134 static unsigned int sc_count;
137 * Maximum allowed number of cadet clients.
139 static unsigned long long sc_count_max;
144 * Task run to asynchronously terminate the cadet due to timeout.
146 * @param cls the 'struct CadetClient'
149 timeout_cadet_task (void *cls)
151 struct CadetClient *sc = cls;
152 struct GNUNET_CADET_Channel *tun;
154 sc->timeout_task = NULL;
157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
158 "Timeout for inactive cadet client %p\n",
160 GNUNET_CADET_channel_destroy (tun);
165 * Reset the timeout for the cadet client (due to activity).
167 * @param sc client handle to reset timeout for
170 refresh_timeout_task (struct CadetClient *sc)
172 if (NULL != sc->timeout_task)
173 GNUNET_SCHEDULER_cancel (sc->timeout_task);
174 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
181 * Check if we are done with the write queue, and if so tell CADET
182 * that we are ready to read more.
184 * @param cls the `struct CadetClient` whose write queue to process
187 continue_writing (void *cls)
189 struct CadetClient *sc = cls;
190 struct GNUNET_MQ_Handle *mq;
192 mq = GNUNET_CADET_get_mq (sc->channel);
193 if (0 != GNUNET_MQ_get_length (mq))
195 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
196 "Write pending, waiting for it to complete\n");
199 refresh_timeout_task (sc);
200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201 "Finished processing cadet request from client %p, ready to receive the next one\n",
203 GNUNET_CADET_receive_done (sc->channel);
208 * Process a datum that was stored in the datastore.
210 * @param cls closure with the `struct CadetClient` which sent the query
211 * @param key key for the content
212 * @param size number of bytes in @a data
213 * @param data content stored
214 * @param type type of the content
215 * @param priority priority of the content
216 * @param anonymity anonymity-level for the content
217 * @param replication replication-level for the content
218 * @param expiration expiration time for the content
219 * @param uid unique identifier for the datum;
220 * may be 0 if no unique identifier is available
223 handle_datastore_reply (void *cls,
224 const struct GNUNET_HashCode *key,
227 enum GNUNET_BLOCK_Type type,
230 uint32_t replication,
231 struct GNUNET_TIME_Absolute expiration,
234 struct CadetClient *sc = cls;
235 size_t msize = size + sizeof (struct CadetReplyMessage);
236 struct GNUNET_MQ_Envelope *env;
237 struct CadetReplyMessage *srm;
242 /* no result, this should not really happen, as for
243 non-anonymous routing only peers that HAVE the
244 answers should be queried; OTOH, this is not a
245 hard error as we might have had the answer in the
246 past and the user might have unindexed it. Hence
247 we log at level "INFO" for now. */
250 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
251 "Have no answer and the query was NULL\n");
255 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
256 "Have no answer for query `%s'\n",
259 GNUNET_STATISTICS_update (GSF_stats,
260 gettext_noop ("# queries received via CADET not answered"),
263 continue_writing (sc);
266 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
268 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
269 "Performing on-demand encoding for query %s\n",
272 GNUNET_FS_handle_on_demand_block (key,
281 &handle_datastore_reply,
284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285 "On-demand encoding request failed\n");
286 continue_writing (sc);
290 if (msize > GNUNET_MAX_MESSAGE_SIZE)
293 continue_writing (sc);
296 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298 "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
303 env = GNUNET_MQ_msg_extra (srm,
305 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
306 srm->type = htonl (type);
307 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
308 GNUNET_memcpy (&srm[1],
311 GNUNET_MQ_notify_sent (env,
314 GNUNET_STATISTICS_update (GSF_stats,
315 gettext_noop ("# Blocks transferred via cadet"),
318 GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
324 * Functions with this signature are called whenever a
325 * complete query message is received.
327 * @param cls closure with the `struct CadetClient`
328 * @param sqm the actual message
331 handle_request (void *cls,
332 const struct CadetQueryMessage *sqm)
334 struct CadetClient *sc = cls;
336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337 "Received query for `%s' via cadet from client %p\n",
338 GNUNET_h2s (&sqm->query),
340 GNUNET_STATISTICS_update (GSF_stats,
341 gettext_noop ("# queries received via cadet"),
344 refresh_timeout_task (sc);
345 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
351 GSF_datastore_queue_size,
352 &handle_datastore_reply,
356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357 "Queueing request with datastore failed (queue full?)\n");
358 continue_writing (sc);
364 * Functions of this type are called upon new cadet connection from other peers.
366 * @param cls the closure from GNUNET_CADET_connect
367 * @param channel the channel representing the cadet
368 * @param initiator the identity of the peer who wants to establish a cadet
369 * with us; NULL on binding error
370 * @return initial channel context (our `struct CadetClient`)
373 connect_cb (void *cls,
374 struct GNUNET_CADET_Channel *channel,
375 const struct GNUNET_PeerIdentity *initiator)
377 struct CadetClient *sc;
379 GNUNET_assert (NULL != channel);
380 if (sc_count >= sc_count_max)
382 GNUNET_STATISTICS_update (GSF_stats,
383 gettext_noop ("# cadet client connections rejected"),
386 GNUNET_CADET_channel_destroy (channel);
389 GNUNET_STATISTICS_update (GSF_stats,
390 gettext_noop ("# cadet connections active"),
393 sc = GNUNET_new (struct CadetClient);
394 sc->channel = channel;
395 GNUNET_CONTAINER_DLL_insert (sc_head,
399 refresh_timeout_task (sc);
400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401 "Accepting inbound cadet connection from `%s' as client %p\n",
402 GNUNET_i2s (initiator),
409 * Function called by cadet when a client disconnects.
410 * Cleans up our `struct CadetClient` of that channel.
412 * @param cls our `struct CadetClient`
413 * @param channel channel of the disconnecting client
417 disconnect_cb (void *cls,
418 const struct GNUNET_CADET_Channel *channel)
420 struct CadetClient *sc = cls;
421 struct WriteQueueItem *wqi;
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "Terminating cadet connection with client %p\n",
429 GNUNET_STATISTICS_update (GSF_stats,
430 gettext_noop ("# cadet connections active"), -1,
432 if (NULL != sc->terminate_task)
433 GNUNET_SCHEDULER_cancel (sc->terminate_task);
434 if (NULL != sc->timeout_task)
435 GNUNET_SCHEDULER_cancel (sc->timeout_task);
437 GNUNET_DATASTORE_cancel (sc->qe);
438 while (NULL != (wqi = sc->wqi_head))
440 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
445 GNUNET_CONTAINER_DLL_remove (sc_head,
454 * Function called whenever an MQ-channel's transmission window size changes.
456 * The first callback in an outgoing channel will be with a non-zero value
457 * and will mean the channel is connected to the destination.
459 * For an incoming channel it will be called immediately after the
460 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
462 * @param cls Channel closure.
463 * @param channel Connection to the other end (henceforth invalid).
464 * @param window_size New window size. If there are more messages than buffer size,
465 * this value will be negative.
468 window_change_cb (void *cls,
469 const struct GNUNET_CADET_Channel *channel,
472 /* FIXME: could do flow control here... */
477 * Initialize subsystem for non-anonymous file-sharing.
480 GSF_cadet_start_server ()
482 struct GNUNET_MQ_MessageHandler handlers[] = {
483 GNUNET_MQ_hd_fixed_size (request,
484 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
485 struct CadetQueryMessage,
487 GNUNET_MQ_handler_end ()
489 struct GNUNET_HashCode port;
492 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498 "Initializing cadet FS server with a limit of %llu connections\n",
500 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
501 cadet_handle = GNUNET_CADET_connect (GSF_cfg);
502 GNUNET_assert (NULL != cadet_handle);
503 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
504 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
506 cadet_port = GNUNET_CADET_open_port (cadet_handle,
517 * Shutdown subsystem for non-anonymous file-sharing.
520 GSF_cadet_stop_server ()
522 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
523 &GSF_cadet_release_clients,
525 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
527 if (NULL != cadet_port)
529 GNUNET_CADET_close_port (cadet_port);
532 if (NULL != cadet_handle)
534 GNUNET_CADET_disconnect (cadet_handle);
537 GNUNET_assert (NULL == sc_head);
538 GNUNET_assert (0 == sc_count);
541 /* end of gnunet-service-fs_cadet_server.c */