2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file fs/gnunet-service-fs_cadet_server.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
41 * After how long do we termiante idle connections?
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
47 * A message in the queue to be written to the cadet.
54 struct WriteQueueItem *next;
59 struct WriteQueueItem *prev;
62 * Number of bytes of payload, allocated at the end of this struct.
69 * Information we keep around for each active cadeting client.
76 struct CadetClient *next;
81 struct CadetClient *prev;
84 * Channel for communication.
86 struct GNUNET_CADET_Channel *channel;
89 * Handle for active write operation, or NULL.
91 struct GNUNET_CADET_TransmitHandle *wh;
94 * Head of write queue.
96 struct WriteQueueItem *wqi_head;
99 * Tail of write queue.
101 struct WriteQueueItem *wqi_tail;
104 * Current active request to the datastore, if we have one pending.
106 struct GNUNET_DATASTORE_QueueEntry *qe;
109 * Task that is scheduled to asynchronously terminate the connection.
111 struct GNUNET_SCHEDULER_Task * terminate_task;
114 * Task that is scheduled to terminate idle connections.
116 struct GNUNET_SCHEDULER_Task * timeout_task;
119 * Size of the last write that was initiated.
127 * Listen channel for incoming requests.
129 static struct GNUNET_CADET_Handle *listen_channel;
132 * Head of DLL of cadet clients.
134 static struct CadetClient *sc_head;
137 * Tail of DLL of cadet clients.
139 static struct CadetClient *sc_tail;
142 * Number of active cadet clients in the 'sc_*'-DLL.
144 static unsigned int sc_count;
147 * Maximum allowed number of cadet clients.
149 static unsigned long long sc_count_max;
154 * Task run to asynchronously terminate the cadet due to timeout.
156 * @param cls the 'struct CadetClient'
159 timeout_cadet_task (void *cls)
161 struct CadetClient *sc = cls;
162 struct GNUNET_CADET_Channel *tun;
164 sc->timeout_task = NULL;
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
168 "Timeout for inactive cadet client %p\n",
170 GNUNET_CADET_channel_destroy (tun);
175 * Reset the timeout for the cadet client (due to activity).
177 * @param sc client handle to reset timeout for
180 refresh_timeout_task (struct CadetClient *sc)
182 if (NULL != sc->timeout_task)
183 GNUNET_SCHEDULER_cancel (sc->timeout_task);
184 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
191 * We're done handling a request from a client, read the next one.
193 * @param sc client to continue reading requests from
196 continue_reading (struct CadetClient *sc)
198 refresh_timeout_task (sc);
199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200 "Finished processing cadet request from client %p, ready to receive the next one\n",
202 GNUNET_CADET_receive_done (sc->channel);
207 * Transmit the next entry from the write queue.
209 * @param sc where to process the write queue
212 continue_writing (struct CadetClient *sc);
216 * Send a reply now, cadet is ready.
218 * @param cls closure with the `struct CadetClient` which sent the query
219 * @param size number of bytes available in @a buf
220 * @param buf where to write the message
221 * @return number of bytes written to @a buf
224 write_continuation (void *cls,
228 struct CadetClient *sc = cls;
229 struct GNUNET_CADET_Channel *tun;
230 struct WriteQueueItem *wqi;
234 if (NULL == (wqi = sc->wqi_head))
236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
237 "Write queue empty, reading more requests\n");
241 (size < wqi->msize) )
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Transmission of reply failed, terminating cadet\n");
247 GNUNET_CADET_channel_destroy (tun);
250 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254 "Transmitted %u byte reply via cadet to %p\n",
257 GNUNET_STATISTICS_update (GSF_stats,
258 gettext_noop ("# Blocks transferred via cadet"), 1,
260 GNUNET_memcpy (buf, &wqi[1], ret);
263 continue_writing (sc);
269 * Transmit the next entry from the write queue.
271 * @param sc where to process the write queue
274 continue_writing (struct CadetClient *sc)
276 struct WriteQueueItem *wqi;
277 struct GNUNET_CADET_Channel *tun;
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Write pending, waiting for it to complete\n");
283 return; /* write already pending */
285 if (NULL == (wqi = sc->wqi_head))
287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288 "Write queue empty, reading more requests\n");
289 continue_reading (sc);
292 sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
293 GNUNET_TIME_UNIT_FOREVER_REL,
299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300 "Write failed; terminating cadet\n");
303 GNUNET_CADET_channel_destroy (tun);
310 * Process a datum that was stored in the datastore.
312 * @param cls closure with the `struct CadetClient` which sent the query
313 * @param key key for the content
314 * @param size number of bytes in @a data
315 * @param data content stored
316 * @param type type of the content
317 * @param priority priority of the content
318 * @param anonymity anonymity-level for the content
319 * @param expiration expiration time for the content
320 * @param uid unique identifier for the datum;
321 * maybe 0 if no unique identifier is available
324 handle_datastore_reply (void *cls,
325 const struct GNUNET_HashCode *key,
328 enum GNUNET_BLOCK_Type type,
331 struct GNUNET_TIME_Absolute expiration,
334 struct CadetClient *sc = cls;
335 size_t msize = size + sizeof (struct CadetReplyMessage);
336 struct WriteQueueItem *wqi;
337 struct CadetReplyMessage *srm;
342 /* no result, this should not really happen, as for
343 non-anonymous routing only peers that HAVE the
344 answers should be queried; OTOH, this is not a
345 hard error as we might have had the answer in the
346 past and the user might have unindexed it. Hence
347 we log at level "INFO" for now. */
350 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
351 "Have no answer and the query was NULL\n");
355 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
356 "Have no answer for query `%s'\n",
359 GNUNET_STATISTICS_update (GSF_stats,
360 gettext_noop ("# queries received via CADET not answered"), 1,
362 continue_writing (sc);
365 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Performing on-demand encoding for query %s\n",
371 GNUNET_FS_handle_on_demand_block (key,
375 &handle_datastore_reply,
378 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379 "On-demand encoding request failed\n");
380 continue_writing (sc);
384 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
387 continue_writing (sc);
390 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
397 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
399 srm = (struct CadetReplyMessage *) &wqi[1];
400 srm->header.size = htons ((uint16_t) msize);
401 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
402 srm->type = htonl (type);
403 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
404 GNUNET_memcpy (&srm[1], data, size);
405 sc->reply_size = msize;
406 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
409 continue_writing (sc);
414 * Functions with this signature are called whenever a
415 * complete query message is received.
417 * Do not call #GNUNET_SERVER_mst_destroy() in callback
419 * @param cls closure with the `struct CadetClient`
420 * @param channel channel handle
421 * @param channel_ctx channel context
422 * @param message the actual message
423 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
426 request_cb (void *cls,
427 struct GNUNET_CADET_Channel *channel,
429 const struct GNUNET_MessageHeader *message)
431 struct CadetClient *sc = *channel_ctx;
432 const struct CadetQueryMessage *sqm;
434 sqm = (const struct CadetQueryMessage *) message;
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436 "Received query for `%s' via cadet from client %p\n",
437 GNUNET_h2s (&sqm->query),
439 GNUNET_STATISTICS_update (GSF_stats,
440 gettext_noop ("# queries received via cadet"), 1,
442 refresh_timeout_task (sc);
443 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
448 GSF_datastore_queue_size,
449 &handle_datastore_reply, sc);
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 "Queueing request with datastore failed (queue full?)\n");
454 continue_writing (sc);
461 * Functions of this type are called upon new cadet connection from other peers.
463 * @param cls the closure from GNUNET_CADET_connect
464 * @param channel the channel representing the cadet
465 * @param initiator the identity of the peer who wants to establish a cadet
466 * with us; NULL on binding error
467 * @param port cadet port used for the incoming connection
468 * @param options channel option flags
469 * @return initial channel context (our 'struct CadetClient')
472 accept_cb (void *cls,
473 struct GNUNET_CADET_Channel *channel,
474 const struct GNUNET_PeerIdentity *initiator,
475 uint32_t port, enum GNUNET_CADET_ChannelOption options)
477 struct CadetClient *sc;
479 GNUNET_assert (NULL != channel);
480 if (sc_count >= sc_count_max)
482 GNUNET_STATISTICS_update (GSF_stats,
483 gettext_noop ("# cadet client connections rejected"), 1,
485 GNUNET_CADET_channel_destroy (channel);
488 GNUNET_STATISTICS_update (GSF_stats,
489 gettext_noop ("# cadet connections active"), 1,
491 sc = GNUNET_new (struct CadetClient);
492 sc->channel = channel;
493 GNUNET_CONTAINER_DLL_insert (sc_head,
497 refresh_timeout_task (sc);
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499 "Accepting inbound cadet connection from `%s' as client %p\n",
500 GNUNET_i2s (initiator),
507 * Function called by cadet when a client disconnects.
508 * Cleans up our 'struct CadetClient' of that channel.
511 * @param channel channel of the disconnecting client
512 * @param channel_ctx our 'struct CadetClient'
515 cleaner_cb (void *cls,
516 const struct GNUNET_CADET_Channel *channel,
519 struct CadetClient *sc = channel_ctx;
520 struct WriteQueueItem *wqi;
525 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526 "Terminating cadet connection with client %p\n",
528 GNUNET_STATISTICS_update (GSF_stats,
529 gettext_noop ("# cadet connections active"), -1,
531 if (NULL != sc->terminate_task)
532 GNUNET_SCHEDULER_cancel (sc->terminate_task);
533 if (NULL != sc->timeout_task)
534 GNUNET_SCHEDULER_cancel (sc->timeout_task);
536 GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
538 GNUNET_DATASTORE_cancel (sc->qe);
539 while (NULL != (wqi = sc->wqi_head))
541 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
546 GNUNET_CONTAINER_DLL_remove (sc_head,
555 * Initialize subsystem for non-anonymous file-sharing.
558 GSF_cadet_start_server ()
560 static const struct GNUNET_CADET_MessageHandler handlers[] = {
561 { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
564 static const uint32_t ports[] = {
565 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
570 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
575 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
576 "Initializing cadet FS server with a limit of %llu connections\n",
578 listen_channel = GNUNET_CADET_connect (GSF_cfg,
588 * Shutdown subsystem for non-anonymous file-sharing.
591 GSF_cadet_stop_server ()
593 if (NULL != listen_channel)
595 GNUNET_CADET_disconnect (listen_channel);
596 listen_channel = NULL;
598 GNUNET_assert (NULL == sc_head);
599 GNUNET_assert (0 == sc_count);
602 /* end of gnunet-service-fs_cadet.c */