2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - limit # concurrent clients
30 #include "gnunet_constants.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_applications.h"
35 #include "gnunet-service-fs.h"
36 #include "gnunet-service-fs_indexing.h"
37 #include "gnunet-service-fs_stream.h"
40 * After how long do we terminate idle connections?
42 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
46 * Information we keep around for each active streaming client.
53 struct StreamClient *next;
58 struct StreamClient *prev;
61 * Socket for communication.
63 struct GNUNET_STREAM_Socket *socket;
66 * Handle for active read operation, or NULL.
68 struct GNUNET_STREAM_IOReadHandle *rh;
71 * Handle for active write operation, or NULL.
73 struct GNUNET_STREAM_IOWriteHandle *wh;
76 * Tokenizer for requests.
78 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
81 * Current active request to the datastore, if we have one pending.
83 struct GNUNET_DATASTORE_QueueEntry *qe;
86 * Task that is scheduled to asynchronously terminate the connection.
88 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
91 * Task that is scheduled to terminate idle connections.
93 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
96 * Size of the last write that was initiated.
104 * Query from one peer, asking the other for CHK-data.
106 struct StreamQueryMessage
110 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
112 struct GNUNET_MessageHeader header;
115 * Block type must be DBLOCK or IBLOCK.
120 * Query hash from CHK (hash of encrypted block).
122 struct GNUNET_HashCode query;
128 * Reply to a StreamQueryMessage.
130 struct StreamReplyMessage
134 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
136 struct GNUNET_MessageHeader header;
139 * Block type must be DBLOCK or IBLOCK.
144 * Expiration time for the block.
146 struct GNUNET_TIME_AbsoluteNBO expiration;
148 /* followed by the encrypted block */
154 * Handle for a stream to another peer.
160 * Handle for a request that is going out via stream API.
162 struct GSF_StreamRequest
168 struct GSF_StreamRequest *next;
173 struct GSF_StreamRequest *prev;
176 * Which stream is this request associated with?
178 struct StreamHandle *sh;
181 * Function to call with the result.
183 GSF_StreamReplyProcessor proc;
191 * Query to transmit to the other peer.
193 struct GNUNET_HashCode query;
196 * Desired type for the reply.
198 enum GNUNET_BLOCK_Type type;
201 * Did we transmit this request already? YES if we are
202 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
209 * Handle for a stream to another peer.
214 * Head of DLL of pending requests on this stream.
216 struct GSF_StreamRequest *pending_head;
219 * Tail of DLL of pending requests on this stream.
221 struct GSF_StreamRequest *pending_tail;
224 * Map from query to 'struct GSF_StreamRequest's waiting for
227 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
230 * Connection to the other peer.
232 struct GNUNET_STREAM_Socket *stream;
235 * Handle for active read operation, or NULL.
237 struct GNUNET_STREAM_IOReadHandle *rh;
240 * Handle for active write operation, or NULL.
242 struct GNUNET_STREAM_IOWriteHandle *wh;
245 * Tokenizer for replies.
247 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
250 * Which peer does this stream go to?
252 struct GNUNET_PeerIdentity target;
255 * Task to kill inactive streams (we keep them around for
256 * a few seconds to give the application a chance to give
259 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
262 * Task to reset streams that had errors (asynchronously,
263 * as we may not be able to do it immediately during a
264 * callback from the stream API).
266 GNUNET_SCHEDULER_TaskIdentifier reset_task;
269 * Is this stream ready for transmission?
277 * Listen socket for incoming requests.
279 static struct GNUNET_STREAM_ListenSocket *listen_socket;
282 * Head of DLL of stream clients.
284 static struct StreamClient *sc_head;
287 * Tail of DLL of stream clients.
289 static struct StreamClient *sc_tail;
292 * Map from peer identities to 'struct StreamHandles' with streams to
295 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
298 /* ********************* client-side code ************************* */
301 * Iterator called on each entry in a waiting map to
302 * call the 'proc' continuation and release associated
305 * @param cls the 'struct StreamHandle'
306 * @param key the key of the entry in the map (the query)
307 * @param value the 'struct GSF_StreamRequest' to clean up
308 * @return GNUNET_YES (continue to iterate)
311 free_waiting_entry (void *cls,
312 const struct GNUNET_HashCode *key,
315 struct GSF_StreamRequest *sr = value;
317 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
318 GNUNET_TIME_UNIT_FOREVER_ABS,
320 GSF_stream_query_cancel (sr);
326 * Destroy a stream handle.
328 * @param sh stream handle to destroy
331 destroy_stream_handle (struct StreamHandle *sh)
333 struct GSF_StreamRequest *sr;
335 while (NULL != (sr = sh->pending_head))
337 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
338 GNUNET_TIME_UNIT_FOREVER_ABS,
340 GSF_stream_query_cancel (sr);
342 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
346 GNUNET_STREAM_io_write_cancel (sh->wh);
348 GNUNET_STREAM_io_read_cancel (sh->rh);
349 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
350 GNUNET_SCHEDULER_cancel (sh->timeout_task);
351 GNUNET_STREAM_close (sh->stream);
352 GNUNET_assert (GNUNET_OK ==
353 GNUNET_CONTAINER_multihashmap_remove (stream_map,
354 &sh->target.hashPubKey,
356 GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
362 * Transmit pending requests via the stream.
364 * @param sh stream to process
367 transmit_pending (struct StreamHandle *sh);
371 * Function called once the stream is ready for transmission.
373 * @param cls the 'struct StreamHandle'
374 * @param socket stream socket handle
377 stream_ready_cb (void *cls,
378 struct GNUNET_STREAM_Socket *socket)
380 struct StreamHandle *sh = cls;
382 sh->is_ready = GNUNET_YES;
383 transmit_pending (sh);
388 * Iterator called on each entry in a waiting map to
389 * move it back to the pending list.
391 * @param cls the 'struct StreamHandle'
392 * @param key the key of the entry in the map (the query)
393 * @param value the 'struct GSF_StreamRequest' to move to pending
394 * @return GNUNET_YES (continue to iterate)
397 move_to_pending (void *cls,
398 const struct GNUNET_HashCode *key,
401 struct StreamHandle *sh = cls;
402 struct GSF_StreamRequest *sr = value;
404 GNUNET_assert (GNUNET_YES ==
405 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
408 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
411 sr->was_transmitted = GNUNET_NO;
417 * We had a serious error, tear down and re-create stream from scratch.
419 * @param sh stream to reset
422 reset_stream (struct StreamHandle *sh)
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425 "Resetting stream to %s\n",
426 GNUNET_i2s (&sh->target));
428 GNUNET_STREAM_io_read_cancel (sh->rh);
429 GNUNET_STREAM_close (sh->stream);
430 sh->is_ready = GNUNET_NO;
431 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
434 sh->stream = GNUNET_STREAM_open (GSF_cfg,
436 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
437 &stream_ready_cb, sh,
438 GNUNET_STREAM_OPTION_END);
443 * Task called when it is time to destroy an inactive stream.
445 * @param cls the 'struct StreamHandle' to tear down
446 * @param tc scheduler context, unused
449 stream_timeout (void *cls,
450 const struct GNUNET_SCHEDULER_TaskContext *tc)
452 struct StreamHandle *sh = cls;
454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455 "Timeout on stream to %s\n",
456 GNUNET_i2s (&sh->target));
457 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
458 destroy_stream_handle (sh);
463 * Task called when it is time to reset a stream.
465 * @param cls the 'struct StreamHandle' to tear down
466 * @param tc scheduler context, unused
469 reset_stream_task (void *cls,
470 const struct GNUNET_SCHEDULER_TaskContext *tc)
472 struct StreamHandle *sh = cls;
474 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
480 * We had a serious error, tear down and re-create stream from scratch,
481 * but do so asynchronously.
483 * @param sh stream to reset
486 reset_stream_async (struct StreamHandle *sh)
488 if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
489 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
495 * We got a reply from the stream. Process it.
497 * @param cls the struct StreamHandle
498 * @param status the status of the stream at the time this function is called
499 * @param data traffic from the other side
500 * @param size the number of bytes available in data read; will be 0 on timeout
501 * @return number of bytes processed from 'data' (any data remaining should be
502 * passed in the next time the read processor is called).
505 handle_stream_reply (void *cls,
506 enum GNUNET_STREAM_Status status,
510 struct StreamHandle *sh = cls;
513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514 "Received %u bytes from stream to %s\n",
516 GNUNET_i2s (&sh->target));
518 GNUNET_SERVER_mst_receive (sh->mst,
521 GNUNET_NO, GNUNET_NO))
524 reset_stream_async (sh);
527 sh->rh = GNUNET_STREAM_read (sh->stream,
528 GNUNET_TIME_UNIT_FOREVER_REL,
529 &handle_stream_reply,
536 * Functions of this signature are called whenever we transmitted a
537 * query via a stream.
539 * @param cls the struct StreamHandle for which we did the write call
540 * @param status the status of the stream at the time this function is called;
541 * GNUNET_STREAM_OK if writing to stream was completed successfully,
542 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
544 * @param size the number of bytes written
547 query_write_continuation (void *cls,
548 enum GNUNET_STREAM_Status status,
551 struct StreamHandle *sh = cls;
554 if ( (GNUNET_STREAM_OK != status) ||
555 (sizeof (struct StreamQueryMessage) != size) )
560 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
561 "Successfully transmitted %u bytes via stream to %s\n",
563 GNUNET_i2s (&sh->target));
565 sh->rh = GNUNET_STREAM_read (sh->stream,
566 GNUNET_TIME_UNIT_FOREVER_REL,
567 &handle_stream_reply,
569 transmit_pending (sh);
574 * Transmit pending requests via the stream.
576 * @param sh stream to process
579 transmit_pending (struct StreamHandle *sh)
581 struct StreamQueryMessage sqm;
582 struct GSF_StreamRequest *sr;
586 sr = sh->pending_head;
589 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
592 GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
595 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597 "Sending query via stream to %s\n",
598 GNUNET_i2s (&sh->target));
599 sr->was_transmitted = GNUNET_YES;
600 sqm.header.size = htons (sizeof (sqm));
601 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
602 sqm.type = htonl (sr->type);
603 sqm.query = sr->query;
604 sh->wh = GNUNET_STREAM_write (sh->stream,
606 GNUNET_TIME_UNIT_FOREVER_REL,
607 &query_write_continuation,
613 * Closure for 'handle_reply'.
615 struct HandleReplyClosure
624 * Expiration time for the block.
626 struct GNUNET_TIME_Absolute expiration;
629 * Number of bytes in 'data'.
636 enum GNUNET_BLOCK_Type type;
639 * Did we have a matching query?
646 * Iterator called on each entry in a waiting map to
649 * @param cls the 'struct HandleReplyClosure'
650 * @param key the key of the entry in the map (the query)
651 * @param value the 'struct GSF_StreamRequest' to handle result for
652 * @return GNUNET_YES (continue to iterate)
655 handle_reply (void *cls,
656 const struct GNUNET_HashCode *key,
659 struct HandleReplyClosure *hrc = cls;
660 struct GSF_StreamRequest *sr = value;
662 sr->proc (sr->proc_cls,
667 GSF_stream_query_cancel (sr);
668 hrc->found = GNUNET_YES;
674 * Functions with this signature are called whenever a
675 * complete reply is received.
677 * Do not call GNUNET_SERVER_mst_destroy in callback
679 * @param cls closure with the 'struct StreamHandle'
680 * @param client identification of the client, NULL
681 * @param message the actual message
682 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
687 const struct GNUNET_MessageHeader *message)
689 struct StreamHandle *sh = cls;
690 const struct StreamReplyMessage *srm;
691 struct HandleReplyClosure hrc;
693 enum GNUNET_BLOCK_Type type;
694 struct GNUNET_HashCode query;
696 msize = ntohs (message->size);
697 switch (ntohs (message->type))
699 case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
700 if (sizeof (struct StreamReplyMessage) > msize)
703 reset_stream_async (sh);
704 return GNUNET_SYSERR;
706 srm = (const struct StreamReplyMessage *) message;
707 msize -= sizeof (struct StreamReplyMessage);
708 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
710 GNUNET_BLOCK_get_key (GSF_block_ctx,
712 &srm[1], msize, &query))
715 reset_stream_async (sh);
716 return GNUNET_SYSERR;
718 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719 "Received reply `%s' via stream\n",
720 GNUNET_h2s (&query));
721 GNUNET_STATISTICS_update (GSF_stats,
722 gettext_noop ("# replies received via stream"), 1,
725 hrc.data_size = msize;
726 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
728 hrc.found = GNUNET_NO;
729 GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
733 if (GNUNET_NO == hrc.found)
735 GNUNET_STATISTICS_update (GSF_stats,
736 gettext_noop ("# replies received via stream dropped"), 1,
743 reset_stream_async (sh);
744 return GNUNET_SYSERR;
750 * Get (or create) a stream to talk to the given peer.
752 * @param target peer we want to communicate with
754 static struct StreamHandle *
755 get_stream (const struct GNUNET_PeerIdentity *target)
757 struct StreamHandle *sh;
759 sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
760 &target->hashPubKey);
763 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
765 GNUNET_SCHEDULER_cancel (sh->timeout_task);
766 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771 "Creating stream to %s\n",
772 GNUNET_i2s (target));
773 sh = GNUNET_malloc (sizeof (struct StreamHandle));
774 sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
776 sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
777 sh->target = *target;
778 sh->stream = GNUNET_STREAM_open (GSF_cfg,
780 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
781 &stream_ready_cb, sh,
782 GNUNET_STREAM_OPTION_END);
783 GNUNET_assert (GNUNET_OK ==
784 GNUNET_CONTAINER_multihashmap_put (stream_map,
785 &sh->target.hashPubKey,
787 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
793 * Look for a block by directly contacting a particular peer.
795 * @param target peer that should have the block
796 * @param query hash to query for the block
797 * @param type desired type for the block
798 * @param proc function to call with result
799 * @param proc_cls closure for 'proc'
800 * @return handle to cancel the operation
802 struct GSF_StreamRequest *
803 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
804 const struct GNUNET_HashCode *query,
805 enum GNUNET_BLOCK_Type type,
806 GSF_StreamReplyProcessor proc, void *proc_cls)
808 struct StreamHandle *sh;
809 struct GSF_StreamRequest *sr;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "Preparing to send query for %s via stream to %s\n",
814 GNUNET_i2s (target));
815 sh = get_stream (target);
816 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
819 sr->proc_cls = proc_cls;
822 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
825 if (GNUNET_YES == sh->is_ready)
826 transmit_pending (sh);
832 * Cancel an active request; must not be called after 'proc'
835 * @param sr request to cancel
838 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
840 struct StreamHandle *sh = sr->sh;
842 if (GNUNET_YES == sr->was_transmitted)
843 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
847 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
851 if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
852 (NULL == sh->pending_head) )
853 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
859 /* ********************* server-side code ************************* */
863 * We're done with a particular client, clean up.
865 * @param sc client to clean up
868 terminate_stream (struct StreamClient *sc)
870 GNUNET_STATISTICS_update (GSF_stats,
871 gettext_noop ("# stream connections active"), -1,
873 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
874 GNUNET_SCHEDULER_cancel (sc->terminate_task);
875 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
876 GNUNET_SCHEDULER_cancel (sc->timeout_task);
878 GNUNET_STREAM_io_read_cancel (sc->rh);
880 GNUNET_STREAM_io_write_cancel (sc->wh);
882 GNUNET_DATASTORE_cancel (sc->qe);
883 GNUNET_SERVER_mst_destroy (sc->mst);
884 GNUNET_STREAM_close (sc->socket);
885 GNUNET_CONTAINER_DLL_remove (sc_head,
893 * Task run to asynchronously terminate the stream.
895 * @param cls the 'struct StreamClient'
896 * @param tc scheduler context
899 terminate_stream_task (void *cls,
900 const struct GNUNET_SCHEDULER_TaskContext *tc)
902 struct StreamClient *sc = cls;
904 sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
905 terminate_stream (sc);
910 * Task run to asynchronously terminate the stream due to timeout.
912 * @param cls the 'struct StreamClient'
913 * @param tc scheduler context
916 timeout_stream_task (void *cls,
917 const struct GNUNET_SCHEDULER_TaskContext *tc)
919 struct StreamClient *sc = cls;
921 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
922 terminate_stream (sc);
927 * Reset the timeout for the stream client (due to activity).
929 * @param sc client handle to reset timeout for
932 refresh_timeout_task (struct StreamClient *sc)
934 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
935 GNUNET_SCHEDULER_cancel (sc->timeout_task);
936 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
937 &timeout_stream_task,
943 * We had a serious error, terminate stream,
944 * but do so asynchronously.
946 * @param sc stream client to terminate
949 terminate_stream_async (struct StreamClient *sc)
951 if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
952 sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
958 * Functions of this signature are called whenever data is available from the
961 * @param cls the closure from GNUNET_STREAM_read
962 * @param status the status of the stream at the time this function is called
963 * @param data traffic from the other side
964 * @param size the number of bytes available in data read; will be 0 on timeout
965 * @return number of bytes processed from 'data' (any data remaining should be
966 * passed in the next time the read processor is called).
969 process_request (void *cls,
970 enum GNUNET_STREAM_Status status,
976 * We're done handling a request from a client, read the next one.
978 * @param sc client to continue reading requests from
981 continue_reading (struct StreamClient *sc)
986 GNUNET_SERVER_mst_receive (sc->mst,
989 GNUNET_NO, GNUNET_YES);
990 if (GNUNET_NO == ret)
992 refresh_timeout_task (sc);
993 sc->rh = GNUNET_STREAM_read (sc->socket,
994 GNUNET_TIME_UNIT_FOREVER_REL,
1001 * Functions of this signature are called whenever data is available from the
1004 * @param cls the closure from GNUNET_STREAM_read
1005 * @param status the status of the stream at the time this function is called
1006 * @param data traffic from the other side
1007 * @param size the number of bytes available in data read; will be 0 on timeout
1008 * @return number of bytes processed from 'data' (any data remaining should be
1009 * passed in the next time the read processor is called).
1012 process_request (void *cls,
1013 enum GNUNET_STREAM_Status status,
1017 struct StreamClient *sc = cls;
1021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022 "Received %u byte query via stream\n",
1023 (unsigned int) size);
1026 case GNUNET_STREAM_OK:
1028 GNUNET_SERVER_mst_receive (sc->mst,
1031 GNUNET_NO, GNUNET_YES);
1032 if (GNUNET_NO == ret)
1033 return size; /* more messages in MST */
1034 if (GNUNET_SYSERR == ret)
1036 GNUNET_break_op (0);
1037 terminate_stream_async (sc);
1041 case GNUNET_STREAM_TIMEOUT:
1042 case GNUNET_STREAM_SHUTDOWN:
1043 case GNUNET_STREAM_SYSERR:
1044 case GNUNET_STREAM_BROKEN:
1045 terminate_stream_async (sc);
1051 continue_reading (sc);
1057 * Sending a reply was completed, continue processing.
1059 * @param cls closure with the struct StreamClient which sent the query
1060 * @param status result code for the operation
1061 * @param size number of bytes that were transmitted
1064 write_continuation (void *cls,
1065 enum GNUNET_STREAM_Status status,
1068 struct StreamClient *sc = cls;
1071 if ( (GNUNET_STREAM_OK == status) &&
1072 (size == sc->reply_size) )
1074 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075 "Transmitted %u byte reply via stream\n",
1076 (unsigned int) size);
1077 GNUNET_STATISTICS_update (GSF_stats,
1078 gettext_noop ("# Blocks transferred via stream"), 1,
1080 continue_reading (sc);
1084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085 "Transmission of reply failed, terminating stream\n");
1086 terminate_stream (sc);
1092 * Process a datum that was stored in the datastore.
1094 * @param cls closure with the struct StreamClient which sent the query
1095 * @param key key for the content
1096 * @param size number of bytes in data
1097 * @param data content stored
1098 * @param type type of the content
1099 * @param priority priority of the content
1100 * @param anonymity anonymity-level for the content
1101 * @param expiration expiration time for the content
1102 * @param uid unique identifier for the datum;
1103 * maybe 0 if no unique identifier is available
1106 handle_datastore_reply (void *cls,
1107 const struct GNUNET_HashCode * key,
1108 size_t size, const void *data,
1109 enum GNUNET_BLOCK_Type type,
1112 struct GNUNET_TIME_Absolute
1113 expiration, uint64_t uid)
1115 struct StreamClient *sc = cls;
1116 size_t msize = size + sizeof (struct StreamReplyMessage);
1117 char buf[msize] GNUNET_ALIGN;
1118 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
1121 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1124 GNUNET_FS_handle_on_demand_block (key,
1126 priority, anonymity,
1128 &handle_datastore_reply,
1131 continue_reading (sc);
1135 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1138 continue_reading (sc);
1141 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142 "Starting transmission of %u byte reply via stream\n",
1143 (unsigned int) size);
1144 srm->header.size = htons ((uint16_t) msize);
1145 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1146 srm->type = htonl (type);
1147 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1148 memcpy (&srm[1], data, size);
1149 sc->reply_size = msize;
1150 sc->wh = GNUNET_STREAM_write (sc->socket,
1152 GNUNET_TIME_UNIT_FOREVER_REL,
1153 &write_continuation,
1157 terminate_stream (sc);
1164 * Functions with this signature are called whenever a
1165 * complete query message is received.
1167 * Do not call GNUNET_SERVER_mst_destroy in callback
1169 * @param cls closure with the 'struct StreamClient'
1170 * @param client identification of the client, NULL
1171 * @param message the actual message
1172 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1175 request_cb (void *cls,
1177 const struct GNUNET_MessageHeader *message)
1179 struct StreamClient *sc = cls;
1180 const struct StreamQueryMessage *sqm;
1182 switch (ntohs (message->type))
1184 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1185 if (sizeof (struct StreamQueryMessage) !=
1186 ntohs (message->size))
1188 GNUNET_break_op (0);
1189 terminate_stream_async (sc);
1190 return GNUNET_SYSERR;
1192 sqm = (const struct StreamQueryMessage *) message;
1193 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1194 "Received query for `%s' via stream\n",
1195 GNUNET_h2s (&sqm->query));
1196 GNUNET_STATISTICS_update (GSF_stats,
1197 gettext_noop ("# queries received via stream"), 1,
1199 refresh_timeout_task (sc);
1200 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1205 GSF_datastore_queue_size,
1206 GNUNET_TIME_UNIT_FOREVER_REL,
1207 &handle_datastore_reply, sc);
1209 continue_reading (sc);
1212 GNUNET_break_op (0);
1213 terminate_stream_async (sc);
1214 return GNUNET_SYSERR;
1220 * Functions of this type are called upon a new stream connection from another
1221 * peer, or upon a binding error, which happens when the app_port given in
1222 * GNUNET_STREAM_listen() is already taken.
1224 * @param cls the closure from GNUNET_STREAM_listen
1225 * @param socket the socket representing the stream; NULL on binding error
1226 * @param initiator the identity of the peer who wants to establish a stream
1227 * with us; NULL on binding error
1228 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1229 * stream (the socket will be invalid after the call)
1232 accept_cb (void *cls,
1233 struct GNUNET_STREAM_Socket *socket,
1234 const struct GNUNET_PeerIdentity *initiator)
1236 struct StreamClient *sc;
1239 return GNUNET_SYSERR;
1240 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241 "Accepting inbound stream connection from `%s'\n",
1242 GNUNET_i2s (initiator));
1243 GNUNET_STATISTICS_update (GSF_stats,
1244 gettext_noop ("# stream connections active"), 1,
1246 sc = GNUNET_malloc (sizeof (struct StreamClient));
1247 sc->socket = socket;
1248 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1250 sc->rh = GNUNET_STREAM_read (sc->socket,
1251 GNUNET_TIME_UNIT_FOREVER_REL,
1254 GNUNET_CONTAINER_DLL_insert (sc_head,
1257 refresh_timeout_task (sc);
1263 * Initialize subsystem for non-anonymous file-sharing.
1268 stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1269 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1270 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1272 GNUNET_STREAM_OPTION_END);
1277 * Function called on each active stream to shut it down.
1280 * @param key target peer, unused
1281 * @param value the 'struct StreamHandle' to destroy
1282 * @return GNUNET_YES (continue to iterate)
1285 release_streams (void *cls,
1286 const struct GNUNET_HashCode *key,
1289 struct StreamHandle *sh = value;
1291 destroy_stream_handle (sh);
1297 * Shutdown subsystem for non-anonymous file-sharing.
1302 struct StreamClient *sc;
1304 while (NULL != (sc = sc_head))
1305 terminate_stream (sc);
1306 if (NULL != listen_socket)
1308 GNUNET_STREAM_listen_close (listen_socket);
1309 listen_socket = NULL;
1311 GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1314 GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1318 /* end of gnunet-service-fs_stream.c */