2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_mesh_server.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_mesh_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_mesh.h"
41 * After how long do we terminate idle connections?
/* Idle cutoff of two minutes (GNUNET_TIME_UNIT_MINUTES multiplied by 2).
   It is the delay handed to GNUNET_SCHEDULER_add_delayed whenever
   refresh_timeout_task (re)arms the timeout task of a client. */
43 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
47 * A message in the queue to be written to the mesh.
/* Neighbour links of one write-queue entry.  Entries are chained with the
   GNUNET_CONTAINER_DLL_insert / GNUNET_CONTAINER_DLL_remove macros on the
   wqi_head / wqi_tail pair of a struct MeshClient.
   NOTE(review): the struct header and the size member (read elsewhere in
   this file as wqi->msize) are not visible in this extract. */
54 struct WriteQueueItem *next;
59 struct WriteQueueItem *prev;
62 * Number of bytes of payload, allocated at the end of this struct.
69 * Information we keep around for each active meshing client.
/* Members of the per-client record: list links, the mesh tunnel, the
   pending transmit handle, the write queue, the pending datastore request
   and two scheduler task identifiers.
   NOTE(review): the struct header is not visible in this extract, and
   neither is the reply_size member that handle_datastore_reply assigns. */
76 struct MeshClient *next;
81 struct MeshClient *prev;
84 * Tunnel for communication.
86 struct GNUNET_MESH_Tunnel *tunnel;
89 * Handle for active write operation, or NULL.
/* Set from GNUNET_MESH_notify_transmit_ready in continue_writing and
   cancelled in cleaner_cb. */
91 struct GNUNET_MESH_TransmitHandle *wh;
94 * Head of write queue.
96 struct WriteQueueItem *wqi_head;
99 * Tail of write queue.
101 struct WriteQueueItem *wqi_tail;
104 * Current active request to the datastore, if we have one pending.
/* Set from GNUNET_DATASTORE_get_key in request_cb and cancelled in
   cleaner_cb. */
106 struct GNUNET_DATASTORE_QueueEntry *qe;
109 * Task that is scheduled to asynchronously terminate the connection.
/* NOTE(review): within the visible lines this id is only ever cancelled
   (cleaner_cb), never scheduled -- confirm it is still needed. */
111 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
114 * Task that is scheduled to terminate idle connections.
/* Armed by refresh_timeout_task, reset to GNUNET_SCHEDULER_NO_TASK when
   timeout_mesh_task runs. */
116 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
119 * Size of the last write that was initiated.
127 * Listen tunnel for incoming requests.
/* File-scope state of the mesh server: the mesh service handle, the list
   of connected clients, and the current and maximum client counts. */
/* Despite the name this is a GNUNET_MESH_Handle: it stores the result of
   GNUNET_MESH_connect and is released with GNUNET_MESH_disconnect. */
129 static struct GNUNET_MESH_Handle *listen_tunnel;
132 * Head of DLL of mesh clients.
134 static struct MeshClient *sc_head;
137 * Tail of DLL of mesh clients.
139 static struct MeshClient *sc_tail;
142 * Number of active mesh clients in the 'sc_*'-DLL.
/* Compared against sc_count_max in accept_cb; asserted to be 0 in
   GSF_mesh_stop_server.
   NOTE(review): the statements that change this counter are not visible
   in this extract. */
144 static unsigned int sc_count;
147 * Maximum allowed number of mesh clients.
/* NOTE(review): presumably filled by the GNUNET_CONFIGURATION_get_value_number
   call in GSF_mesh_start_server -- the argument lines are not visible. */
149 static unsigned long long sc_count_max;
154 * Task run to asynchronously terminate the mesh due to timeout.
156 * @param cls the 'struct MeshClient'
157 * @param tc scheduler context
/* Scheduler callback for the idle timeout of one client (cls is the
   struct MeshClient): clears the stored task id, logs the event and
   destroys the tunnel held in the local tun.
   NOTE(review): the return type, the braces, the assignment of tun and the
   last argument of the log call are missing from this extract. */
160 timeout_mesh_task (void *cls,
161 const struct GNUNET_SCHEDULER_TaskContext *tc)
163 struct MeshClient *sc = cls;
164 struct GNUNET_MESH_Tunnel *tun;
/* The task is running now; reset the id so that code testing for
   GNUNET_SCHEDULER_NO_TASK (refresh_timeout_task, cleaner_cb) does not
   try to cancel it. */
