2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - limit # concurrent clients, have timeouts for server-side
30 #include "gnunet_constants.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_applications.h"
35 #include "gnunet-service-fs.h"
36 #include "gnunet-service-fs_indexing.h"
37 #include "gnunet-service-fs_stream.h"
40 * Information we keep around for each active streaming client.
47 struct StreamClient *next;
52 struct StreamClient *prev;
55 * Socket for communication.
57 struct GNUNET_STREAM_Socket *socket;
60 * Handle for active read operation, or NULL.
62 struct GNUNET_STREAM_IOReadHandle *rh;
65 * Handle for active write operation, or NULL.
67 struct GNUNET_STREAM_IOWriteHandle *wh;
70 * Tokenizer for requests.
72 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
75 * Current active request to the datastore, if we have one pending.
77 struct GNUNET_DATASTORE_QueueEntry *qe;
80 * Task that is scheduled to asynchronously terminate the connection.
82 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
85 * Size of the last write that was initiated.
93 * Query from one peer, asking the other for CHK-data.
95 struct StreamQueryMessage
99 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
101 struct GNUNET_MessageHeader header;
104 * Block type must be DBLOCK or IBLOCK.
109 * Query hash from CHK (hash of encrypted block).
111 struct GNUNET_HashCode query;
117 * Reply to a StreamQueryMessage.
119 struct StreamReplyMessage
123 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
125 struct GNUNET_MessageHeader header;
128 * Block type must be DBLOCK or IBLOCK.
133 * Expiration time for the block.
135 struct GNUNET_TIME_AbsoluteNBO expiration;
137 /* followed by the encrypted block */
143 * Handle for a stream to another peer.
149 * Handle for a request that is going out via stream API.
151 struct GSF_StreamRequest
157 struct GSF_StreamRequest *next;
162 struct GSF_StreamRequest *prev;
165 * Which stream is this request associated with?
167 struct StreamHandle *sh;
170 * Function to call with the result.
172 GSF_StreamReplyProcessor proc;
180 * Query to transmit to the other peer.
182 struct GNUNET_HashCode query;
185 * Desired type for the reply.
187 enum GNUNET_BLOCK_Type type;
190 * Did we transmit this request already? YES if we are
191 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
198 * Handle for a stream to another peer.
203 * Head of DLL of pending requests on this stream.
205 struct GSF_StreamRequest *pending_head;
208 * Tail of DLL of pending requests on this stream.
210 struct GSF_StreamRequest *pending_tail;
213 * Head of DLL of requests waiting for a reply on this stream.
215 struct GSF_StreamRequest *waiting_head;
218 * Tail of DLL of requests waiting for a reply on this stream.
220 struct GSF_StreamRequest *waiting_tail;
223 * Connection to the other peer.
225 struct GNUNET_STREAM_Socket *stream;
228 * Handle for active read operation, or NULL.
230 struct GNUNET_STREAM_IOReadHandle *rh;
233 * Handle for active write operation, or NULL.
235 struct GNUNET_STREAM_IOWriteHandle *wh;
238 * Tokenizer for replies.
240 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
243 * Which peer does this stream go to?
245 struct GNUNET_PeerIdentity target;
248 * Task to kill inactive streams (we keep them around for
249 * a few seconds to give the application a chance to give
252 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
255 * Task to reset streams that had errors (asynchronously,
256 * as we may not be able to do it immediately during a
257 * callback from the stream API).
259 GNUNET_SCHEDULER_TaskIdentifier reset_task;
262 * Is this stream ready for transmission?
270 * Listen socket for incoming requests.
272 static struct GNUNET_STREAM_ListenSocket *listen_socket;
275 * Head of DLL of stream clients.
277 static struct StreamClient *sc_head;
280 * Tail of DLL of stream clients.
282 static struct StreamClient *sc_tail;
285 * Map from peer identities to 'struct StreamHandles' with streams to
288 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
291 /* ********************* client-side code ************************* */
295 * Destroy a stream handle.
297 * @param sh stream to process
/* Tears down one outgoing stream handle.  Every request still queued on it
 * (pending first, then waiting) has its 'proc' callback invoked with
 * GNUNET_BLOCK_TYPE_ANY and GNUNET_TIME_UNIT_FOREVER_ABS (presumably the
 * "no reply" signal -- confirm against GSF_StreamReplyProcessor) and is then
 * cancelled.  Afterwards outstanding I/O and the timeout task are cancelled,
 * the stream is closed and the handle is removed from 'stream_map'. */
300 destroy_stream_handle (struct StreamHandle *sh)
302 struct GSF_StreamRequest *sr;
/* GSF_stream_query_cancel() unlinks 'sr' from its list, so re-reading the
 * list head on each iteration drains the list. */
304 while (NULL != (sr = sh->pending_head))
306 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
307 GNUNET_TIME_UNIT_FOREVER_ABS,
309 GSF_stream_query_cancel (sr);
311 while (NULL != (sr = sh->waiting_head))
313 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
314 GNUNET_TIME_UNIT_FOREVER_ABS,
316 GSF_stream_query_cancel (sr);
319 GNUNET_STREAM_io_write_cancel (sh->wh);
321 GNUNET_STREAM_io_read_cancel (sh->rh);
/* Cancelling the last request above makes GSF_stream_query_cancel()
 * schedule the idle timeout (both lists are empty at that point); it is
 * cancelled here because the handle is being destroyed right now. */
322 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
323 GNUNET_SCHEDULER_cancel (sh->timeout_task);
324 GNUNET_STREAM_close (sh->stream);
/* NOTE(review): no cancellation of sh->reset_task and no
 * GNUNET_SERVER_mst_destroy (sh->mst) is visible in this function.  If a
 * reset task is still scheduled when the handle is destroyed,
 * reset_stream_task() would run with a stale 'sh' -- confirm against the
 * full source. */
325 GNUNET_assert (GNUNET_OK ==
326 GNUNET_CONTAINER_multihashmap_remove (stream_map,
327 &sh->target.hashPubKey,
334 * Transmit pending requests via the stream.
336 * @param sh stream to process
339 transmit_pending (struct StreamHandle *sh);
343 * Function called once the stream is ready for transmission.
345 * @param cls the 'struct StreamHandle'
346 * @param socket stream socket handle
/* Callback passed to GNUNET_STREAM_open(): marks the handle as usable and
 * immediately starts sending whatever was queued while the stream was being
 * established.  'socket' is not used in the visible body; the handle is
 * taken from 'cls'. */
349 stream_ready_cb (void *cls,
350 struct GNUNET_STREAM_Socket *socket)
352 struct StreamHandle *sh = cls;
354 sh->is_ready = GNUNET_YES;
355 transmit_pending (sh);
360 * We had a serious error, tear down and re-create stream from scratch.
362 * @param sh stream to reset
/* Closes the current stream to sh->target, re-queues every request that was
 * already transmitted and opens a fresh stream.  The re-queued requests are
 * sent again once stream_ready_cb() fires for the new stream. */
365 reset_stream (struct StreamHandle *sh)
367 struct GSF_StreamRequest *sr;
369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370 "Resetting stream to %s\n",
371 GNUNET_i2s (&sh->target));
/* NOTE(review): sh->rh is not set to NULL after being cancelled and no
 * cancellation of sh->wh is visible here; a later
 * destroy_stream_handle()/reset could then cancel a stale handle --
 * confirm against the full source. */
373 GNUNET_STREAM_io_read_cancel (sh->rh);
374 GNUNET_STREAM_close (sh->stream);
/* Not ready until the new stream's ready callback runs; GSF_stream_query()
 * checks this flag before transmitting. */
375 sh->is_ready = GNUNET_NO;
/* Walk the waiting list from its tail and move each entry to 'pending'.
 * Assuming GNUNET_CONTAINER_DLL_insert prepends (the file uses
 * _insert_tail where appending is wanted), this keeps the original
 * transmission order at the front of the pending list -- TODO confirm. */
376 while (NULL != (sr = sh->waiting_tail))
378 GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
381 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
/* Keep the flag in sync with the list the request now lives in;
 * GSF_stream_query_cancel() relies on it to pick the right list. */
384 sr->was_transmitted = GNUNET_NO;
386 sh->stream = GNUNET_STREAM_open (GSF_cfg,
388 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
389 &stream_ready_cb, sh,
390 GNUNET_STREAM_OPTION_END);
395 * Task called when it is time to destroy an inactive stream.
397 * @param cls the 'struct StreamHandle' to tear down
398 * @param tc scheduler context, unused
/* Scheduler task: the stream to sh->target is being torn down after the
 * timeout fired.  'tc' is unused. */
401 stream_timeout (void *cls,
402 const struct GNUNET_SCHEDULER_TaskContext *tc)
404 struct StreamHandle *sh = cls;
406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407 "Timeout on stream to %s\n",
408 GNUNET_i2s (&sh->target));
/* Clear the task id first: destroy_stream_handle() cancels timeout_task
 * when it is set, and this task is the one currently running. */
409 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
410 destroy_stream_handle (sh);
415 * Task called when it is time to reset a stream.
417 * @param cls the 'struct StreamHandle' to reset
418 * @param tc scheduler context, unused
/* Scheduler task queued by reset_stream_async().  Clearing reset_task lets
 * reset_stream_async() schedule a new reset later on.  The statement that
 * performs the actual reset is not part of the visible lines. */
421 reset_stream_task (void *cls,
422 const struct GNUNET_SCHEDULER_TaskContext *tc)
424 struct StreamHandle *sh = cls;
426 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
432 * We had a serious error, tear down and re-create stream from scratch,
433 * but do so asynchronously.
435 * @param sh stream to reset
/* Queues reset_stream_task() to run from the scheduler instead of resetting
 * inline.  The guard makes the call idempotent: while a reset task is
 * already scheduled, further calls do nothing. */
438 reset_stream_async (struct StreamHandle *sh)
440 if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
441 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
447 * We got a reply from the stream. Process it.
449 * @param cls the struct StreamHandle
450 * @param status the status of the stream at the time this function is called
451 * @param data traffic from the other side
452 * @param size the number of bytes available in data read; will be 0 on timeout
453 * @return number of bytes processed from 'data' (any data remaining should be
454 * given to the next time the read processor is called).
/* Read callback for an outgoing stream: hands the received bytes to the
 * handle's tokenizer (created in get_stream() with reply_cb as its
 * callback) and re-arms the read with this same function as callback. */
457 handle_stream_reply (void *cls,
458 enum GNUNET_STREAM_Status status,
462 struct StreamHandle *sh = cls;
465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
466 "Received %u bytes from stream to %s\n",
468 GNUNET_i2s (&sh->target));
/* NOTE(review): no visible line inspects 'status'; confirm that error
 * statuses are dealt with before the data is tokenized. */
470 GNUNET_SERVER_mst_receive (sh->mst,
473 GNUNET_NO, GNUNET_NO))
/* The reset is deferred to a scheduler task rather than done inline,
 * because we are inside a callback of the stream being reset. */
