2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_mesh.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - MESH2 API doesn't allow flow control for server yet (needed!)
28 * - likely need to register clean up handler with mesh to handle
29 * client disconnect (likely leaky right now)
30 * - server is optional, currently client code will NPE if we have
31 * no server, again MESH2 API requirement forcing this for now
32 * - message handlers are symmetric for client/server, should be
33 * separated (currently clients can get requests and servers can
34 * handle answers, not good)
35 * - code is entirely untested
36 * - might have overlooked a few possible simplifications
37 * - PORT is set to old application type, unsure if we should keep
38 * it that way (fine for now)
41 #include "gnunet_constants.h"
42 #include "gnunet_util_lib.h"
43 #include "gnunet_mesh_service.h"
44 #include "gnunet_protocols.h"
45 #include "gnunet_applications.h"
46 #include "gnunet-service-fs.h"
47 #include "gnunet-service-fs_indexing.h"
48 #include "gnunet-service-fs_mesh.h"
51 * After how long do we termiante idle connections?
53 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
57 * A message in the queue to be written to the mesh.
64 struct WriteQueueItem *next;
69 struct WriteQueueItem *prev;
72 * Number of bytes of payload, allocated at the end of this struct.
79 * Information we keep around for each active meshing client.
86 struct MeshClient *next;
91 struct MeshClient *prev;
94 * Socket for communication.
96 struct GNUNET_MESH_Tunnel *socket;
99 * Handle for active write operation, or NULL.
101 struct GNUNET_MESH_TransmitHandle *wh;
104 * Head of write queue.
106 struct WriteQueueItem *wqi_head;
109 * Tail of write queue.
111 struct WriteQueueItem *wqi_tail;
114 * Current active request to the datastore, if we have one pending.
116 struct GNUNET_DATASTORE_QueueEntry *qe;
119 * Task that is scheduled to asynchronously terminate the connection.
121 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
124 * Task that is scheduled to terminate idle connections.
126 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
129 * Size of the last write that was initiated.
137 * Listen socket for incoming requests.
139 static struct GNUNET_MESH_Handle *listen_socket;
142 * Head of DLL of mesh clients.
144 static struct MeshClient *sc_head;
147 * Tail of DLL of mesh clients.
149 static struct MeshClient *sc_tail;
152 * Number of active mesh clients in the 'sc_*'-DLL.
154 static unsigned int sc_count;
157 * Maximum allowed number of mesh clients.
159 static unsigned long long sc_count_max;
163 /* ********************* server-side code ************************* */
167 * We're done with a particular client, clean up.
169 * @param sc client to clean up
172 terminate_mesh (struct MeshClient *sc)
174 struct WriteQueueItem *wqi;
177 "terminate mesh called for %p\n",
179 GNUNET_STATISTICS_update (GSF_stats,
180 gettext_noop ("# mesh connections active"), -1,
182 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
183 GNUNET_SCHEDULER_cancel (sc->terminate_task);
184 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
185 GNUNET_SCHEDULER_cancel (sc->timeout_task);
187 GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
189 GNUNET_DATASTORE_cancel (sc->qe);
190 while (NULL != (wqi = sc->wqi_head))
192 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
197 GNUNET_CONTAINER_DLL_remove (sc_head,
206 * Task run to asynchronously terminate the mesh due to timeout.
208 * @param cls the 'struct MeshClient'
209 * @param tc scheduler context
212 timeout_mesh_task (void *cls,
213 const struct GNUNET_SCHEDULER_TaskContext *tc)
215 struct MeshClient *sc = cls;
216 struct GNUNET_MESH_Tunnel *tun;
218 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
221 GNUNET_MESH_tunnel_destroy (tun);
226 * Reset the timeout for the mesh client (due to activity).
228 * @param sc client handle to reset timeout for
231 refresh_timeout_task (struct MeshClient *sc)
233 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
234 GNUNET_SCHEDULER_cancel (sc->timeout_task);
235 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
242 * We're done handling a request from a client, read the next one.
244 * @param sc client to continue reading requests from
247 continue_reading (struct MeshClient *sc)
249 refresh_timeout_task (sc);
250 GNUNET_MESH_receive_done (sc->socket);
255 * Transmit the next entry from the write queue.
257 * @param sc where to process the write queue
260 continue_writing (struct MeshClient *sc);
264 * Send a reply now, mesh is ready.
266 * @param cls closure with the struct MeshClient which sent the query
267 * @param size number of bytes available in 'buf'
268 * @param buf where to write the message
269 * @return number of bytes written to 'buf'
272 write_continuation (void *cls,
276 struct MeshClient *sc = cls;
277 struct WriteQueueItem *wqi;
281 if (NULL == (wqi = sc->wqi_head))
283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
284 "Write queue empty, reading more requests\n");
289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290 "Transmission of reply failed, terminating mesh\n");
294 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298 "Transmitted %u byte reply via mesh\n",
299 (unsigned int) size);
300 GNUNET_STATISTICS_update (GSF_stats,
301 gettext_noop ("# Blocks transferred via mesh"), 1,
303 memcpy (buf, &wqi[1], ret = wqi->msize);
305 continue_writing (sc);
311 * Transmit the next entry from the write queue.
313 * @param sc where to process the write queue
316 continue_writing (struct MeshClient *sc)
318 struct WriteQueueItem *wqi;
322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
323 "Write pending, waiting for it to complete\n");
324 return; /* write already pending */
326 if (NULL == (wqi = sc->wqi_head))
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329 "Write queue empty, reading more requests\n");
330 continue_reading (sc);
333 sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
334 GNUNET_TIME_UNIT_FOREVER_REL,
340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341 "Write failed; terminating mesh\n");
349 * Process a datum that was stored in the datastore.
351 * @param cls closure with the struct MeshClient which sent the query
352 * @param key key for the content
353 * @param size number of bytes in data
354 * @param data content stored
355 * @param type type of the content
356 * @param priority priority of the content
357 * @param anonymity anonymity-level for the content
358 * @param expiration expiration time for the content
359 * @param uid unique identifier for the datum;
360 * maybe 0 if no unique identifier is available
363 handle_datastore_reply (void *cls,
364 const struct GNUNET_HashCode * key,
365 size_t size, const void *data,
366 enum GNUNET_BLOCK_Type type,
369 struct GNUNET_TIME_Absolute
370 expiration, uint64_t uid)
372 struct MeshClient *sc = cls;
373 size_t msize = size + sizeof (struct MeshReplyMessage);
374 struct WriteQueueItem *wqi;
375 struct MeshReplyMessage *srm;
378 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381 "Performing on-demand encoding\n");
383 GNUNET_FS_handle_on_demand_block (key,
387 &handle_datastore_reply,
390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391 "On-demand encoding request failed\n");
392 continue_writing (sc);
396 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
399 continue_writing (sc);
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 "Starting transmission of %u byte reply for query `%s' via mesh\n",
406 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
408 srm = (struct MeshReplyMessage *) &wqi[1];
409 srm->header.size = htons ((uint16_t) msize);
410 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
411 srm->type = htonl (type);
412 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
413 memcpy (&srm[1], data, size);
414 sc->reply_size = msize;
415 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
418 continue_writing (sc);
423 * Functions with this signature are called whenever a
424 * complete query message is received.
426 * Do not call GNUNET_SERVER_mst_destroy in callback
428 * @param cls closure with the 'struct MeshClient'
429 * @param tunnel tunnel handle
430 * @param tunnel_ctx tunnel context
431 * @param message the actual message
432 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
435 request_cb (void *cls,
436 struct GNUNET_MESH_Tunnel *tunnel,
438 const struct GNUNET_MessageHeader *message)
440 struct MeshClient *sc = *tunnel_ctx;
441 const struct MeshQueryMessage *sqm;
446 sqm = (const struct MeshQueryMessage *) message;
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448 "Received query for `%s' via mesh\n",
449 GNUNET_h2s (&sqm->query));
450 GNUNET_STATISTICS_update (GSF_stats,
451 gettext_noop ("# queries received via mesh"), 1,
453 refresh_timeout_task (sc);
454 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
459 GSF_datastore_queue_size,
460 GNUNET_TIME_UNIT_FOREVER_REL,
461 &handle_datastore_reply, sc);
464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465 "Queueing request with datastore failed (queue full?)\n");
466 continue_writing (sc);
473 * Functions of this type are called upon new mesh connection from other peers.
475 * @param cls the closure from GNUNET_MESH_connect
476 * @param socket the socket representing the mesh
477 * @param initiator the identity of the peer who wants to establish a mesh
478 * with us; NULL on binding error
479 * @param port mesh port used for the incoming connection
480 * @return initial tunnel context (our 'struct MeshClient')
483 accept_cb (void *cls,
484 struct GNUNET_MESH_Tunnel *socket,
485 const struct GNUNET_PeerIdentity *initiator,
488 struct MeshClient *sc;
490 GNUNET_assert (NULL != socket);
491 if (sc_count >= sc_count_max)
493 GNUNET_STATISTICS_update (GSF_stats,
494 gettext_noop ("# mesh client connections rejected"), 1,
496 GNUNET_MESH_tunnel_destroy (socket);
499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500 "Accepting inbound mesh connection from `%s'\n",
501 GNUNET_i2s (initiator));
502 GNUNET_STATISTICS_update (GSF_stats,
503 gettext_noop ("# mesh connections active"), 1,
505 sc = GNUNET_new (struct MeshClient);
507 GNUNET_CONTAINER_DLL_insert (sc_head,
511 refresh_timeout_task (sc);
513 "Accept returns %p\n",
520 * Function called by mesh when a client disconnects.
521 * Cleans up our 'struct MeshClient' of that tunnel.
524 * @param tunnel tunnel of the disconnecting client
525 * @param tunnel_ctx our 'struct MeshClient'
528 cleaner_cb (void *cls,
529 const struct GNUNET_MESH_Tunnel *tunnel,
532 struct MeshClient *sc = tunnel_ctx;
535 "Cleaner called with %p\n",
543 * Initialize subsystem for non-anonymous file-sharing.
546 GSF_mesh_start_server ()
548 static const struct GNUNET_MESH_MessageHandler handlers[] = {
549 { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)},
552 static const uint32_t ports[] = {
553 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
558 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
563 listen_socket = GNUNET_MESH_connect (GSF_cfg,
573 * Shutdown subsystem for non-anonymous file-sharing.
576 GSF_mesh_stop_server ()
578 struct MeshClient *sc;
580 while (NULL != (sc = sc_head))
582 if (NULL != listen_socket)
584 GNUNET_MESH_disconnect (listen_socket);
585 listen_socket = NULL;
589 /* end of gnunet-service-fs_mesh.c */