166 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
170 "Timeout for inactive mesh client %p\n",
/* NOTE(review): presumably tun was loaded from sc->tunnel on one of the
   missing lines -- confirm it is initialised before this call. */
172 GNUNET_MESH_tunnel_destroy (tun);
177 * Reset the timeout for the mesh client (due to activity).
179 * @param sc client handle to reset timeout for
/* Restart the idle countdown of sc: cancel the pending timeout task if one
   is recorded, then schedule a new one IDLE_TIMEOUT from now and store its
   id in sc->timeout_task.
   NOTE(review): the remaining arguments of GNUNET_SCHEDULER_add_delayed
   are not visible in this extract. */
182 refresh_timeout_task (struct MeshClient *sc)
184 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
185 GNUNET_SCHEDULER_cancel (sc->timeout_task);
186 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
193 * We're done handling a request from a client, read the next one.
195 * @param sc client to continue reading requests from
/* Called once a request from sc has been fully handled: refreshes the idle
   timeout, logs, and calls GNUNET_MESH_receive_done on the tunnel.
   NOTE(review): judging by the log text, receive_done presumably allows
   mesh to deliver the next message -- confirm against the mesh API. */
198 continue_reading (struct MeshClient *sc)
200 refresh_timeout_task (sc);
201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
202 "Finished processing mesh request from client %p, ready to receive the next one\n",
204 GNUNET_MESH_receive_done (sc->tunnel);
209 * Transmit the next entry from the write queue.
211 * @param sc where to process the write queue
/* Forward declaration: write_continuation calls continue_writing before
   the definition of continue_writing appears in this file. */
214 continue_writing (struct MeshClient *sc);
218 * Send a reply now, mesh is ready.
220 * @param cls closure with the struct MeshClient which sent the query
221 * @param size number of bytes available in 'buf'
222 * @param buf where to write the message
223 * @return number of bytes written to 'buf'
/* Transmit callback (cls is the struct MeshClient): takes the head of the
   write queue, copies its payload into buf and then calls
   continue_writing for the next entry.
   NOTE(review): the remaining parameters, the return statements and the
   condition guarding the failure branch are missing from this extract. */
226 write_continuation (void *cls,
230 struct MeshClient *sc = cls;
231 struct GNUNET_MESH_Tunnel *tun;
232 struct WriteQueueItem *wqi;
/* Nothing queued: only a debug message is visible for this case. */
236 if (NULL == (wqi = sc->wqi_head))
238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
239 "Write queue empty, reading more requests\n");
/* Failure branch: the tunnel held in tun is destroyed.
   NOTE(review): the assignment of tun is not visible -- confirm. */
244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
245 "Transmission of reply failed, terminating mesh\n");
248 GNUNET_MESH_tunnel_destroy (tun);
/* Success path: unlink the entry before its payload is copied out. */
251 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
255 "Transmitted %u byte reply via mesh to %p\n",
258 GNUNET_STATISTICS_update (GSF_stats,
259 gettext_noop ("# Blocks transferred via mesh"), 1,
/* The payload sits directly behind the queue item, hence &wqi[1]; ret
   receives the number of bytes copied.
   NOTE(review): no visible line checks that buf can hold wqi->msize bytes
   -- confirm the check exists on a missing line. */
261 memcpy (buf, &wqi[1], ret = wqi->msize);
263 continue_writing (sc);
269 * Transmit the next entry from the write queue.
271 * @param sc where to process the write queue
/* Drive the write queue of sc: do nothing while a write is pending, go
   back to reading when the queue is empty, otherwise ask mesh for a
   transmit slot and store the handle in sc->wh.
   NOTE(review): the pending-write test, the remaining arguments of
   notify_transmit_ready and the test guarding the failure branch are
   missing from this extract. */
274 continue_writing (struct MeshClient *sc)
276 struct WriteQueueItem *wqi;
277 struct GNUNET_MESH_Tunnel *tun;
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Write pending, waiting for it to complete\n");
283 return; /* write already pending */
285 if (NULL == (wqi = sc->wqi_head))
287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288 "Write queue empty, reading more requests\n");
289 continue_reading (sc);
/* Request transmission with no deadline (GNUNET_TIME_UNIT_FOREVER_REL). */
292 sc->wh = GNUNET_MESH_notify_transmit_ready (sc->tunnel, GNUNET_NO,
293 GNUNET_TIME_UNIT_FOREVER_REL,
/* Failure branch: the tunnel held in tun is destroyed.
   NOTE(review): the assignment of tun is not visible -- confirm. */
299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300 "Write failed; terminating mesh\n");
303 GNUNET_MESH_tunnel_destroy (tun);
310 * Process a datum that was stored in the datastore.
312 * @param cls closure with the struct MeshClient which sent the query
313 * @param key key for the content
314 * @param size number of bytes in data
315 * @param data content stored
316 * @param type type of the content
317 * @param priority priority of the content
318 * @param anonymity anonymity-level for the content
319 * @param expiration expiration time for the content
320 * @param uid unique identifier for the datum;
321 * maybe 0 if no unique identifier is available
/* Datastore callback (cls is the struct MeshClient): turns one datastore
   result into a MeshReplyMessage, appends it to the write queue of the
   client and calls continue_writing.  On-demand blocks are first passed to
   GNUNET_FS_handle_on_demand_block with this function as the callback.
   NOTE(review): two parameters, several call arguments and the return
   statements are missing from this extract; how an empty result
   (no data) is handled is not visible. */
