2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - limit # concurrent clients, have timeouts for server-side
30 #include "gnunet_constants.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_applications.h"
35 #include "gnunet-service-fs.h"
36 #include "gnunet-service-fs_indexing.h"
37 #include "gnunet-service-fs_stream.h"
40 * Information we keep around for each active streaming client.
47 struct StreamClient *next;
52 struct StreamClient *prev;
55 * Socket for communication.
57 struct GNUNET_STREAM_Socket *socket;
60 * Handle for active read operation, or NULL.
62 struct GNUNET_STREAM_IOReadHandle *rh;
65 * Handle for active write operation, or NULL.
67 struct GNUNET_STREAM_IOWriteHandle *wh;
70 * Tokenizer for requests.
72 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
75 * Current active request to the datastore, if we have one pending.
77 struct GNUNET_DATASTORE_QueueEntry *qe;
80 * Task that is scheduled to asynchronously terminate the connection.
82 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
85 * Size of the last write that was initiated.
93 * Query from one peer, asking the other for CHK-data.
95 struct StreamQueryMessage
99 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
101 struct GNUNET_MessageHeader header;
104 * Block type must be DBLOCK or IBLOCK.
109 * Query hash from CHK (hash of encrypted block).
111 struct GNUNET_HashCode query;
117 * Reply to a StreamQueryMessage.
119 struct StreamReplyMessage
123 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
125 struct GNUNET_MessageHeader header;
128 * Block type must be DBLOCK or IBLOCK.
133 * Expiration time for the block.
135 struct GNUNET_TIME_AbsoluteNBO expiration;
137 /* followed by the encrypted block */
143 * Handle for a stream to another peer.
149 * Handle for a request that is going out via stream API.
151 struct GSF_StreamRequest
157 struct GSF_StreamRequest *next;
162 struct GSF_StreamRequest *prev;
165 * Which stream is this request associated with?
167 struct StreamHandle *sh;
170 * Function to call with the result.
172 GSF_StreamReplyProcessor proc;
180 * Query to transmit to the other peer.
182 struct GNUNET_HashCode query;
185 * Desired type for the reply.
187 enum GNUNET_BLOCK_Type type;
190 * Did we transmit this request already? YES if we are
191 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
198 * Handle for a stream to another peer.
203 * Head of DLL of pending requests on this stream.
205 struct GSF_StreamRequest *pending_head;
208 * Tail of DLL of pending requests on this stream.
210 struct GSF_StreamRequest *pending_tail;
213 * Head of DLL of requests waiting for a reply on this stream.
215 struct GSF_StreamRequest *waiting_head;
218 * Tail of DLL of requests waiting for a reply on this stream.
220 struct GSF_StreamRequest *waiting_tail;
223 * Connection to the other peer.
225 struct GNUNET_STREAM_Socket *stream;
228 * Handle for active read operation, or NULL.
230 struct GNUNET_STREAM_IOReadHandle *rh;
233 * Handle for active write operation, or NULL.
235 struct GNUNET_STREAM_IOWriteHandle *wh;
238 * Tokenizer for replies.
240 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
243 * Which peer does this stream go to?
245 struct GNUNET_PeerIdentity target;
248 * Task to kill inactive streams (we keep them around for
249 * a few seconds to give the application a chance to give
252 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
255 * Task to reset streams that had errors (asynchronously,
256 * as we may not be able to do it immediately during a
257 * callback from the stream API).
259 GNUNET_SCHEDULER_TaskIdentifier reset_task;
262 * Is this stream ready for transmission?
270 * Listen socket for incoming requests.
272 static struct GNUNET_STREAM_ListenSocket *listen_socket;
275 * Head of DLL of stream clients.
277 static struct StreamClient *sc_head;
280 * Tail of DLL of stream clients.
282 static struct StreamClient *sc_tail;
285 * Map from peer identities to 'struct StreamHandles' with streams to
288 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
291 /* ********************* client-side code ************************* */
295 * Destroy a stream handle.
297 * @param sh stream to process
300 destroy_stream_handle (struct StreamHandle *sh)
302 struct GSF_StreamRequest *sr;
304 while (NULL != (sr = sh->pending_head))
306 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
307 GNUNET_TIME_UNIT_FOREVER_ABS,
309 GSF_stream_query_cancel (sr);
311 while (NULL != (sr = sh->waiting_head))
313 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
314 GNUNET_TIME_UNIT_FOREVER_ABS,
316 GSF_stream_query_cancel (sr);
319 GNUNET_STREAM_io_write_cancel (sh->wh);
321 GNUNET_STREAM_io_read_cancel (sh->rh);
322 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
323 GNUNET_SCHEDULER_cancel (sh->timeout_task);
324 GNUNET_STREAM_close (sh->stream);
325 GNUNET_assert (GNUNET_OK ==
326 GNUNET_CONTAINER_multihashmap_remove (stream_map,
327 &sh->target.hashPubKey,
334 * Transmit pending requests via the stream.
336 * @param sh stream to process
339 transmit_pending (struct StreamHandle *sh);
343 * Function called once the stream is ready for transmission.
345 * @param cls the 'struct StreamHandle'
346 * @param socket stream socket handle
349 stream_ready_cb (void *cls,
350 struct GNUNET_STREAM_Socket *socket)
352 struct StreamHandle *sh = cls;
354 sh->is_ready = GNUNET_YES;
355 transmit_pending (sh);
360 * We had a serious error, tear down and re-create stream from scratch.
362 * @param sh stream to reset
365 reset_stream (struct StreamHandle *sh)
367 struct GSF_StreamRequest *sr;
370 GNUNET_STREAM_io_read_cancel (sh->rh);
371 GNUNET_STREAM_close (sh->stream);
372 sh->is_ready = GNUNET_NO;
373 while (NULL != (sr = sh->waiting_tail))
375 GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
378 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
381 sr->was_transmitted = GNUNET_NO;
383 sh->stream = GNUNET_STREAM_open (GSF_cfg,
385 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
386 &stream_ready_cb, sh,
387 GNUNET_STREAM_OPTION_END);
392 * Task called when it is time to destroy an inactive stream.
394 * @param cls the 'struct StreamHandle' to tear down
395 * @param tc scheduler context, unused
398 stream_timeout (void *cls,
399 const struct GNUNET_SCHEDULER_TaskContext *tc)
401 struct StreamHandle *sh = cls;
403 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
404 destroy_stream_handle (sh);
409 * Task called when it is time to reset an stream.
411 * @param cls the 'struct StreamHandle' to tear down
412 * @param tc scheduler context, unused
415 reset_stream_task (void *cls,
416 const struct GNUNET_SCHEDULER_TaskContext *tc)
418 struct StreamHandle *sh = cls;
420 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
426 * We had a serious error, tear down and re-create stream from scratch,
427 * but do so asynchronously.
429 * @param sh stream to reset
432 reset_stream_async (struct StreamHandle *sh)
434 if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
435 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
441 * We got a reply from the stream. Process it.
443 * @param cls the struct StreamHandle
444 * @param status the status of the stream at the time this function is called
445 * @param data traffic from the other side
446 * @param size the number of bytes available in data read; will be 0 on timeout
447 * @return number of bytes of processed from 'data' (any data remaining should be
448 * given to the next time the read processor is called).
451 handle_stream_reply (void *cls,
452 enum GNUNET_STREAM_Status status,
456 struct StreamHandle *sh = cls;
460 GNUNET_SERVER_mst_receive (sh->mst,
463 GNUNET_NO, GNUNET_NO))
466 reset_stream_async (sh);
469 sh->rh = GNUNET_STREAM_read (sh->stream,
470 GNUNET_TIME_UNIT_FOREVER_REL,
471 &handle_stream_reply,
478 * Functions of this signature are called whenever we transmitted a
479 * query via a stream.
481 * @param cls the struct StreamHandle for which we did the write call
482 * @param status the status of the stream at the time this function is called;
483 * GNUNET_OK if writing to stream was completed successfully,
484 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
486 * @param size the number of bytes written
489 query_write_continuation (void *cls,
490 enum GNUNET_STREAM_Status status,
493 struct StreamHandle *sh = cls;
496 if ( (GNUNET_STREAM_OK != status) ||
497 (sizeof (struct StreamQueryMessage) != size) )
503 sh->rh = GNUNET_STREAM_read (sh->stream,
504 GNUNET_TIME_UNIT_FOREVER_REL,
505 &handle_stream_reply,
507 transmit_pending (sh);
512 * Transmit pending requests via the stream.
514 * @param sh stream to process
517 transmit_pending (struct StreamHandle *sh)
519 struct StreamQueryMessage sqm;
520 struct GSF_StreamRequest *sr;
524 sr = sh->pending_head;
527 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
530 GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
533 sr->was_transmitted = GNUNET_YES;
534 sqm.header.size = htons (sizeof (sqm));
535 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
536 sqm.type = htonl (sr->type);
537 sqm.query = sr->query;
538 sh->wh = GNUNET_STREAM_write (sh->stream,
540 GNUNET_TIME_UNIT_FOREVER_REL,
541 &query_write_continuation,
547 * Functions with this signature are called whenever a
548 * complete reply is received.
550 * Do not call GNUNET_SERVER_mst_destroy in callback
552 * @param cls closure with the 'struct StreamHandle'
553 * @param client identification of the client, NULL
554 * @param message the actual message
555 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
560 const struct GNUNET_MessageHeader *message)
562 struct StreamHandle *sh = cls;
563 const struct StreamReplyMessage *srm;
565 enum GNUNET_BLOCK_Type type;
566 struct GNUNET_HashCode query;
567 struct GSF_StreamRequest *sr;
569 msize = ntohs (message->size);
570 switch (ntohs (message->type))
572 case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
573 if (sizeof (struct StreamReplyMessage) > msize)
576 reset_stream_async (sh);
577 return GNUNET_SYSERR;
579 srm = (const struct StreamReplyMessage *) message;
580 msize -= sizeof (struct StreamReplyMessage);
581 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
583 GNUNET_BLOCK_get_key (GSF_block_ctx,
585 &srm[1], msize, &query))
588 reset_stream_async (sh);
589 return GNUNET_SYSERR;
591 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592 "Received reply `%s' via stream\n",
593 GNUNET_h2s (&query));
594 GNUNET_STATISTICS_update (GSF_stats,
595 gettext_noop ("# replies received via stream"), 1,
597 for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
598 if (0 == memcmp (&query,
600 sizeof (struct GNUNET_HashCode)))
604 GNUNET_STATISTICS_update (GSF_stats,
605 gettext_noop ("# replies received via stream dropped"), 1,
609 sr->proc (sr->proc_cls,
611 GNUNET_TIME_absolute_ntoh (srm->expiration),
614 GSF_stream_query_cancel (sr);
618 reset_stream_async (sh);
619 return GNUNET_SYSERR;
625 * Get (or create) a stream to talk to the given peer.
627 * @param target peer we want to communicate with
629 static struct StreamHandle *
630 get_stream (const struct GNUNET_PeerIdentity *target)
632 struct StreamHandle *sh;
634 sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
635 &target->hashPubKey);
638 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
640 GNUNET_SCHEDULER_cancel (sh->timeout_task);
641 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
645 sh = GNUNET_malloc (sizeof (struct StreamHandle));
646 sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
648 sh->target = *target;
649 sh->stream = GNUNET_STREAM_open (GSF_cfg,
651 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
652 &stream_ready_cb, sh,
653 GNUNET_STREAM_OPTION_END);
654 GNUNET_assert (GNUNET_OK ==
655 GNUNET_CONTAINER_multihashmap_put (stream_map,
656 &sh->target.hashPubKey,
658 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
664 * Look for a block by directly contacting a particular peer.
666 * @param target peer that should have the block
667 * @param query hash to query for the block
668 * @param type desired type for the block
669 * @param proc function to call with result
670 * @param proc_cls closure for 'proc'
671 * @return handle to cancel the operation
673 struct GSF_StreamRequest *
674 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
675 const struct GNUNET_HashCode *query,
676 enum GNUNET_BLOCK_Type type,
677 GSF_StreamReplyProcessor proc, void *proc_cls)
679 struct StreamHandle *sh;
680 struct GSF_StreamRequest *sr;
682 sh = get_stream (target);
683 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
686 sr->proc_cls = proc_cls;
689 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
692 if (GNUNET_YES == sh->is_ready)
693 transmit_pending (sh);
699 * Cancel an active request; must not be called after 'proc'
702 * @param sr request to cancel
705 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
707 struct StreamHandle *sh = sr->sh;
709 if (GNUNET_YES == sr->was_transmitted)
710 GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
714 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
718 if ( (NULL == sh->waiting_head) &&
719 (NULL == sh->pending_head) )
720 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
726 /* ********************* server-side code ************************* */
730 * We're done with a particular client, clean up.
732 * @param sc client to clean up
735 terminate_stream (struct StreamClient *sc)
737 GNUNET_STATISTICS_update (GSF_stats,
738 gettext_noop ("# stream connections active"), -1,
740 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
741 GNUNET_SCHEDULER_cancel (sc->terminate_task);
743 GNUNET_STREAM_io_read_cancel (sc->rh);
745 GNUNET_STREAM_io_write_cancel (sc->wh);
747 GNUNET_DATASTORE_cancel (sc->qe);
748 GNUNET_SERVER_mst_destroy (sc->mst);
749 GNUNET_STREAM_close (sc->socket);
750 GNUNET_CONTAINER_DLL_remove (sc_head,
758 * Task run to asynchronously terminate the stream.
760 * @param cls the 'struct StreamClient'
761 * @param tc scheduler context
764 terminate_stream_task (void *cls,
765 const struct GNUNET_SCHEDULER_TaskContext *tc)
767 struct StreamClient *sc = cls;
769 sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
770 terminate_stream (sc);
775 * We had a serious error, termiante stream,
776 * but do so asynchronously.
778 * @param sc stream to reset
781 terminate_stream_async (struct StreamClient *sc)
783 if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
784 sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
790 * Functions of this signature are called whenever data is available from the
793 * @param cls the closure from GNUNET_STREAM_read
794 * @param status the status of the stream at the time this function is called
795 * @param data traffic from the other side
796 * @param size the number of bytes available in data read; will be 0 on timeout
797 * @return number of bytes of processed from 'data' (any data remaining should be
798 * given to the next time the read processor is called).
801 process_request (void *cls,
802 enum GNUNET_STREAM_Status status,
808 * We're done handling a request from a client, read the next one.
810 * @param sc client to continue reading requests from
813 continue_reading (struct StreamClient *sc)
818 GNUNET_SERVER_mst_receive (sc->mst,
821 GNUNET_NO, GNUNET_YES);
822 if (GNUNET_NO == ret)
824 sc->rh = GNUNET_STREAM_read (sc->socket,
825 GNUNET_TIME_UNIT_FOREVER_REL,
832 * Functions of this signature are called whenever data is available from the
835 * @param cls the closure from GNUNET_STREAM_read
836 * @param status the status of the stream at the time this function is called
837 * @param data traffic from the other side
838 * @param size the number of bytes available in data read; will be 0 on timeout
839 * @return number of bytes of processed from 'data' (any data remaining should be
840 * given to the next time the read processor is called).
843 process_request (void *cls,
844 enum GNUNET_STREAM_Status status,
848 struct StreamClient *sc = cls;
854 case GNUNET_STREAM_OK:
856 GNUNET_SERVER_mst_receive (sc->mst,
859 GNUNET_NO, GNUNET_YES);
860 if (GNUNET_NO == ret)
861 return size; /* more messages in MST */
862 if (GNUNET_SYSERR == ret)
865 terminate_stream_async (sc);
869 case GNUNET_STREAM_TIMEOUT:
870 case GNUNET_STREAM_SHUTDOWN:
871 case GNUNET_STREAM_SYSERR:
872 case GNUNET_STREAM_BROKEN:
873 terminate_stream_async (sc);
879 continue_reading (sc);
885 * Sending a reply was completed, continue processing.
887 * @param cls closure with the struct StreamClient which sent the query
890 write_continuation (void *cls,
891 enum GNUNET_STREAM_Status status,
894 struct StreamClient *sc = cls;
897 if ( (GNUNET_STREAM_OK == status) &&
898 (size == sc->reply_size) )
900 GNUNET_STATISTICS_update (GSF_stats,
901 gettext_noop ("# Blocks transferred via stream"), 1,
903 continue_reading (sc);
906 terminate_stream (sc);
911 * Process a datum that was stored in the datastore.
913 * @param cls closure with the struct StreamClient which sent the query
914 * @param key key for the content
915 * @param size number of bytes in data
916 * @param data content stored
917 * @param type type of the content
918 * @param priority priority of the content
919 * @param anonymity anonymity-level for the content
920 * @param expiration expiration time for the content
921 * @param uid unique identifier for the datum;
922 * maybe 0 if no unique identifier is available
925 handle_datastore_reply (void *cls,
926 const struct GNUNET_HashCode * key,
927 size_t size, const void *data,
928 enum GNUNET_BLOCK_Type type,
931 struct GNUNET_TIME_Absolute
932 expiration, uint64_t uid)
934 struct StreamClient *sc = cls;
935 size_t msize = size + sizeof (struct StreamReplyMessage);
936 char buf[msize] GNUNET_ALIGN;
937 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
940 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
943 GNUNET_FS_handle_on_demand_block (key,
947 &handle_datastore_reply,
950 continue_reading (sc);
954 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
957 continue_reading (sc);
960 srm->header.size = htons ((uint16_t) msize);
961 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
962 srm->type = htonl (type);
963 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
964 memcpy (&srm[1], data, size);
965 sc->reply_size = msize;
966 sc->wh = GNUNET_STREAM_write (sc->socket,
968 GNUNET_TIME_UNIT_FOREVER_REL,
973 terminate_stream (sc);
980 * Functions with this signature are called whenever a
981 * complete query message is received.
983 * Do not call GNUNET_SERVER_mst_destroy in callback
985 * @param cls closure with the 'struct StreamClient'
986 * @param client identification of the client, NULL
987 * @param message the actual message
988 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
991 request_cb (void *cls,
993 const struct GNUNET_MessageHeader *message)
995 struct StreamClient *sc = cls;
996 const struct StreamQueryMessage *sqm;
998 switch (ntohs (message->type))
1000 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1001 if (sizeof (struct StreamQueryMessage) !=
1002 ntohs (message->size))
1004 GNUNET_break_op (0);
1005 terminate_stream_async (sc);
1006 return GNUNET_SYSERR;
1008 sqm = (const struct StreamQueryMessage *) message;
1009 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010 "Received query for `%s' via stream\n",
1011 GNUNET_h2s (&sqm->query));
1012 GNUNET_STATISTICS_update (GSF_stats,
1013 gettext_noop ("# queries received via stream"), 1,
1015 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1020 GSF_datastore_queue_size,
1021 GNUNET_TIME_UNIT_FOREVER_REL,
1022 &handle_datastore_reply, sc);
1024 continue_reading (sc);
1027 GNUNET_break_op (0);
1028 terminate_stream_async (sc);
1029 return GNUNET_SYSERR;
1035 * Functions of this type are called upon new stream connection from other peers
1036 * or upon binding error which happen when the app_port given in
1037 * GNUNET_STREAM_listen() is already taken.
1039 * @param cls the closure from GNUNET_STREAM_listen
1040 * @param socket the socket representing the stream; NULL on binding error
1041 * @param initiator the identity of the peer who wants to establish a stream
1042 * with us; NULL on binding error
1043 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1044 * stream (the socket will be invalid after the call)
1047 accept_cb (void *cls,
1048 struct GNUNET_STREAM_Socket *socket,
1049 const struct GNUNET_PeerIdentity *initiator)
1051 struct StreamClient *sc;
1054 return GNUNET_SYSERR;
1055 GNUNET_STATISTICS_update (GSF_stats,
1056 gettext_noop ("# stream connections active"), 1,
1058 sc = GNUNET_malloc (sizeof (struct StreamClient));
1059 sc->socket = socket;
1060 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1062 sc->rh = GNUNET_STREAM_read (sc->socket,
1063 GNUNET_TIME_UNIT_FOREVER_REL,
1066 GNUNET_CONTAINER_DLL_insert (sc_head,
1074 * Initialize subsystem for non-anonymous file-sharing.
1079 stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1080 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1081 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1083 GNUNET_STREAM_OPTION_END);
1088 * Function called on each active streams to shut them down.
1091 * @param key target peer, unused
1092 * @param value the 'struct StreamHandle' to destroy
1093 * @return GNUNET_YES (continue to iterate)
1096 release_streams (void *cls,
1097 const struct GNUNET_HashCode *key,
1100 struct StreamHandle *sh = value;
1102 destroy_stream_handle (sh);
1108 * Shutdown subsystem for non-anonymous file-sharing.
1113 struct StreamClient *sc;
1115 while (NULL != (sc = sc_head))
1116 terminate_stream (sc);
1117 if (NULL != listen_socket)
1119 GNUNET_STREAM_listen_close (listen_socket);
1120 listen_socket = NULL;
1122 GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1125 GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1129 /* end of gnunet-service-fs_stream.c */