2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
28 * - limit # concurrent clients, timeout for read
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_stream_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_stream.h"
41 * Information we keep around for each active streaming client.
/* Links for the doubly-linked list of clients rooted at sc_head / sc_tail;
 * entries are inserted in accept_cb and removed in terminate_stream. */
48 struct StreamClient *next;
53 struct StreamClient *prev;
56 * Socket for communication.
58 struct GNUNET_STREAM_Socket *socket;
61 * Handle for active read operation, or NULL.
/* Assigned from GNUNET_STREAM_read in accept_cb and continue_reading;
 * cancelled in terminate_stream. */
63 struct GNUNET_STREAM_IOReadHandle *rh;
66 * Handle for active write operation, or NULL.
/* Assigned from GNUNET_STREAM_write in handle_datastore_reply;
 * cancelled in terminate_stream. */
68 struct GNUNET_STREAM_IOWriteHandle *wh;
71 * Tokenizer for requests.
/* Created in accept_cb with request_cb as its callback; fed from
 * process_request and continue_reading. */
73 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
76 * Current active request to the datastore, if we have one pending.
/* Assigned from GNUNET_DATASTORE_get_key in request_cb. */
78 struct GNUNET_DATASTORE_QueueEntry *qe;
81 * Size of the last write that was initiated.
/* NOTE(review): the member this text documents is not visible in this
 * listing; presumably it is 'reply_size', which handle_datastore_reply sets
 * and write_continuation compares against the number of bytes written. */
89 * Query from one peer, asking the other for CHK-data.
/* Request message received over the stream.  request_cb only accepts it when
 * the size in the header equals sizeof (struct StreamQueryMessage) exactly. */
91 struct StreamQueryMessage
95 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
97 struct GNUNET_MessageHeader header;
100 * Block type must be DBLOCK or IBLOCK.
/* NOTE(review): the member holding the block type is not visible in this
 * listing -- confirm its width and byte order against request_cb. */
105 * Query hash from CHK (hash of encrypted block).
/* Logged by request_cb via GNUNET_h2s. */
107 struct GNUNET_HashCode query;
113 * Reply to a StreamQueryMessage.
/* Fixed-size prefix of a reply built by handle_datastore_reply; the block
 * data is copied directly behind it (at &srm[1]). */
115 struct StreamReplyMessage
119 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
/* header.size is set to the size of this struct plus the payload size. */
121 struct GNUNET_MessageHeader header;
124 * Block type must be DBLOCK or IBLOCK.
/* NOTE(review): the member holding the block type is not visible in this
 * listing; handle_datastore_reply stores it as 'type' using htonl. */
129 * Expiration time for the block.
/* Filled with GNUNET_TIME_absolute_hton by handle_datastore_reply. */
131 struct GNUNET_TIME_AbsoluteNBO expiration;
133 /* followed by the encrypted block */
139 * Listen socket for incoming requests.
/* Assigned from GNUNET_STREAM_listen at start-up; closed and reset to NULL
 * during shutdown. */
141 static struct GNUNET_STREAM_ListenSocket *listen_socket;
144 * Head of DLL of stream clients.
/* The shutdown loop repeatedly terminates the head entry until the list is
 * empty. */
146 static struct StreamClient *sc_head;
149 * Tail of DLL of stream clients.
151 static struct StreamClient *sc_tail;
155 * We're done with a particular client, clean up.
157 * @param sc client to clean up
/* Tears down one client: cancels its pending read, write and datastore
 * operations, destroys the tokenizer, closes the stream and unlinks the
 * entry from the list rooted at sc_head. */
160 terminate_stream (struct StreamClient *sc)
/* NOTE(review): rh and wh are documented as "or NULL" and qe as "if we have
 * one pending"; the lines guarding these three cancel calls are not visible
 * in this listing -- confirm each call is only made for a non-NULL handle. */