324 handle_datastore_reply (void *cls,
325 const struct GNUNET_HashCode *key,
326 size_t size, const void *data,
327 enum GNUNET_BLOCK_Type type,
330 struct GNUNET_TIME_Absolute
331 expiration, uint64_t uid)
333 struct MeshClient *sc = cls;
/* Total message size: reply header plus payload. */
334 size_t msize = size + sizeof (struct MeshReplyMessage);
335 struct WriteQueueItem *wqi;
336 struct MeshReplyMessage *srm;
339 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
341 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342 "Performing on-demand encoding for query %s\n",
345 GNUNET_FS_handle_on_demand_block (key,
349 &handle_datastore_reply,
352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
353 "On-demand encoding request failed\n");
354 continue_writing (sc);
/* Oversized replies are not queued; processing just continues. */
358 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
361 continue_writing (sc);
364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365 "Starting transmission of %u byte reply for query `%s' via mesh to %p\n",
/* One allocation holds the queue item followed by the whole message. */
369 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
371 srm = (struct MeshReplyMessage *) &wqi[1];
/* Header and fields are stored in network byte order.
   NOTE(review): the uint16_t cast assumes GNUNET_SERVER_MAX_MESSAGE_SIZE
   fits in 16 bits -- confirm. */
372 srm->header.size = htons ((uint16_t) msize);
373 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
374 srm->type = htonl (type);
375 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
376 memcpy (&srm[1], data, size);
377 sc->reply_size = msize;
378 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
381 continue_writing (sc);
386 * Functions with this signature are called whenever a
387 * complete query message is received.
389 * Do not call GNUNET_SERVER_mst_destroy in callback
391 * @param cls closure (unused here; the 'struct MeshClient' comes from 'tunnel_ctx')
392 * @param tunnel tunnel handle
393 * @param tunnel_ctx tunnel context
394 * @param message the actual message
395 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
/* Mesh message handler for queries: logs and counts the query, refreshes
   the idle timeout and starts a datastore lookup whose results go to
   handle_datastore_reply.  The queue entry is kept in sc->qe.
   NOTE(review): the tunnel_ctx parameter line, several call arguments,
   the test guarding the failure branch and the return statement are
   missing from this extract. */
398 request_cb (void *cls,
399 struct GNUNET_MESH_Tunnel *tunnel,
401 const struct GNUNET_MessageHeader *message)
/* The client record comes from the tunnel context, not from cls. */
403 struct MeshClient *sc = *tunnel_ctx;
404 const struct MeshQueryMessage *sqm;
/* NOTE(review): the handler table in GSF_mesh_start_server registers this
   callback with sizeof (struct MeshQueryMessage); presumably mesh checks
   the size before this cast is reached -- confirm. */
406 sqm = (const struct MeshQueryMessage *) message;
407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
408 "Received query for `%s' via mesh from client %p\n",
409 GNUNET_h2s (&sqm->query),
411 GNUNET_STATISTICS_update (GSF_stats,
412 gettext_noop ("# queries received via mesh"), 1,
414 refresh_timeout_task (sc);
415 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
420 GSF_datastore_queue_size,
421 GNUNET_TIME_UNIT_FOREVER_REL,
422 &handle_datastore_reply, sc);
/* Failure branch: no lookup was queued, so continue with the write queue
   instead of waiting for a datastore callback. */
