2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file fs/gnunet-service-fs_cadet_server.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
41 * After how long do we termiante idle connections?
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
47 * A message in the queue to be written to the cadet.
54 struct WriteQueueItem *next;
59 struct WriteQueueItem *prev;
62 * Number of bytes of payload, allocated at the end of this struct.
69 * Information we keep around for each active cadeting client.
76 struct CadetClient *next;
81 struct CadetClient *prev;
84 * Channel for communication.
86 struct GNUNET_CADET_Channel *channel;
89 * Handle for active write operation, or NULL.
91 struct GNUNET_CADET_TransmitHandle *wh;
94 * Head of write queue.
96 struct WriteQueueItem *wqi_head;
99 * Tail of write queue.
101 struct WriteQueueItem *wqi_tail;
104 * Current active request to the datastore, if we have one pending.
106 struct GNUNET_DATASTORE_QueueEntry *qe;
109 * Task that is scheduled to asynchronously terminate the connection.
111 struct GNUNET_SCHEDULER_Task * terminate_task;
114 * Task that is scheduled to terminate idle connections.
116 struct GNUNET_SCHEDULER_Task * timeout_task;
119 * Size of the last write that was initiated.
127 * Listen port for incoming requests.
129 static struct GNUNET_CADET_Port *cadet_port;
132 * Head of DLL of cadet clients.
134 static struct CadetClient *sc_head;
137 * Tail of DLL of cadet clients.
139 static struct CadetClient *sc_tail;
142 * Number of active cadet clients in the 'sc_*'-DLL.
144 static unsigned int sc_count;
147 * Maximum allowed number of cadet clients.
149 static unsigned long long sc_count_max;
154 * Task run to asynchronously terminate the cadet due to timeout.
156 * @param cls the 'struct CadetClient'
159 timeout_cadet_task (void *cls)
161 struct CadetClient *sc = cls;
162 struct GNUNET_CADET_Channel *tun;
164 sc->timeout_task = NULL;
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
168 "Timeout for inactive cadet client %p\n",
170 GNUNET_CADET_channel_destroy (tun);
175 * Reset the timeout for the cadet client (due to activity).
177 * @param sc client handle to reset timeout for
180 refresh_timeout_task (struct CadetClient *sc)
182 if (NULL != sc->timeout_task)
183 GNUNET_SCHEDULER_cancel (sc->timeout_task);
184 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
191 * Check if we are done with the write queue, and if so tell CADET
192 * that we are ready to read more.
194 * @param cls where to process the write queue
197 continue_writing (void *cls)
199 struct CadetClient *sc = cls;
200 struct GNUNET_MQ_Handle *mq;
202 mq = GNUNET_CADET_get_mq (sc->channel);
203 if (0 != GNUNET_MQ_get_length (mq))
205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
206 "Write pending, waiting for it to complete\n");
209 refresh_timeout_task (sc);
210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
211 "Finished processing cadet request from client %p, ready to receive the next one\n",
213 GNUNET_CADET_receive_done (sc->channel);
218 * Process a datum that was stored in the datastore.
220 * @param cls closure with the `struct CadetClient` which sent the query
221 * @param key key for the content
222 * @param size number of bytes in @a data
223 * @param data content stored
224 * @param type type of the content
225 * @param priority priority of the content
226 * @param anonymity anonymity-level for the content
227 * @param expiration expiration time for the content
228 * @param uid unique identifier for the datum;
229 * maybe 0 if no unique identifier is available
232 handle_datastore_reply (void *cls,
233 const struct GNUNET_HashCode *key,
236 enum GNUNET_BLOCK_Type type,
239 struct GNUNET_TIME_Absolute expiration,
242 struct CadetClient *sc = cls;
243 size_t msize = size + sizeof (struct CadetReplyMessage);
244 struct GNUNET_MQ_Envelope *env;
245 struct CadetReplyMessage *srm;
250 /* no result, this should not really happen, as for
251 non-anonymous routing only peers that HAVE the
252 answers should be queried; OTOH, this is not a
253 hard error as we might have had the answer in the
254 past and the user might have unindexed it. Hence
255 we log at level "INFO" for now. */
258 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
259 "Have no answer and the query was NULL\n");
263 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
264 "Have no answer for query `%s'\n",
267 GNUNET_STATISTICS_update (GSF_stats,
268 gettext_noop ("# queries received via CADET not answered"),
271 continue_writing (sc);
274 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
277 "Performing on-demand encoding for query %s\n",
280 GNUNET_FS_handle_on_demand_block (key,
288 &handle_datastore_reply,
291 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292 "On-demand encoding request failed\n");
293 continue_writing (sc);
297 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
300 continue_writing (sc);
303 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
304 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
305 "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
310 env = GNUNET_MQ_msg_extra (srm,
312 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
313 srm->type = htonl (type);
314 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
315 GNUNET_memcpy (&srm[1],
318 GNUNET_MQ_notify_sent (env,
321 GNUNET_STATISTICS_update (GSF_stats,
322 gettext_noop ("# Blocks transferred via cadet"),
325 GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
331 * Functions with this signature are called whenever a
332 * complete query message is received.
334 * @param cls closure with the `struct CadetClient`
335 * @param sqm the actual message
338 handle_request (void *cls,
339 const struct CadetQueryMessage *sqm)
341 struct CadetClient *sc = cls;
343 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
344 "Received query for `%s' via cadet from client %p\n",
345 GNUNET_h2s (&sqm->query),
347 GNUNET_STATISTICS_update (GSF_stats,
348 gettext_noop ("# queries received via cadet"),
351 refresh_timeout_task (sc);
352 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
357 GSF_datastore_queue_size,
358 &handle_datastore_reply,
362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
363 "Queueing request with datastore failed (queue full?)\n");
364 continue_writing (sc);
370 * Functions of this type are called upon new cadet connection from other peers.
372 * @param cls the closure from GNUNET_CADET_connect
373 * @param channel the channel representing the cadet
374 * @param initiator the identity of the peer who wants to establish a cadet
375 * with us; NULL on binding error
376 * @return initial channel context (our `struct CadetClient`)
379 connect_cb (void *cls,
380 struct GNUNET_CADET_Channel *channel,
381 const struct GNUNET_PeerIdentity *initiator)
383 struct CadetClient *sc;
385 GNUNET_assert (NULL != channel);
386 if (sc_count >= sc_count_max)
388 GNUNET_STATISTICS_update (GSF_stats,
389 gettext_noop ("# cadet client connections rejected"),
392 GNUNET_CADET_channel_destroy (channel);
395 GNUNET_STATISTICS_update (GSF_stats,
396 gettext_noop ("# cadet connections active"),
399 sc = GNUNET_new (struct CadetClient);
400 sc->channel = channel;
401 GNUNET_CONTAINER_DLL_insert (sc_head,
405 refresh_timeout_task (sc);
406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407 "Accepting inbound cadet connection from `%s' as client %p\n",
408 GNUNET_i2s (initiator),
415 * Function called by cadet when a client disconnects.
416 * Cleans up our `struct CadetClient` of that channel.
418 * @param cls our `struct CadetClient`
419 * @param channel channel of the disconnecting client
423 disconnect_cb (void *cls,
424 const struct GNUNET_CADET_Channel *channel)
426 struct CadetClient *sc = cls;
427 struct WriteQueueItem *wqi;
432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433 "Terminating cadet connection with client %p\n",
435 GNUNET_STATISTICS_update (GSF_stats,
436 gettext_noop ("# cadet connections active"), -1,
438 if (NULL != sc->terminate_task)
439 GNUNET_SCHEDULER_cancel (sc->terminate_task);
440 if (NULL != sc->timeout_task)
441 GNUNET_SCHEDULER_cancel (sc->timeout_task);
443 GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
445 GNUNET_DATASTORE_cancel (sc->qe);
446 while (NULL != (wqi = sc->wqi_head))
448 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
453 GNUNET_CONTAINER_DLL_remove (sc_head,
463 * Function called whenever an MQ-channel's transmission window size changes.
465 * The first callback in an outgoing channel will be with a non-zero value
466 * and will mean the channel is connected to the destination.
468 * For an incoming channel it will be called immediately after the
469 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
471 * @param cls Channel closure.
472 * @param channel Connection to the other end (henceforth invalid).
473 * @param window_size New window size. If the is more messages than buffer size
474 * this value will be negative..
477 window_change_cb (void *cls,
478 const struct GNUNET_CADET_Channel *channel,
481 /* FIXME: could do flow control here... */
486 * Initialize subsystem for non-anonymous file-sharing.
489 GSF_cadet_start_server ()
491 struct GNUNET_MQ_MessageHandler handlers[] = {
492 GNUNET_MQ_hd_fixed_size (request,
493 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
494 struct CadetQueryMessage,
496 GNUNET_MQ_handler_end ()
498 struct GNUNET_HashCode port;
501 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "Initializing cadet FS server with a limit of %llu connections\n",
509 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
510 cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
511 GNUNET_assert (NULL != cadet_handle);
512 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
513 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
515 cadet_port = GNUNET_CADET_open_porT (cadet_handle,
526 * Shutdown subsystem for non-anonymous file-sharing.
529 GSF_cadet_stop_server ()
531 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
532 &GSF_cadet_release_clients,
534 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
536 if (NULL != cadet_port)
538 GNUNET_CADET_close_port (cadet_port);
541 if (NULL != cadet_handle)
543 GNUNET_CADET_disconnect (cadet_handle);
546 GNUNET_assert (NULL == sc_head);
547 GNUNET_assert (0 == sc_count);
550 /* end of gnunet-service-fs_cadet.c */