163 GNUNET_STREAM_io_read_cancel (sc->rh);
165 GNUNET_STREAM_io_write_cancel (sc->wh);
167 GNUNET_DATASTORE_cancel (sc->qe);
/* The tokenizer was created for this client in accept_cb. */
168 GNUNET_SERVER_mst_destroy (sc->mst);
169 GNUNET_STREAM_close (sc->socket);
/* NOTE(review): the rest of this statement and the release of 'sc'
 * (allocated with GNUNET_malloc in accept_cb) are not visible here --
 * verify that the struct is freed after being unlinked. */
170 GNUNET_CONTAINER_DLL_remove (sc_head,
178 * Functions of this signature are called whenever data is available from the stream.
181 * @param cls the closure from GNUNET_STREAM_read
182 * @param status the status of the stream at the time this function is called
183 * @param data traffic from the other side
184 * @param size the number of bytes available in data read; will be 0 on timeout
185 * @return number of bytes processed from 'data' (any remaining data should be
186 * passed to the read processor the next time it is called).
/* Appears to be the forward declaration of the read callback, which is
 * defined after continue_reading.  NOTE(review): continue_reading presumably
 * passes it to GNUNET_STREAM_read; that argument is not visible in this
 * listing -- confirm. */
189 process_request (void *cls,
190 enum GNUNET_STREAM_Status status,
196 * We're done handling a request from a client, read the next one.
198 * @param sc client to continue reading requests from
/* Resumes request handling for a client: the tokenizer is invoked first, and
 * a new read on the client's socket is issued afterwards. */
201 continue_reading (struct StreamClient *sc)
/* NOTE(review): the declaration of 'ret' and the data/size arguments of this
 * call are not visible in this listing. */
206 GNUNET_SERVER_mst_receive (sc->mst,
209 GNUNET_NO, GNUNET_YES);
/* process_request annotates the same result as "more messages in MST";
 * presumably this branch returns without issuing a new read (the branch
 * body is not visible -- confirm). */
210 if (GNUNET_NO == ret)
/* The read is issued with GNUNET_TIME_UNIT_FOREVER_REL, i.e. without a
 * timeout; the file header lists "timeout for read" as a TODO. */
212 sc->rh = GNUNET_STREAM_read (sc->socket,
213 GNUNET_TIME_UNIT_FOREVER_REL,
220 * Functions of this signature are called whenever data is available from the stream.
223 * @param cls the closure from GNUNET_STREAM_read
224 * @param status the status of the stream at the time this function is called
225 * @param data traffic from the other side
226 * @param size the number of bytes available in data read; will be 0 on timeout
227 * @return number of bytes processed from 'data' (any remaining data should be
228 * passed to the read processor the next time it is called).
/* Read callback for a client stream.  On GNUNET_STREAM_OK the data is handed
 * to the client's tokenizer; for every other status listed below the client
 * is terminated. */
231 process_request (void *cls,
232 enum GNUNET_STREAM_Status status,
/* The closure is the struct StreamClient of the connection. */
236 struct StreamClient *sc = cls;
/* NOTE(review): terminate_stream cancels sc->rh, so the handle must no longer
 * be set when termination is triggered from inside this callback; the line
 * clearing it is not visible in this listing -- confirm. */
242 case GNUNET_STREAM_OK:
244 GNUNET_SERVER_mst_receive (sc->mst,
247 GNUNET_NO, GNUNET_YES);
/* GNUNET_NO: report all bytes as consumed and return without re-arming the
 * read here. */
248 if (GNUNET_NO == ret)
249 return size; /* more messages in MST */
/* GNUNET_SYSERR from the tokenizer: drop the client. */
250 if (GNUNET_SYSERR == ret)
253 terminate_stream (sc);
/* Timeout, shutdown, system error and broken stream are all handled the same
 * way: the client is terminated. */
257 case GNUNET_STREAM_TIMEOUT:
258 case GNUNET_STREAM_SHUTDOWN:
259 case GNUNET_STREAM_SYSERR:
260 case GNUNET_STREAM_BROKEN:
261 terminate_stream (sc);
/* NOTE(review): the return statements after each terminate_stream call are
 * not visible; verify that no path reaches this call with a terminated
 * client. */
267 continue_reading (sc);
273 * Sending a reply was completed, continue processing.
275 * @param cls closure with the struct StreamClient which sent the query
/* Completion callback for the reply write started in
 * handle_datastore_reply. */
278 write_continuation (void *cls,
279 enum GNUNET_STREAM_Status status,
282 struct StreamClient *sc = cls;
/* NOTE(review): terminate_stream cancels sc->wh; the line clearing the write
 * handle once the write has completed is not visible here -- confirm. */
/* Success requires both an OK status and that the full reply (the size
 * recorded in reply_size) was written; only then is reading resumed. */
285 if ( (GNUNET_STREAM_OK == status) &&
286 (size == sc->reply_size) )
287 continue_reading (sc);
/* Presumably the else branch (the 'else' line is not visible): a failed or
 * partial write ends the connection. */
289 terminate_stream (sc);
294 * Process a datum that was stored in the datastore.
296 * @param cls closure with the struct StreamClient which sent the query
297 * @param key key for the content
298 * @param size number of bytes in data
299 * @param data content stored
300 * @param type type of the content
301 * @param priority priority of the content
302 * @param anonymity anonymity-level for the content
303 * @param expiration expiration time for the content
304 * @param uid unique identifier for the datum;
305 * may be 0 if no unique identifier is available
/* Datastore callback for a query issued by request_cb: wraps the datum in a
 * StreamReplyMessage and writes it to the client's socket.  On-demand blocks
 * are first passed to GNUNET_FS_handle_on_demand_block, with this function
 * as the callback. */
308 handle_datastore_reply (void *cls,
309 const struct GNUNET_HashCode * key,
310 size_t size, const void *data,
311 enum GNUNET_BLOCK_Type type,
314 struct GNUNET_TIME_Absolute
315 expiration, uint64_t uid)
317 struct StreamClient *sc = cls;
/* Total reply size: fixed header plus payload. */
318 size_t msize = size + sizeof (struct StreamReplyMessage);
/* NOTE(review): buf is a variable-length array sized from 'size' and is
 * declared before the msize bound check further down, so the stack space is
 * reserved even for a datum that is later rejected as too large. */
319 char buf[msize] GNUNET_ALIGN;
320 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
/* NOTE(review): terminate_stream cancels sc->qe; the line clearing it once
 * this callback runs is not visible in this listing -- confirm. */
323 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
326 GNUNET_FS_handle_on_demand_block (key,
330 &handle_datastore_reply,
/* Presumably reached when the on-demand handler reports failure (the
 * condition is not visible): give up on this request and read the next. */
333 continue_reading (sc);
/* A reply larger than GNUNET_SERVER_MAX_MESSAGE_SIZE is not sent; reading
 * continues instead. */
337 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
340 continue_reading (sc);
/* The uint16_t cast follows the bound check above. */
343 srm->header.size = htons ((uint16_t) msize);
344 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
345 srm->type = htonl (type);
346 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
/* The payload goes directly behind the fixed header.
 * NOTE(review): no check for data == NULL is visible before this copy --
 * confirm how the datastore signals "no result" to this callback. */
347 memcpy (&srm[1], data, size);
/* Remembered so write_continuation can detect a partial write. */
348 sc->reply_size = msize;
/* NOTE(review): the buffer argument is not visible; if the stack buffer
 * 'buf' is passed, confirm GNUNET_STREAM_write copies it before returning.
 * The write is issued without a timeout. */
349 sc->wh = GNUNET_STREAM_write (sc->socket,
351 GNUNET_TIME_UNIT_FOREVER_REL,
/* Presumably guarded by a check that the write could not be started (guard
 * not visible). */
356 terminate_stream (sc);
363 * Functions with this signature are called whenever a
364 * complete message is received.
366 * Do not call GNUNET_SERVER_mst_destroy in callback
368 * @param cls closure with the 'struct StreamClient'
369 * @param client identification of the client, NULL
370 * @param message the actual message
371 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
/* Tokenizer callback: validates one complete message from the client and,
 * for a stream query, starts a datastore lookup whose result is delivered
 * to handle_datastore_reply. */
374 request_cb (void *cls,
376 const struct GNUNET_MessageHeader *message)
378 struct StreamClient *sc = cls;
379 const struct StreamQueryMessage *sqm;
/* The type field is converted from network byte order before dispatch. */
381 switch (ntohs (message->type))
383 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
/* Queries are fixed-size; any other length is rejected. */
384 if (sizeof (struct StreamQueryMessage) !=
385 ntohs (message->size))
388 return GNUNET_SYSERR;
/* The cast relies on the exact size check above. */
390 sqm = (const struct StreamQueryMessage *) message;
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Received query for `%s' via stream\n",
393 GNUNET_h2s (&sqm->query));
/* NOTE(review): the key and block-type arguments of this call are not
 * visible in this listing.  The request is queued without a timeout and
 * with priority 0 (see the FIXME). */
394 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
398 0 /* FIXME: priority */,
399 GSF_datastore_queue_size,
400 GNUNET_TIME_UNIT_FOREVER_REL,
401 &handle_datastore_reply, sc);
/* Presumably reached only when the lookup could not be queued (guard not
 * visible -- confirm). */
403 continue_reading (sc);
/* Presumably the default case for unknown message types (label not
 * visible). */
407 return GNUNET_SYSERR;
413 * Functions of this type are called upon a new stream connection from other peers
414 * or upon a binding error, which happens when the app_port given in
415 * GNUNET_STREAM_listen() is already taken.
417 * @param cls the closure from GNUNET_STREAM_listen
418 * @param socket the socket representing the stream; NULL on binding error
419 * @param initiator the identity of the peer who wants to establish a stream
420 * with us; NULL on binding error
421 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
422 * stream (the socket will be invalid after the call)
/* Accepts an incoming stream: allocates the per-client state, creates its
 * tokenizer, starts the first read and adds the client to the list rooted
 * at sc_head. */
425 accept_cb (void *cls,
426 struct GNUNET_STREAM_Socket *socket,
427 const struct GNUNET_PeerIdentity *initiator)
429 struct StreamClient *sc;
/* Presumably the binding-error case (NULL socket) described in the function
 * documentation; the condition line is not visible in this listing. */
432 return GNUNET_SYSERR;
/* NOTE(review): no limit on the number of clients is visible here; the file
 * header lists "limit # concurrent clients" as a TODO. */
433 sc = GNUNET_malloc (sizeof (struct StreamClient));
/* NOTE(review): the assignment of sc->socket is not visible here; it must
 * precede the GNUNET_STREAM_read call below, which reads sc->socket. */
/* Complete messages from this client are delivered to request_cb. */
435 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
/* The initial read is issued without a timeout. */
437 sc->rh = GNUNET_STREAM_read (sc->socket,
438 GNUNET_TIME_UNIT_FOREVER_REL,
441 GNUNET_CONTAINER_DLL_insert (sc_head,
449 * Initialize subsystem for non-anonymous file-sharing.
/* Opens the listen socket for the FS block-transfer application type and
 * stores it in the file-scope 'listen_socket'.
 * NOTE(review): the enclosing function signature and the callback argument
 * are not visible in this listing; presumably accept_cb is registered here.
 * No check of the returned handle is visible either. */
454 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
455 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
457 GNUNET_STREAM_OPTION_END);
462 * Shutdown subsystem for non-anonymous file-sharing.
/* Shutdown body (the enclosing function signature is not visible in this
 * listing): terminates every remaining client, then closes the listen
 * socket. */
467 struct StreamClient *sc;
/* The loop only ends because terminate_stream unlinks the entry from the
 * list rooted at sc_head, so the head changes on every iteration. */
469 while (NULL != (sc = sc_head))
470 terminate_stream (sc);
/* The socket is closed only if it was opened, and reset to NULL
 * afterwards. */
471 if (NULL != listen_socket)
473 GNUNET_STREAM_listen_close (listen_socket);
474 listen_socket = NULL;
478 /* end of gnunet-service-fs_stream.c */