476 reset_stream_async (sh);
479 sh->rh = GNUNET_STREAM_read (sh->stream,
480 GNUNET_TIME_UNIT_FOREVER_REL,
481 &handle_stream_reply,
488 * Functions of this signature are called whenever we transmitted a
489 * query via a stream.
491 * @param cls the struct StreamHandle for which we did the write call
492 * @param status the status of the stream at the time this function is called;
493 * GNUNET_OK if writing to stream was completed successfully,
494 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
496 * @param size the number of bytes written
/* Write callback for a query sent by transmit_pending().  After the check
 * below it logs the transfer, starts a read for the reply and moves on to
 * the next pending request. */
499 query_write_continuation (void *cls,
500 enum GNUNET_STREAM_Status status,
503 struct StreamHandle *sh = cls;
/* A write only counts as successful when the stream reports OK *and*
 * exactly one whole StreamQueryMessage was written; the handling of the
 * failure branch is not part of the visible lines. */
506 if ( (GNUNET_STREAM_OK != status) ||
507 (sizeof (struct StreamQueryMessage) != size) )
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513 "Successfully transmitted %u bytes via stream to %s\n",
515 GNUNET_i2s (&sh->target));
/* NOTE(review): if a read is already active, assigning sh->rh again would
 * lose the previous handle -- confirm a guard precedes this statement. */
517 sh->rh = GNUNET_STREAM_read (sh->stream,
518 GNUNET_TIME_UNIT_FOREVER_REL,
519 &handle_stream_reply,
/* One write at a time: the next pending query is sent only after the
 * previous write completed. */
521 transmit_pending (sh);
526 * Transmit pending requests via the stream.
528 * @param sh stream to process
/* Sends the request at the head of the pending list: the request is moved
 * to the tail of the waiting list, flagged as transmitted, and a
 * StreamQueryMessage for it is written to the stream. */
531 transmit_pending (struct StreamHandle *sh)
/* NOTE(review): 'sqm' lives on this function's stack but is handed to an
 * asynchronous write; this is only safe if GNUNET_STREAM_write copies the
 * buffer before returning -- confirm against the stream API. */
533 struct StreamQueryMessage sqm;
534 struct GSF_StreamRequest *sr;
538 sr = sh->pending_head;
541 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
544 GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548 "Sending query via stream to %s\n",
549 GNUNET_i2s (&sh->target));
/* Must match the list the request is in now; GSF_stream_query_cancel()
 * uses this flag to decide which list to unlink from. */
550 sr->was_transmitted = GNUNET_YES;
/* All header fields and the block type go out in network byte order. */
551 sqm.header.size = htons (sizeof (sqm));
552 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
553 sqm.type = htonl (sr->type);
554 sqm.query = sr->query;
555 sh->wh = GNUNET_STREAM_write (sh->stream,
557 GNUNET_TIME_UNIT_FOREVER_REL,
558 &query_write_continuation,
564 * Functions with this signature are called whenever a
565 * complete reply is received.
567 * Do not call GNUNET_SERVER_mst_destroy in callback
569 * @param cls closure with the 'struct StreamHandle'
570 * @param client identification of the client, NULL
571 * @param message the actual message
572 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
/* NOTE(review): the first lines of this function's header are not visible
 * here; it is presumably the 'reply_cb' that get_stream() registers with
 * the tokenizer.  It validates one tokenized message, recomputes the query
 * hash from the block payload and hands the block to a request on the
 * waiting list whose query matches. */
577 const struct GNUNET_MessageHeader *message)
579 struct StreamHandle *sh = cls;
580 const struct StreamReplyMessage *srm;
582 enum GNUNET_BLOCK_Type type;
583 struct GNUNET_HashCode query;
584 struct GSF_StreamRequest *sr;
586 msize = ntohs (message->size);
587 switch (ntohs (message->type))
589 case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
/* Reject messages too short to hold the fixed reply header; this also
 * keeps the subtraction below from underflowing. */
590 if (sizeof (struct StreamReplyMessage) > msize)
593 reset_stream_async (sh);
594 return GNUNET_SYSERR;
596 srm = (const struct StreamReplyMessage *) message;
/* From here on 'msize' is the size of the block payload only. */
597 msize -= sizeof (struct StreamReplyMessage);
598 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
/* The reply carries no query hash; it is derived from the block data that
 * directly follows the header (&srm[1]). */
600 GNUNET_BLOCK_get_key (GSF_block_ctx,
602 &srm[1], msize, &query))
605 reset_stream_async (sh);
606 return GNUNET_SYSERR;
608 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
609 "Received reply `%s' via stream\n",
610 GNUNET_h2s (&query));
611 GNUNET_STATISTICS_update (GSF_stats,
612 gettext_noop ("# replies received via stream"), 1,
/* Only transmitted requests (the waiting list) are searched for a match. */
614 for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
615 if (0 == memcmp (&query,
617 sizeof (struct GNUNET_HashCode)))
621 GNUNET_STATISTICS_update (GSF_stats,
622 gettext_noop ("# replies received via stream dropped"), 1,
626 sr->proc (sr->proc_cls,
628 GNUNET_TIME_absolute_ntoh (srm->expiration),
/* The request is finished once its callback ran; cancelling unlinks it
 * from the waiting list. */
631 GSF_stream_query_cancel (sr);
635 reset_stream_async (sh);
636 return GNUNET_SYSERR;
642 * Get (or create) a stream to talk to the given peer.
644 * @param target peer we want to communicate with
/* Looks up the stream handle for 'target' in 'stream_map'.  The first part
 * deals with a handle found in the map, the second part creates a new
 * handle, opens a stream for it and registers it in the map. */
646 static struct StreamHandle *
647 get_stream (const struct GNUNET_PeerIdentity *target)
649 struct StreamHandle *sh;
651 sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
652 &target->hashPubKey);
/* The handle is about to be used again: stop the idle timeout so that
 * stream_timeout() does not destroy it underneath the caller. */
655 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
657 GNUNET_SCHEDULER_cancel (sh->timeout_task);
658 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
663 "Creating stream to %s\n",
664 GNUNET_i2s (target));
665 sh = GNUNET_malloc (sizeof (struct StreamHandle));
/* Replies read from the stream are tokenized and dispatched to reply_cb. */
666 sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
668 sh->target = *target;
/* The stream becomes usable only after stream_ready_cb() was called;
 * requests queued before that stay on the pending list. */
669 sh->stream = GNUNET_STREAM_open (GSF_cfg,
671 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
672 &stream_ready_cb, sh,
673 GNUNET_STREAM_OPTION_END);
/* UNIQUE_ONLY plus the assertion: at most one handle per peer may exist. */
674 GNUNET_assert (GNUNET_OK ==
675 GNUNET_CONTAINER_multihashmap_put (stream_map,
676 &sh->target.hashPubKey,
678 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
684 * Look for a block by directly contacting a particular peer.
686 * @param target peer that should have the block
687 * @param query hash to query for the block
688 * @param type desired type for the block
689 * @param proc function to call with result
690 * @param proc_cls closure for 'proc'
691 * @return handle to cancel the operation
/* Entry point for sending a block query to 'target' over a stream.  It
 * allocates a request, queues it on the pending list of the (possibly newly
 * created) stream handle for that peer, and starts transmission right away
 * if the stream is already established. */
693 struct GSF_StreamRequest *
694 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
695 const struct GNUNET_HashCode *query,
696 enum GNUNET_BLOCK_Type type,
697 GSF_StreamReplyProcessor proc, void *proc_cls)
699 struct StreamHandle *sh;
700 struct GSF_StreamRequest *sr;
702 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
703 "Preparing to send query for %s via stream to %s\n",
705 GNUNET_i2s (target));
706 sh = get_stream (target);
707 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
710 sr->proc_cls = proc_cls;
/* NOTE(review): GNUNET_CONTAINER_DLL_insert presumably prepends, which
 * would make the pending list last-in-first-out -- confirm this ordering
 * is intended. */
713 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
/* If the stream is still being set up the request simply stays queued;
 * stream_ready_cb() calls transmit_pending() once the stream is ready. */
716 if (GNUNET_YES == sh->is_ready)
717 transmit_pending (sh);
723 * Cancel an active request; must not be called after 'proc'
726 * @param sr request to cancel
/* Unlinks a request from the stream handle it belongs to.  When that leaves
 * the handle without any request, a delayed task is scheduled and stored in
 * timeout_task. */
729 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
731 struct StreamHandle *sh = sr->sh;
/* was_transmitted tells which of the two lists currently holds 'sr'. */
733 if (GNUNET_YES == sr->was_transmitted)
734 GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
738 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
/* Idle handle: start the timeout (delay argument is
 * GNUNET_TIME_UNIT_SECONDS).  get_stream() cancels this task again if the
 * handle is requested before it runs. */
742 if ( (NULL == sh->waiting_head) &&
743 (NULL == sh->pending_head) )
744 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
750 /* ********************* server-side code ************************* */
754 * We're done with a particular client, clean up.
756 * @param sc client to clean up
/* Releases everything held for one inbound stream client: the statistics
 * counter incremented in accept_cb() is decremented again, a scheduled
 * asynchronous termination is cancelled, outstanding stream I/O and the
 * datastore request are cancelled, the tokenizer is destroyed, the socket
 * is closed and the client is unlinked from the sc_head list. */
759 terminate_stream (struct StreamClient *sc)
761 GNUNET_STATISTICS_update (GSF_stats,
762 gettext_noop ("# stream connections active"), -1,
/* Avoid a later terminate_stream_task() run on a client that is gone. */
764 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
765 GNUNET_SCHEDULER_cancel (sc->terminate_task);
767 GNUNET_STREAM_io_read_cancel (sc->rh);
769 GNUNET_STREAM_io_write_cancel (sc->wh);
771 GNUNET_DATASTORE_cancel (sc->qe);
772 GNUNET_SERVER_mst_destroy (sc->mst);
773 GNUNET_STREAM_close (sc->socket);
774 GNUNET_CONTAINER_DLL_remove (sc_head,
782 * Task run to asynchronously terminate the stream.
784 * @param cls the 'struct StreamClient'
785 * @param tc scheduler context
/* Scheduler task queued by terminate_stream_async(); performs the actual
 * clean-up of the client.  'tc' is unused. */
788 terminate_stream_task (void *cls,
789 const struct GNUNET_SCHEDULER_TaskContext *tc)
791 struct StreamClient *sc = cls;
/* Clear the id first: terminate_stream() cancels terminate_task when it is
 * set, and this task is the one currently running. */
793 sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
794 terminate_stream (sc);
799 * We had a serious error, terminate stream,
800 * but do so asynchronously.
802 * @param sc stream to terminate
/* Queues terminate_stream_task() for the client instead of terminating it
 * inline.  The guard makes repeated calls harmless: only one termination
 * task is scheduled per client. */
805 terminate_stream_async (struct StreamClient *sc)
807 if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
808 sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
814 * Functions of this signature are called whenever data is available from the
817 * @param cls the closure from GNUNET_STREAM_read
818 * @param status the status of the stream at the time this function is called
819 * @param data traffic from the other side
820 * @param size the number of bytes available in data read; will be 0 on timeout
821 * @return number of bytes processed from 'data' (any data remaining should be
822 * given to the next time the read processor is called).
825 process_request (void *cls,
826 enum GNUNET_STREAM_Status status,
832 * We're done handling a request from a client, read the next one.
834 * @param sc client to continue reading requests from
/* Called when a request from the client has been fully handled.  It first
 * runs the tokenizer again and then issues a new read on the socket.
 * The last flag passed to the tokenizer is GNUNET_YES here, whereas the
 * client side (handle_stream_reply) passes GNUNET_NO.
 * NOTE(review): what happens when 'ret' is GNUNET_NO is on a line not
 * visible here; process_request() treats the same value as "more messages
 * in MST" and returns without reading -- confirm this function agrees. */
837 continue_reading (struct StreamClient *sc)
842 GNUNET_SERVER_mst_receive (sc->mst,
845 GNUNET_NO, GNUNET_YES);
846 if (GNUNET_NO == ret)
848 sc->rh = GNUNET_STREAM_read (sc->socket,
849 GNUNET_TIME_UNIT_FOREVER_REL,
856 * Functions of this signature are called whenever data is available from the
859 * @param cls the closure from GNUNET_STREAM_read
860 * @param status the status of the stream at the time this function is called
861 * @param data traffic from the other side
862 * @param size the number of bytes available in data read; will be 0 on timeout
863 * @return number of bytes processed from 'data' (any data remaining should be
864 * given to the next time the read processor is called).
/* Read callback for an inbound stream client.  Data received with status
 * OK is fed into the client's tokenizer; every other status listed below
 * leads to asynchronous termination of the client. */
867 process_request (void *cls,
868 enum GNUNET_STREAM_Status status,
872 struct StreamClient *sc = cls;
876 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877 "Received %u byte query via stream\n",
878 (unsigned int) size);
881 case GNUNET_STREAM_OK:
883 GNUNET_SERVER_mst_receive (sc->mst,
886 GNUNET_NO, GNUNET_YES);
/* Returning here skips continue_reading(): no new read is started from
 * this path while the tokenizer still holds messages. */
887 if (GNUNET_NO == ret)
888 return size; /* more messages in MST */
889 if (GNUNET_SYSERR == ret)
/* Deferred to a task because terminate_stream() cancels the read and
 * closes the socket whose callback we are currently in. */
892 terminate_stream_async (sc);
896 case GNUNET_STREAM_TIMEOUT:
897 case GNUNET_STREAM_SHUTDOWN:
898 case GNUNET_STREAM_SYSERR:
899 case GNUNET_STREAM_BROKEN:
900 terminate_stream_async (sc);
906 continue_reading (sc);
912 * Sending a reply was completed, continue processing.
914 * @param cls closure with the struct StreamClient which sent the query
/* Write callback for a reply started in handle_datastore_reply().  On
 * success the client's next request is processed, otherwise the client is
 * terminated. */
917 write_continuation (void *cls,
918 enum GNUNET_STREAM_Status status,
921 struct StreamClient *sc = cls;
/* Success requires both an OK status and that the number of bytes written
 * equals the size recorded when the write was started. */
924 if ( (GNUNET_STREAM_OK == status) &&
925 (size == sc->reply_size) )
927 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928 "Transmitted %u byte reply via stream\n",
929 (unsigned int) size);
930 GNUNET_STATISTICS_update (GSF_stats,
931 gettext_noop ("# Blocks transferred via stream"), 1,
933 continue_reading (sc);
937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938 "Transmission of reply failed, terminating stream\n");
/* NOTE(review): this terminates synchronously from inside the write
 * callback, while the read-side error paths use terminate_stream_async().
 * terminate_stream() cancels sc->wh; confirm sc->wh no longer refers to
 * the write that is completing here. */
939 terminate_stream (sc);
945 * Process a datum that was stored in the datastore.
947 * @param cls closure with the struct StreamClient which sent the query
948 * @param key key for the content
949 * @param size number of bytes in data
950 * @param data content stored
951 * @param type type of the content
952 * @param priority priority of the content
953 * @param anonymity anonymity-level for the content
954 * @param expiration expiration time for the content
955 * @param uid unique identifier for the datum;
956 * maybe 0 if no unique identifier is available
/* Datastore result callback for a client's query.  On-demand blocks are
 * passed on to GNUNET_FS_handle_on_demand_block() with this same function
 * as the continuation; other blocks are wrapped in a StreamReplyMessage and
 * written to the client's socket. */
959 handle_datastore_reply (void *cls,
960 const struct GNUNET_HashCode * key,
961 size_t size, const void *data,
962 enum GNUNET_BLOCK_Type type,
965 struct GNUNET_TIME_Absolute
966 expiration, uint64_t uid)
968 struct StreamClient *sc = cls;
969 size_t msize = size + sizeof (struct StreamReplyMessage);
/* NOTE(review): this variable-length array is sized from 'size' before the
 * GNUNET_SERVER_MAX_MESSAGE_SIZE check further down, so an oversized block
 * already consumes stack space before it is rejected.  The buffer is also
 * stack-local yet handed to an asynchronous write; that is only safe if
 * GNUNET_STREAM_write copies it -- confirm against the stream API. */
970 char buf[msize] GNUNET_ALIGN;
971 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
/* NOTE(review): no visible handling of a "nothing found" result.  If the
 * datastore can call this with data == NULL (confirm against the datastore
 * API), the memcpy below would read through a NULL pointer. */
974 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
977 GNUNET_FS_handle_on_demand_block (key,
981 &handle_datastore_reply,
984 continue_reading (sc);
/* The size field of the message header is 16 bits (see the uint16_t cast
 * below), so larger replies cannot be sent; reading simply continues. */
988 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
991 continue_reading (sc);
994 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995 "Starting transmission of %u byte reply via stream\n",
996 (unsigned int) size);
997 srm->header.size = htons ((uint16_t) msize);
998 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
999 srm->type = htonl (type);
1000 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
/* The block data follows the fixed header directly. */
1001 memcpy (&srm[1], data, size);
/* Remembered so that write_continuation() can verify that the whole reply
 * was written. */
1002 sc->reply_size = msize;
1003 sc->wh = GNUNET_STREAM_write (sc->socket,
1005 GNUNET_TIME_UNIT_FOREVER_REL,
1006 &write_continuation,
1010 terminate_stream (sc);
1017 * Functions with this signature are called whenever a
1018 * complete query message is received.
1020 * Do not call GNUNET_SERVER_mst_destroy in callback
1022 * @param cls closure with the 'struct StreamClient'
1023 * @param client identification of the client, NULL
1024 * @param message the actual message
1025 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
/* Tokenizer callback on the server side: handles one complete message from
 * a stream client.  A well-formed query triggers a datastore lookup whose
 * result is delivered to handle_datastore_reply(); a malformed query or an
 * unexpected message type leads to asynchronous termination of the client. */
1028 request_cb (void *cls,
1030 const struct GNUNET_MessageHeader *message)
1032 struct StreamClient *sc = cls;
1033 const struct StreamQueryMessage *sqm;
1035 switch (ntohs (message->type))
1037 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
/* Queries have a fixed size; anything else is a protocol violation by the
 * remote peer. */
1038 if (sizeof (struct StreamQueryMessage) !=
1039 ntohs (message->size))
1041 GNUNET_break_op (0);
1042 terminate_stream_async (sc);
1043 return GNUNET_SYSERR;
1045 sqm = (const struct StreamQueryMessage *) message;
1046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047 "Received query for `%s' via stream\n",
1048 GNUNET_h2s (&sqm->query));
1049 GNUNET_STATISTICS_update (GSF_stats,
1050 gettext_noop ("# queries received via stream"), 1,
/* The queue entry is kept in sc->qe so that terminate_stream() can cancel
 * the lookup if the client goes away first. */
1052 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1057 GSF_datastore_queue_size,
1058 GNUNET_TIME_UNIT_FOREVER_REL,
1059 &handle_datastore_reply, sc);
1061 continue_reading (sc);
1064 GNUNET_break_op (0);
1065 terminate_stream_async (sc);
1066 return GNUNET_SYSERR;
1072 * Functions of this type are called upon new stream connection from other peers
1073 * or upon binding error which happen when the app_port given in
1074 * GNUNET_STREAM_listen() is already taken.
1076 * @param cls the closure from GNUNET_STREAM_listen
1077 * @param socket the socket representing the stream; NULL on binding error
1078 * @param initiator the identity of the peer who wants to establish a stream
1079 * with us; NULL on binding error
1080 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1081 * stream (the socket will be invalid after the call)
/* Listen callback: sets up per-client state for a new inbound stream.  It
 * bumps the active-connection statistic, allocates the client, creates its
 * request tokenizer, starts the first read and links the client into the
 * sc_head list.
 * NOTE(review): nothing visible here bounds the number of concurrent
 * clients; the file header lists that as an open item. */
1084 accept_cb (void *cls,
1085 struct GNUNET_STREAM_Socket *socket,
1086 const struct GNUNET_PeerIdentity *initiator)
1088 struct StreamClient *sc;
/* Early exit; its condition is on a line not visible here (the function
 * documentation mentions a NULL socket on binding error). */
1091 return GNUNET_SYSERR;
1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093 "Accepting inbound stream connection from `%s'\n",
1094 GNUNET_i2s (initiator));
/* Counterpart of the -1 update in terminate_stream(). */
1095 GNUNET_STATISTICS_update (GSF_stats,
1096 gettext_noop ("# stream connections active"), 1,
1098 sc = GNUNET_malloc (sizeof (struct StreamClient));
1099 sc->socket = socket;
/* Incoming bytes are tokenized and complete messages go to request_cb. */
1100 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1102 sc->rh = GNUNET_STREAM_read (sc->socket,
1103 GNUNET_TIME_UNIT_FOREVER_REL,
1106 GNUNET_CONTAINER_DLL_insert (sc_head,
1114 * Initialize subsystem for non-anonymous file-sharing.
/* Creates the map used by get_stream() to find the stream handle of a peer
 * and starts listening for inbound block-transfer streams.
 * NOTE(review): the shutdown code checks listen_socket for NULL, which
 * suggests GNUNET_STREAM_listen can fail; no handling of that case is
 * visible here -- confirm. */
1119 stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1120 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1121 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1123 GNUNET_STREAM_OPTION_END);
1128 * Function called on each active stream to shut it down.
1131 * @param key target peer, unused
1132 * @param value the 'struct StreamHandle' to destroy
1133 * @return GNUNET_YES (continue to iterate)
/* Map iterator used during shutdown: destroys the stream handle stored as
 * the map value.  'key' is not used in the visible body.
 * NOTE(review): destroy_stream_handle() removes the entry from stream_map
 * while that map is being iterated -- confirm the multihashmap iterator
 * tolerates removal of the current entry. */
1136 release_streams (void *cls,
1137 const struct GNUNET_HashCode *key,
1140 struct StreamHandle *sh = value;
1142 destroy_stream_handle (sh);
1148 * Shutdown subsystem for non-anonymous file-sharing.
/* Shutdown sequence: terminate all inbound clients, stop listening, destroy
 * all outbound stream handles and finally the map itself. */
1153 struct StreamClient *sc;
/* terminate_stream() unlinks 'sc' from the list, so re-reading sc_head
 * drains it. */
1155 while (NULL != (sc = sc_head))
1156 terminate_stream (sc);
1157 if (NULL != listen_socket)
1159 GNUNET_STREAM_listen_close (listen_socket);
1160 listen_socket = NULL;
1162 GNUNET_CONTAINER_multihashmap_iterate (stream_map,
/* NOTE(review): stream_map is not reset to NULL after being destroyed. */
1165 GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1169 /* end of gnunet-service-fs_stream.c */