2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - limit # concurrent clients, have timeouts for server-side
28 * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_stream_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_stream.h"
41 * Information we keep around for each active streaming client.
48 struct StreamClient *next;
53 struct StreamClient *prev;
56 * Socket for communication.
58 struct GNUNET_STREAM_Socket *socket;
61 * Handle for active read operation, or NULL.
63 struct GNUNET_STREAM_IOReadHandle *rh;
66 * Handle for active write operation, or NULL.
68 struct GNUNET_STREAM_IOWriteHandle *wh;
71 * Tokenizer for requests.
73 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
76 * Current active request to the datastore, if we have one pending.
78 struct GNUNET_DATASTORE_QueueEntry *qe;
81 * Size of the last write that was initiated.
89 * Query from one peer, asking the other for CHK-data.
91 struct StreamQueryMessage
95 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
97 struct GNUNET_MessageHeader header;
100 * Block type must be DBLOCK or IBLOCK.
105 * Query hash from CHK (hash of encrypted block).
107 struct GNUNET_HashCode query;
113 * Reply to a StreamQueryMessage.
115 struct StreamReplyMessage
119 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
121 struct GNUNET_MessageHeader header;
124 * Block type must be DBLOCK or IBLOCK.
129 * Expiration time for the block.
131 struct GNUNET_TIME_AbsoluteNBO expiration;
133 /* followed by the encrypted block */
139 * Handle for a stream to another peer.
145 * Handle for a request that is going out via stream API.
147 struct GSF_StreamRequest
153 struct GSF_StreamRequest *next;
158 struct GSF_StreamRequest *prev;
161 * Which stream is this request associated with?
163 struct StreamHandle *sh;
166 * Function to call with the result.
168 GSF_StreamReplyProcessor proc;
176 * Query to transmit to the other peer.
178 struct GNUNET_HashCode query;
181 * Desired type for the reply.
183 enum GNUNET_BLOCK_Type type;
186 * Did we transmit this request already? YES if we are
187 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
194 * Handle for a stream to another peer.
199 * Head of DLL of pending requests on this stream.
201 struct GSF_StreamRequest *pending_head;
204 * Tail of DLL of pending requests on this stream.
206 struct GSF_StreamRequest *pending_tail;
209 * Head of DLL of requests waiting for a reply on this stream.
211 struct GSF_StreamRequest *waiting_head;
214 * Tail of DLL of requests waiting for a reply on this stream.
216 struct GSF_StreamRequest *waiting_tail;
219 * Connection to the other peer.
221 struct GNUNET_STREAM_Socket *stream;
224 * Handle for active read operation, or NULL.
226 struct GNUNET_STREAM_IOReadHandle *rh;
229 * Handle for active write operation, or NULL.
231 struct GNUNET_STREAM_IOWriteHandle *wh;
234 * Tokenizer for replies.
236 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
239 * Which peer does this stream go to?
241 struct GNUNET_PeerIdentity target;
244 * Task to kill inactive streams (we keep them around for
245 * a few seconds to give the application a chance to give
248 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
251 * Is this stream ready for transmission?
259 * Listen socket for incoming requests.
261 static struct GNUNET_STREAM_ListenSocket *listen_socket;
264 * Head of DLL of stream clients.
266 static struct StreamClient *sc_head;
269 * Tail of DLL of stream clients.
271 static struct StreamClient *sc_tail;
274 * Map from peer identities to 'struct StreamHandles' with streams to
277 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
280 /* ********************* client-side code ************************* */
284 * Destroy a stream handle.
286 * @param sh stream to process
289 destroy_stream_handle (struct StreamHandle *sh)
291 struct GSF_StreamRequest *sr;
293 while (NULL != (sr = sh->pending_head))
295 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
296 GNUNET_TIME_UNIT_FOREVER_ABS,
298 GSF_stream_query_cancel (sr);
300 while (NULL != (sr = sh->waiting_head))
302 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
303 GNUNET_TIME_UNIT_FOREVER_ABS,
305 GSF_stream_query_cancel (sr);
308 GNUNET_STREAM_io_write_cancel (sh->wh);
310 GNUNET_STREAM_io_read_cancel (sh->rh);
311 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
312 GNUNET_SCHEDULER_cancel (sh->timeout_task);
313 GNUNET_STREAM_close (sh->stream);
314 GNUNET_assert (GNUNET_OK ==
315 GNUNET_CONTAINER_multihashmap_remove (stream_map,
316 &sh->target.hashPubKey,
323 * Transmit pending requests via the stream.
325 * @param sh stream to process
328 transmit_pending (struct StreamHandle *sh);
332 * Function called once the stream is ready for transmission.
334 * @param cls the 'struct StreamHandle'
335 * @param socket stream socket handle
338 stream_ready_cb (void *cls,
339 struct GNUNET_STREAM_Socket *socket)
341 struct StreamHandle *sh = cls;
343 sh->is_ready = GNUNET_YES;
344 transmit_pending (sh);
349 * We had a serious error, tear down and re-create stream from scratch.
351 * @param sh stream to reset
354 reset_stream (struct StreamHandle *sh)
356 struct GSF_StreamRequest *sr;
359 GNUNET_STREAM_io_read_cancel (sh->rh);
360 GNUNET_STREAM_close (sh->stream);
361 sh->is_ready = GNUNET_NO;
362 while (NULL != (sr = sh->waiting_tail))
364 GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
367 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
370 sr->was_transmitted = GNUNET_NO;
372 sh->stream = GNUNET_STREAM_open (GSF_cfg,
374 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
375 &stream_ready_cb, sh,
376 GNUNET_STREAM_OPTION_END);
381 * We got a reply from the stream. Process it.
383 * @param cls the struct StreamHandle
384 * @param status the status of the stream at the time this function is called
385 * @param data traffic from the other side
386 * @param size the number of bytes available in data read; will be 0 on timeout
387 * @return number of bytes of processed from 'data' (any data remaining should be
388 * given to the next time the read processor is called).
391 handle_stream_reply (void *cls,
392 enum GNUNET_STREAM_Status status,
396 struct StreamHandle *sh = cls;
400 GNUNET_SERVER_mst_receive (sh->mst,
403 GNUNET_NO, GNUNET_NO))
409 sh->rh = GNUNET_STREAM_read (sh->stream,
410 GNUNET_TIME_UNIT_FOREVER_REL,
411 &handle_stream_reply,
418 * Functions of this signature are called whenever we transmitted a
419 * query via a stream.
421 * @param cls the struct StreamHandle for which we did the write call
422 * @param status the status of the stream at the time this function is called;
423 * GNUNET_OK if writing to stream was completed successfully,
424 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
426 * @param size the number of bytes written
429 query_write_continuation (void *cls,
430 enum GNUNET_STREAM_Status status,
433 struct StreamHandle *sh = cls;
436 if ( (GNUNET_STREAM_OK != status) ||
437 (sizeof (struct StreamQueryMessage) != size) )
443 sh->rh = GNUNET_STREAM_read (sh->stream,
444 GNUNET_TIME_UNIT_FOREVER_REL,
445 &handle_stream_reply,
447 transmit_pending (sh);
452 * Transmit pending requests via the stream.
454 * @param sh stream to process
457 transmit_pending (struct StreamHandle *sh)
459 struct StreamQueryMessage sqm;
460 struct GSF_StreamRequest *sr;
464 sr = sh->pending_head;
467 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
470 GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
473 sr->was_transmitted = GNUNET_YES;
474 sqm.header.size = htons (sizeof (sqm));
475 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
476 sqm.type = htonl (sr->type);
477 sqm.query = sr->query;
478 sh->wh = GNUNET_STREAM_write (sh->stream,
480 GNUNET_TIME_UNIT_FOREVER_REL,
481 &query_write_continuation,
487 * Functions with this signature are called whenever a
488 * complete reply is received.
490 * Do not call GNUNET_SERVER_mst_destroy in callback
492 * @param cls closure with the 'struct StreamHandle'
493 * @param client identification of the client, NULL
494 * @param message the actual message
495 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
500 const struct GNUNET_MessageHeader *message)
502 struct StreamHandle *sh = cls;
503 const struct StreamReplyMessage *srm;
505 enum GNUNET_BLOCK_Type type;
506 struct GNUNET_HashCode query;
507 struct GSF_StreamRequest *sr;
509 msize = ntohs (message->size);
510 switch (ntohs (message->type))
512 case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
513 if (sizeof (struct StreamReplyMessage) > msize)
516 return GNUNET_SYSERR;
518 srm = (const struct StreamReplyMessage *) message;
519 msize -= sizeof (struct StreamReplyMessage);
520 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
522 GNUNET_BLOCK_get_key (GSF_block_ctx,
524 &srm[1], msize, &query))
527 return GNUNET_SYSERR;
529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530 "Received reply `%s' via stream\n",
531 GNUNET_h2s (&query));
532 GNUNET_STATISTICS_update (GSF_stats,
533 gettext_noop ("# replies received via stream"), 1,
535 for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
536 if (0 == memcmp (&query,
538 sizeof (struct GNUNET_HashCode)))
542 GNUNET_STATISTICS_update (GSF_stats,
543 gettext_noop ("# replies received via stream dropped"), 1,
547 sr->proc (sr->proc_cls,
549 GNUNET_TIME_absolute_ntoh (srm->expiration),
552 GSF_stream_query_cancel (sr);
556 return GNUNET_SYSERR;
562 * Get (or create) a stream to talk to the given peer.
564 * @param target peer we want to communicate with
566 static struct StreamHandle *
567 get_stream (const struct GNUNET_PeerIdentity *target)
569 struct StreamHandle *sh;
571 sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
572 &target->hashPubKey);
575 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
577 GNUNET_SCHEDULER_cancel (sh->timeout_task);
578 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
582 sh = GNUNET_malloc (sizeof (struct StreamHandle));
583 sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
585 sh->target = *target;
586 sh->stream = GNUNET_STREAM_open (GSF_cfg,
588 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
589 &stream_ready_cb, sh,
590 GNUNET_STREAM_OPTION_END);
591 GNUNET_assert (GNUNET_OK ==
592 GNUNET_CONTAINER_multihashmap_put (stream_map,
593 &sh->target.hashPubKey,
595 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
601 * Look for a block by directly contacting a particular peer.
603 * @param target peer that should have the block
604 * @param query hash to query for the block
605 * @param type desired type for the block
606 * @param proc function to call with result
607 * @param proc_cls closure for 'proc'
608 * @return handle to cancel the operation
610 struct GSF_StreamRequest *
611 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
612 const struct GNUNET_HashCode *query,
613 enum GNUNET_BLOCK_Type type,
614 GSF_StreamReplyProcessor proc, void *proc_cls)
616 struct StreamHandle *sh;
617 struct GSF_StreamRequest *sr;
619 sh = get_stream (target);
620 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
623 sr->proc_cls = proc_cls;
626 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
629 if (GNUNET_YES == sh->is_ready)
630 transmit_pending (sh);
636 * Task called when it is time to destroy an inactive stream.
638 * @param cls the 'struct StreamHandle' to tear down
639 * @param tc scheduler context, unused
642 stream_timeout (void *cls,
643 const struct GNUNET_SCHEDULER_TaskContext *tc)
645 struct StreamHandle *sh = cls;
647 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
648 destroy_stream_handle (sh);
653 * Cancel an active request; must not be called after 'proc'
656 * @param sr request to cancel
659 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
661 struct StreamHandle *sh = sr->sh;
663 if (GNUNET_YES == sr->was_transmitted)
664 GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
668 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
672 if ( (NULL == sh->waiting_head) &&
673 (NULL == sh->pending_head) )
674 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
680 /* ********************* server-side code ************************* */
684 * We're done with a particular client, clean up.
686 * @param sc client to clean up
689 terminate_stream (struct StreamClient *sc)
691 GNUNET_STATISTICS_update (GSF_stats,
692 gettext_noop ("# stream connections active"), -1,
695 GNUNET_STREAM_io_read_cancel (sc->rh);
697 GNUNET_STREAM_io_write_cancel (sc->wh);
699 GNUNET_DATASTORE_cancel (sc->qe);
700 GNUNET_SERVER_mst_destroy (sc->mst);
701 GNUNET_STREAM_close (sc->socket);
702 GNUNET_CONTAINER_DLL_remove (sc_head,
710 * Functions of this signature are called whenever data is available from the
713 * @param cls the closure from GNUNET_STREAM_read
714 * @param status the status of the stream at the time this function is called
715 * @param data traffic from the other side
716 * @param size the number of bytes available in data read; will be 0 on timeout
717 * @return number of bytes of processed from 'data' (any data remaining should be
718 * given to the next time the read processor is called).
721 process_request (void *cls,
722 enum GNUNET_STREAM_Status status,
728 * We're done handling a request from a client, read the next one.
730 * @param sc client to continue reading requests from
733 continue_reading (struct StreamClient *sc)
738 GNUNET_SERVER_mst_receive (sc->mst,
741 GNUNET_NO, GNUNET_YES);
742 if (GNUNET_NO == ret)
744 sc->rh = GNUNET_STREAM_read (sc->socket,
745 GNUNET_TIME_UNIT_FOREVER_REL,
752 * Functions of this signature are called whenever data is available from the
755 * @param cls the closure from GNUNET_STREAM_read
756 * @param status the status of the stream at the time this function is called
757 * @param data traffic from the other side
758 * @param size the number of bytes available in data read; will be 0 on timeout
759 * @return number of bytes of processed from 'data' (any data remaining should be
760 * given to the next time the read processor is called).
763 process_request (void *cls,
764 enum GNUNET_STREAM_Status status,
768 struct StreamClient *sc = cls;
774 case GNUNET_STREAM_OK:
776 GNUNET_SERVER_mst_receive (sc->mst,
779 GNUNET_NO, GNUNET_YES);
780 if (GNUNET_NO == ret)
781 return size; /* more messages in MST */
782 if (GNUNET_SYSERR == ret)
785 terminate_stream (sc);
789 case GNUNET_STREAM_TIMEOUT:
790 case GNUNET_STREAM_SHUTDOWN:
791 case GNUNET_STREAM_SYSERR:
792 case GNUNET_STREAM_BROKEN:
793 terminate_stream (sc);
799 continue_reading (sc);
805 * Sending a reply was completed, continue processing.
807 * @param cls closure with the struct StreamClient which sent the query
810 write_continuation (void *cls,
811 enum GNUNET_STREAM_Status status,
814 struct StreamClient *sc = cls;
817 if ( (GNUNET_STREAM_OK == status) &&
818 (size == sc->reply_size) )
820 GNUNET_STATISTICS_update (GSF_stats,
821 gettext_noop ("# Blocks transferred via stream"), 1,
823 continue_reading (sc);
826 terminate_stream (sc);
831 * Process a datum that was stored in the datastore.
833 * @param cls closure with the struct StreamClient which sent the query
834 * @param key key for the content
835 * @param size number of bytes in data
836 * @param data content stored
837 * @param type type of the content
838 * @param priority priority of the content
839 * @param anonymity anonymity-level for the content
840 * @param expiration expiration time for the content
841 * @param uid unique identifier for the datum;
842 * maybe 0 if no unique identifier is available
845 handle_datastore_reply (void *cls,
846 const struct GNUNET_HashCode * key,
847 size_t size, const void *data,
848 enum GNUNET_BLOCK_Type type,
851 struct GNUNET_TIME_Absolute
852 expiration, uint64_t uid)
854 struct StreamClient *sc = cls;
855 size_t msize = size + sizeof (struct StreamReplyMessage);
856 char buf[msize] GNUNET_ALIGN;
857 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
860 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
863 GNUNET_FS_handle_on_demand_block (key,
867 &handle_datastore_reply,
870 continue_reading (sc);
874 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
877 continue_reading (sc);
880 srm->header.size = htons ((uint16_t) msize);
881 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
882 srm->type = htonl (type);
883 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
884 memcpy (&srm[1], data, size);
885 sc->reply_size = msize;
886 sc->wh = GNUNET_STREAM_write (sc->socket,
888 GNUNET_TIME_UNIT_FOREVER_REL,
893 terminate_stream (sc);
900 * Functions with this signature are called whenever a
901 * complete query message is received.
903 * Do not call GNUNET_SERVER_mst_destroy in callback
905 * @param cls closure with the 'struct StreamClient'
906 * @param client identification of the client, NULL
907 * @param message the actual message
908 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
911 request_cb (void *cls,
913 const struct GNUNET_MessageHeader *message)
915 struct StreamClient *sc = cls;
916 const struct StreamQueryMessage *sqm;
918 switch (ntohs (message->type))
920 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
921 if (sizeof (struct StreamQueryMessage) !=
922 ntohs (message->size))
925 return GNUNET_SYSERR;
927 sqm = (const struct StreamQueryMessage *) message;
928 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929 "Received query for `%s' via stream\n",
930 GNUNET_h2s (&sqm->query));
931 GNUNET_STATISTICS_update (GSF_stats,
932 gettext_noop ("# queries received via stream"), 1,
934 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
939 GSF_datastore_queue_size,
940 GNUNET_TIME_UNIT_FOREVER_REL,
941 &handle_datastore_reply, sc);
943 continue_reading (sc);
947 return GNUNET_SYSERR;
953 * Functions of this type are called upon new stream connection from other peers
954 * or upon binding error which happen when the app_port given in
955 * GNUNET_STREAM_listen() is already taken.
957 * @param cls the closure from GNUNET_STREAM_listen
958 * @param socket the socket representing the stream; NULL on binding error
959 * @param initiator the identity of the peer who wants to establish a stream
960 * with us; NULL on binding error
961 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
962 * stream (the socket will be invalid after the call)
965 accept_cb (void *cls,
966 struct GNUNET_STREAM_Socket *socket,
967 const struct GNUNET_PeerIdentity *initiator)
969 struct StreamClient *sc;
972 return GNUNET_SYSERR;
973 GNUNET_STATISTICS_update (GSF_stats,
974 gettext_noop ("# stream connections active"), 1,
976 sc = GNUNET_malloc (sizeof (struct StreamClient));
978 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
980 sc->rh = GNUNET_STREAM_read (sc->socket,
981 GNUNET_TIME_UNIT_FOREVER_REL,
984 GNUNET_CONTAINER_DLL_insert (sc_head,
992 * Initialize subsystem for non-anonymous file-sharing.
997 stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
998 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
999 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1001 GNUNET_STREAM_OPTION_END);
1006 * Function called on each active streams to shut them down.
1009 * @param key target peer, unused
1010 * @param value the 'struct StreamHandle' to destroy
1011 * @return GNUNET_YES (continue to iterate)
1014 release_streams (void *cls,
1015 const struct GNUNET_HashCode *key,
1018 struct StreamHandle *sh = value;
1020 destroy_stream_handle (sh);
1026 * Shutdown subsystem for non-anonymous file-sharing.
1031 struct StreamClient *sc;
1033 while (NULL != (sc = sc_head))
1034 terminate_stream (sc);
1035 if (NULL != listen_socket)
1037 GNUNET_STREAM_listen_close (listen_socket);
1038 listen_socket = NULL;
1040 GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1043 GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1047 /* end of gnunet-service-fs_stream.c */