425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
426 "Queueing request with datastore failed (queue full?)\n");
427 continue_writing (sc);
434 * Functions of this type are called upon new mesh connection from other peers.
436 * @param cls the closure from GNUNET_MESH_connect
437 * @param tunnel the tunnel representing the mesh
438 * @param initiator the identity of the peer who wants to establish a mesh
439 * with us; NULL on binding error
440 * @param port mesh port used for the incoming connection
441 * @return initial tunnel context (our 'struct MeshClient')
/* Callback for a new inbound tunnel: rejects it when the client limit is
   reached, otherwise allocates a struct MeshClient, links it into the
   sc_head / sc_tail list, arms its idle timeout and logs the peer.
   NOTE(review): the port parameter, the initialisation of the new record,
   any change to sc_count and the return statements are missing from this
   extract. */
444 accept_cb (void *cls,
445 struct GNUNET_MESH_Tunnel *tunnel,
446 const struct GNUNET_PeerIdentity *initiator,
449 struct MeshClient *sc;
451 GNUNET_assert (NULL != tunnel);
/* Limit reached: count the rejection and drop the tunnel. */
452 if (sc_count >= sc_count_max)
454 GNUNET_STATISTICS_update (GSF_stats,
455 gettext_noop ("# mesh client connections rejected"), 1,
457 GNUNET_MESH_tunnel_destroy (tunnel);
460 GNUNET_STATISTICS_update (GSF_stats,
461 gettext_noop ("# mesh connections active"), 1,
463 sc = GNUNET_new (struct MeshClient);
465 GNUNET_CONTAINER_DLL_insert (sc_head,
469 refresh_timeout_task (sc);
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "Accepting inbound mesh connection from `%s' as client %p\n",
472 GNUNET_i2s (initiator),
479 * Function called by mesh when a client disconnects.
480 * Cleans up our 'struct MeshClient' of that tunnel.
483 * @param tunnel tunnel of the disconnecting client
484 * @param tunnel_ctx our 'struct MeshClient'
/* Tunnel teardown callback (tunnel_ctx is the struct MeshClient): updates
   the statistic, cancels the scheduled tasks, the pending transmit and the
   pending datastore request, empties the write queue and unlinks the
   client from the sc_head / sc_tail list.
   NOTE(review): the guards before the two cancel calls, the freeing of
   queue items and of the client, and any change to sc_count are missing
   from this extract. */
487 cleaner_cb (void *cls,
488 const struct GNUNET_MESH_Tunnel *tunnel,
491 struct MeshClient *sc = tunnel_ctx;
492 struct WriteQueueItem *wqi;
497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498 "Terminating mesh connection with client %p\n",
500 GNUNET_STATISTICS_update (GSF_stats,
501 gettext_noop ("# mesh connections active"), -1,
/* Only tasks that are actually scheduled may be cancelled. */
503 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
504 GNUNET_SCHEDULER_cancel (sc->terminate_task);
505 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
506 GNUNET_SCHEDULER_cancel (sc->timeout_task);
508 GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
510 GNUNET_DATASTORE_cancel (sc->qe);
/* Drain replies that were queued but never transmitted. */
511 while (NULL != (wqi = sc->wqi_head))
513 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
518 GNUNET_CONTAINER_DLL_remove (sc_head,
527 * Initialize subsystem for non-anonymous file-sharing.
/* Start the mesh server side: builds the message handler table and the
   port list, reads a number from the configuration, logs the connection
   limit and connects to mesh, keeping the handle in listen_tunnel.
   NOTE(review): the table terminators, the configuration section/option
   names and the arguments of GNUNET_MESH_connect are missing from this
   extract. */
530 GSF_mesh_start_server ()
/* Queries are handled by request_cb and must have exactly the size of
   struct MeshQueryMessage. */
532 static const struct GNUNET_MESH_MessageHandler handlers[] = {
533 { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)},
536 static const uint32_t ports[] = {
537 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
542 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548 "Initializing mesh FS server with a limit of %llu connections\n",
550 listen_tunnel = GNUNET_MESH_connect (GSF_cfg,
560 * Shutdown subsystem for non-anonymous file-sharing.
/* Stop the mesh server side: disconnects from mesh if a handle is held and
   clears it, then asserts that no client record is left. */
563 GSF_mesh_stop_server ()
565 if (NULL != listen_tunnel)
567 GNUNET_MESH_disconnect (listen_tunnel);
568 listen_tunnel = NULL;
/* NOTE(review): these asserts presumably rely on the disconnect above
   having triggered cleaner_cb for every open tunnel -- confirm against
   the mesh API. */
570 GNUNET_assert (NULL == sc_head);
571 GNUNET_assert (0 == sc_count);
574 /* end of gnunet-service-fs_mesh_server.c */