2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 #include "gnunet_constants.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet-service-fs.h"
33 #include "gnunet-service-fs_indexing.h"
34 #include "gnunet-service-fs_stream.h"
37 * After how long do we terminate idle connections?
39 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43 * Information we keep around for each active streaming client.
50 struct StreamClient *next;
55 struct StreamClient *prev;
58 * Socket for communication.
60 struct GNUNET_STREAM_Socket *socket;
63 * Handle for active read operation, or NULL.
65 struct GNUNET_STREAM_IOReadHandle *rh;
68 * Handle for active write operation, or NULL.
70 struct GNUNET_STREAM_IOWriteHandle *wh;
73 * Tokenizer for requests.
75 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
78 * Current active request to the datastore, if we have one pending.
80 struct GNUNET_DATASTORE_QueueEntry *qe;
83 * Task that is scheduled to asynchronously terminate the connection.
85 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
88 * Task that is scheduled to terminate idle connections.
90 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
93 * Size of the last write that was initiated.
101 * Query from one peer, asking the other for CHK-data.
103 struct StreamQueryMessage
107 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
109 struct GNUNET_MessageHeader header;
112 * Block type must be DBLOCK or IBLOCK.
117 * Query hash from CHK (hash of encrypted block).
119 struct GNUNET_HashCode query;
125 * Reply to a StreamQueryMessage.
127 struct StreamReplyMessage
131 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
133 struct GNUNET_MessageHeader header;
136 * Block type must be DBLOCK or IBLOCK.
141 * Expiration time for the block.
143 struct GNUNET_TIME_AbsoluteNBO expiration;
145 /* followed by the encrypted block */
151 * Handle for a stream to another peer.
157 * Handle for a request that is going out via stream API.
159 struct GSF_StreamRequest
165 struct GSF_StreamRequest *next;
170 struct GSF_StreamRequest *prev;
173 * Which stream is this request associated with?
175 struct StreamHandle *sh;
178 * Function to call with the result.
180 GSF_StreamReplyProcessor proc;
188 * Query to transmit to the other peer.
190 struct GNUNET_HashCode query;
193 * Desired type for the reply.
195 enum GNUNET_BLOCK_Type type;
198 * Did we transmit this request already? YES if we are
199 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
206 * Handle for a stream to another peer.
211 * Head of DLL of pending requests on this stream.
213 struct GSF_StreamRequest *pending_head;
216 * Tail of DLL of pending requests on this stream.
218 struct GSF_StreamRequest *pending_tail;
221 * Map from query to 'struct GSF_StreamRequest's waiting for
224 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
227 * Connection to the other peer.
229 struct GNUNET_STREAM_Socket *stream;
232 * Handle for active read operation, or NULL.
234 struct GNUNET_STREAM_IOReadHandle *rh;
237 * Handle for active write operation, or NULL.
239 struct GNUNET_STREAM_IOWriteHandle *wh;
242 * Tokenizer for replies.
244 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
247 * Which peer does this stream go to?
249 struct GNUNET_PeerIdentity target;
252 * Task to kill inactive streams (we keep them around for
253 * a few seconds to give the application a chance to give
256 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
259 * Task to reset streams that had errors (asynchronously,
260 * as we may not be able to do it immediately during a
261 * callback from the stream API).
263 GNUNET_SCHEDULER_TaskIdentifier reset_task;
266 * Is this stream ready for transmission?
274 * Listen socket for incoming requests.
276 static struct GNUNET_STREAM_ListenSocket *listen_socket;
279 * Head of DLL of stream clients.
281 static struct StreamClient *sc_head;
284 * Tail of DLL of stream clients.
286 static struct StreamClient *sc_tail;
289 * Number of active stream clients in the 'sc_*'-DLL.
291 static unsigned int sc_count;
294 * Maximum allowed number of stream clients.
296 static unsigned long long sc_count_max;
299 * Map from peer identities to 'struct StreamHandles' with streams to
302 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
305 /* ********************* client-side code ************************* */
308 * Iterator called on each entry in a waiting map to
309 * call the 'proc' continuation and release associated
312 * @param cls the 'struct StreamHandle'
313 * @param key the key of the entry in the map (the query)
314 * @param value the 'struct GSF_StreamRequest' to clean up
315 * @return GNUNET_YES (continue to iterate)
318 free_waiting_entry (void *cls,
319 const struct GNUNET_HashCode *key,
322 struct GSF_StreamRequest *sr = value;
324 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
325 GNUNET_TIME_UNIT_FOREVER_ABS,
327 GSF_stream_query_cancel (sr);
333 * Destroy a stream handle.
335 * @param sh stream to destroy
338 destroy_stream_handle (struct StreamHandle *sh)
340 struct GSF_StreamRequest *sr;
342 while (NULL != (sr = sh->pending_head))
344 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
345 GNUNET_TIME_UNIT_FOREVER_ABS,
347 GSF_stream_query_cancel (sr);
349 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
353 GNUNET_STREAM_io_write_cancel (sh->wh);
355 GNUNET_STREAM_io_read_cancel (sh->rh);
356 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
357 GNUNET_SCHEDULER_cancel (sh->timeout_task);
358 GNUNET_STREAM_close (sh->stream);
359 GNUNET_assert (GNUNET_OK ==
360 GNUNET_CONTAINER_multihashmap_remove (stream_map,
361 &sh->target.hashPubKey,
363 GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
369 * Transmit pending requests via the stream.
371 * @param sh stream to process
374 transmit_pending (struct StreamHandle *sh);
378 * Function called once the stream is ready for transmission.
380 * @param cls the 'struct StreamHandle'
381 * @param socket stream socket handle
384 stream_ready_cb (void *cls,
385 struct GNUNET_STREAM_Socket *socket)
387 struct StreamHandle *sh = cls;
389 sh->is_ready = GNUNET_YES;
390 transmit_pending (sh);
395 * Iterator called on each entry in a waiting map to
396 * move it back to the pending list.
398 * @param cls the 'struct StreamHandle'
399 * @param key the key of the entry in the map (the query)
400 * @param value the 'struct GSF_StreamRequest' to move to pending
401 * @return GNUNET_YES (continue to iterate)
404 move_to_pending (void *cls,
405 const struct GNUNET_HashCode *key,
408 struct StreamHandle *sh = cls;
409 struct GSF_StreamRequest *sr = value;
411 GNUNET_assert (GNUNET_YES ==
412 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
415 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
418 sr->was_transmitted = GNUNET_NO;
424 * We had a serious error, tear down and re-create stream from scratch.
426 * @param sh stream to reset
429 reset_stream (struct StreamHandle *sh)
431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432 "Resetting stream to %s\n",
433 GNUNET_i2s (&sh->target));
435 GNUNET_STREAM_io_read_cancel (sh->rh);
436 GNUNET_STREAM_close (sh->stream);
437 sh->is_ready = GNUNET_NO;
438 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
441 sh->stream = GNUNET_STREAM_open (GSF_cfg,
443 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
444 &stream_ready_cb, sh,
445 GNUNET_STREAM_OPTION_END);
450 * Task called when it is time to destroy an inactive stream.
452 * @param cls the 'struct StreamHandle' to tear down
453 * @param tc scheduler context, unused
456 stream_timeout (void *cls,
457 const struct GNUNET_SCHEDULER_TaskContext *tc)
459 struct StreamHandle *sh = cls;
461 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462 "Timeout on stream to %s\n",
463 GNUNET_i2s (&sh->target));
464 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
465 destroy_stream_handle (sh);
470 * Task called when it is time to reset a stream.
472 * @param cls the 'struct StreamHandle' to tear down
473 * @param tc scheduler context, unused
476 reset_stream_task (void *cls,
477 const struct GNUNET_SCHEDULER_TaskContext *tc)
479 struct StreamHandle *sh = cls;
481 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
487 * We had a serious error, tear down and re-create stream from scratch,
488 * but do so asynchronously.
490 * @param sh stream to reset
493 reset_stream_async (struct StreamHandle *sh)
495 if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
496 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
502 * We got a reply from the stream. Process it.
504 * @param cls the struct StreamHandle
505 * @param status the status of the stream at the time this function is called
506 * @param data traffic from the other side
507 * @param size the number of bytes available in data read; will be 0 on timeout
508 * @return number of bytes processed from 'data' (any data remaining should be
509 * given to the next time the read processor is called).
512 handle_stream_reply (void *cls,
513 enum GNUNET_STREAM_Status status,
517 struct StreamHandle *sh = cls;
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "Received %u bytes from stream to %s\n",
523 GNUNET_i2s (&sh->target));
525 GNUNET_SERVER_mst_receive (sh->mst,
528 GNUNET_NO, GNUNET_NO))
531 reset_stream_async (sh);
534 sh->rh = GNUNET_STREAM_read (sh->stream,
535 GNUNET_TIME_UNIT_FOREVER_REL,
536 &handle_stream_reply,
543 * Functions of this signature are called whenever we transmitted a
544 * query via a stream.
546 * @param cls the struct StreamHandle for which we did the write call
547 * @param status the status of the stream at the time this function is called;
548 * GNUNET_OK if writing to stream was completed successfully,
549 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
551 * @param size the number of bytes written
554 query_write_continuation (void *cls,
555 enum GNUNET_STREAM_Status status,
558 struct StreamHandle *sh = cls;
561 if ( (GNUNET_STREAM_OK != status) ||
562 (sizeof (struct StreamQueryMessage) != size) )
567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568 "Successfully transmitted %u bytes via stream to %s\n",
570 GNUNET_i2s (&sh->target));
572 sh->rh = GNUNET_STREAM_read (sh->stream,
573 GNUNET_TIME_UNIT_FOREVER_REL,
574 &handle_stream_reply,
576 transmit_pending (sh);
581 * Transmit pending requests via the stream.
583 * @param sh stream to process
586 transmit_pending (struct StreamHandle *sh)
588 struct StreamQueryMessage sqm;
589 struct GSF_StreamRequest *sr;
593 sr = sh->pending_head;
596 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
599 GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
602 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604 "Sending query via stream to %s\n",
605 GNUNET_i2s (&sh->target));
606 sr->was_transmitted = GNUNET_YES;
607 sqm.header.size = htons (sizeof (sqm));
608 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
609 sqm.type = htonl (sr->type);
610 sqm.query = sr->query;
611 sh->wh = GNUNET_STREAM_write (sh->stream,
613 GNUNET_TIME_UNIT_FOREVER_REL,
614 &query_write_continuation,
620 * Closure for 'handle_reply'.
622 struct HandleReplyClosure
631 * Expiration time for the block.
633 struct GNUNET_TIME_Absolute expiration;
636 * Number of bytes in 'data'.
643 enum GNUNET_BLOCK_Type type;
646 * Did we have a matching query?
653 * Iterator called on each entry in a waiting map to
656 * @param cls the 'struct HandleReplyClosure'
657 * @param key the key of the entry in the map (the query)
658 * @param value the 'struct GSF_StreamRequest' to handle result for
659 * @return GNUNET_YES (continue to iterate)
662 handle_reply (void *cls,
663 const struct GNUNET_HashCode *key,
666 struct HandleReplyClosure *hrc = cls;
667 struct GSF_StreamRequest *sr = value;
669 sr->proc (sr->proc_cls,
674 GSF_stream_query_cancel (sr);
675 hrc->found = GNUNET_YES;
681 * Functions with this signature are called whenever a
682 * complete reply is received.
684 * Do not call GNUNET_SERVER_mst_destroy in callback
686 * @param cls closure with the 'struct StreamHandle'
687 * @param client identification of the client, NULL
688 * @param message the actual message
689 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
694 const struct GNUNET_MessageHeader *message)
696 struct StreamHandle *sh = cls;
697 const struct StreamReplyMessage *srm;
698 struct HandleReplyClosure hrc;
700 enum GNUNET_BLOCK_Type type;
701 struct GNUNET_HashCode query;
703 msize = ntohs (message->size);
704 switch (ntohs (message->type))
706 case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
707 if (sizeof (struct StreamReplyMessage) > msize)
710 reset_stream_async (sh);
711 return GNUNET_SYSERR;
713 srm = (const struct StreamReplyMessage *) message;
714 msize -= sizeof (struct StreamReplyMessage);
715 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
717 GNUNET_BLOCK_get_key (GSF_block_ctx,
719 &srm[1], msize, &query))
722 reset_stream_async (sh);
723 return GNUNET_SYSERR;
725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726 "Received reply `%s' via stream\n",
727 GNUNET_h2s (&query));
728 GNUNET_STATISTICS_update (GSF_stats,
729 gettext_noop ("# replies received via stream"), 1,
732 hrc.data_size = msize;
733 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
735 hrc.found = GNUNET_NO;
736 GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
740 if (GNUNET_NO == hrc.found)
742 GNUNET_STATISTICS_update (GSF_stats,
743 gettext_noop ("# replies received via stream dropped"), 1,
750 reset_stream_async (sh);
751 return GNUNET_SYSERR;
757 * Get (or create) a stream to talk to the given peer.
759 * @param target peer we want to communicate with
761 static struct StreamHandle *
762 get_stream (const struct GNUNET_PeerIdentity *target)
764 struct StreamHandle *sh;
766 sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
767 &target->hashPubKey);
770 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
772 GNUNET_SCHEDULER_cancel (sh->timeout_task);
773 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778 "Creating stream to %s\n",
779 GNUNET_i2s (target));
780 sh = GNUNET_malloc (sizeof (struct StreamHandle));
781 sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
783 sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
784 sh->target = *target;
785 sh->stream = GNUNET_STREAM_open (GSF_cfg,
787 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
788 &stream_ready_cb, sh,
789 GNUNET_STREAM_OPTION_END);
790 GNUNET_assert (GNUNET_OK ==
791 GNUNET_CONTAINER_multihashmap_put (stream_map,
792 &sh->target.hashPubKey,
794 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
800 * Look for a block by directly contacting a particular peer.
802 * @param target peer that should have the block
803 * @param query hash to query for the block
804 * @param type desired type for the block
805 * @param proc function to call with result
806 * @param proc_cls closure for 'proc'
807 * @return handle to cancel the operation
809 struct GSF_StreamRequest *
810 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
811 const struct GNUNET_HashCode *query,
812 enum GNUNET_BLOCK_Type type,
813 GSF_StreamReplyProcessor proc, void *proc_cls)
815 struct StreamHandle *sh;
816 struct GSF_StreamRequest *sr;
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819 "Preparing to send query for %s via stream to %s\n",
821 GNUNET_i2s (target));
822 sh = get_stream (target);
823 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
826 sr->proc_cls = proc_cls;
829 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
832 if (GNUNET_YES == sh->is_ready)
833 transmit_pending (sh);
839 * Cancel an active request; must not be called after 'proc'
842 * @param sr request to cancel
845 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
847 struct StreamHandle *sh = sr->sh;
849 if (GNUNET_YES == sr->was_transmitted)
850 GNUNET_assert (GNUNET_OK ==
851 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
855 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
859 if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
860 (NULL == sh->pending_head) )
861 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
867 /* ********************* server-side code ************************* */
871 * We're done with a particular client, clean up.
873 * @param sc client to clean up
876 terminate_stream (struct StreamClient *sc)
878 GNUNET_STATISTICS_update (GSF_stats,
879 gettext_noop ("# stream connections active"), -1,
881 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
882 GNUNET_SCHEDULER_cancel (sc->terminate_task);
883 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
884 GNUNET_SCHEDULER_cancel (sc->timeout_task);
886 GNUNET_STREAM_io_read_cancel (sc->rh);
888 GNUNET_STREAM_io_write_cancel (sc->wh);
890 GNUNET_DATASTORE_cancel (sc->qe);
891 GNUNET_SERVER_mst_destroy (sc->mst);
892 GNUNET_STREAM_close (sc->socket);
893 GNUNET_CONTAINER_DLL_remove (sc_head,
902 * Task run to asynchronously terminate the stream.
904 * @param cls the 'struct StreamClient'
905 * @param tc scheduler context
908 terminate_stream_task (void *cls,
909 const struct GNUNET_SCHEDULER_TaskContext *tc)
911 struct StreamClient *sc = cls;
913 sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
914 terminate_stream (sc);
919 * Task run to asynchronously terminate the stream due to timeout.
921 * @param cls the 'struct StreamClient'
922 * @param tc scheduler context
925 timeout_stream_task (void *cls,
926 const struct GNUNET_SCHEDULER_TaskContext *tc)
928 struct StreamClient *sc = cls;
930 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
931 terminate_stream (sc);
936 * Reset the timeout for the stream client (due to activity).
938 * @param sc client handle to reset timeout for
941 refresh_timeout_task (struct StreamClient *sc)
943 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
944 GNUNET_SCHEDULER_cancel (sc->timeout_task);
945 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
946 &timeout_stream_task,
952 * We had a serious error, terminate the stream,
953 * but do so asynchronously.
955 * @param sc stream to terminate
958 terminate_stream_async (struct StreamClient *sc)
960 if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
961 sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
967 * Functions of this signature are called whenever data is available from the
970 * @param cls the closure from GNUNET_STREAM_read
971 * @param status the status of the stream at the time this function is called
972 * @param data traffic from the other side
973 * @param size the number of bytes available in data read; will be 0 on timeout
974 * @return number of bytes processed from 'data' (any data remaining should be
975 * given to the next time the read processor is called).
978 process_request (void *cls,
979 enum GNUNET_STREAM_Status status,
985 * We're done handling a request from a client, read the next one.
987 * @param sc client to continue reading requests from
990 continue_reading (struct StreamClient *sc)
995 GNUNET_SERVER_mst_receive (sc->mst,
998 GNUNET_NO, GNUNET_YES);
999 if (GNUNET_NO == ret)
1001 refresh_timeout_task (sc);
1002 sc->rh = GNUNET_STREAM_read (sc->socket,
1003 GNUNET_TIME_UNIT_FOREVER_REL,
1010 * Functions of this signature are called whenever data is available from the
1013 * @param cls the closure from GNUNET_STREAM_read
1014 * @param status the status of the stream at the time this function is called
1015 * @param data traffic from the other side
1016 * @param size the number of bytes available in data read; will be 0 on timeout
1017 * @return number of bytes processed from 'data' (any data remaining should be
1018 * given to the next time the read processor is called).
1021 process_request (void *cls,
1022 enum GNUNET_STREAM_Status status,
1026 struct StreamClient *sc = cls;
1030 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031 "Received %u byte query via stream\n",
1032 (unsigned int) size);
1035 case GNUNET_STREAM_OK:
1037 GNUNET_SERVER_mst_receive (sc->mst,
1040 GNUNET_NO, GNUNET_YES);
1041 if (GNUNET_NO == ret)
1042 return size; /* more messages in MST */
1043 if (GNUNET_SYSERR == ret)
1045 GNUNET_break_op (0);
1046 terminate_stream_async (sc);
1050 case GNUNET_STREAM_TIMEOUT:
1051 case GNUNET_STREAM_SHUTDOWN:
1052 case GNUNET_STREAM_SYSERR:
1053 terminate_stream_async (sc);
1059 continue_reading (sc);
1065 * Sending a reply was completed, continue processing.
1067 * @param cls closure with the struct StreamClient which sent the query
1068 * @param status result code for the operation
1069 * @param size number of bytes that were transmitted
1072 write_continuation (void *cls,
1073 enum GNUNET_STREAM_Status status,
1076 struct StreamClient *sc = cls;
1079 if ( (GNUNET_STREAM_OK == status) &&
1080 (size == sc->reply_size) )
1082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083 "Transmitted %u byte reply via stream\n",
1084 (unsigned int) size);
1085 GNUNET_STATISTICS_update (GSF_stats,
1086 gettext_noop ("# Blocks transferred via stream"), 1,
1088 continue_reading (sc);
1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093 "Transmission of reply failed, terminating stream\n");
1094 terminate_stream (sc);
1100 * Process a datum that was stored in the datastore.
1102 * @param cls closure with the struct StreamClient which sent the query
1103 * @param key key for the content
1104 * @param size number of bytes in data
1105 * @param data content stored
1106 * @param type type of the content
1107 * @param priority priority of the content
1108 * @param anonymity anonymity-level for the content
1109 * @param expiration expiration time for the content
1110 * @param uid unique identifier for the datum;
1111 * may be 0 if no unique identifier is available
1114 handle_datastore_reply (void *cls,
1115 const struct GNUNET_HashCode * key,
1116 size_t size, const void *data,
1117 enum GNUNET_BLOCK_Type type,
1120 struct GNUNET_TIME_Absolute
1121 expiration, uint64_t uid)
1123 struct StreamClient *sc = cls;
1124 size_t msize = size + sizeof (struct StreamReplyMessage);
1125 char buf[msize] GNUNET_ALIGN;
1126 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
1129 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1132 GNUNET_FS_handle_on_demand_block (key,
1134 priority, anonymity,
1136 &handle_datastore_reply,
1139 continue_reading (sc);
1143 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1146 continue_reading (sc);
1149 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150 "Starting transmission of %u byte reply via stream\n",
1151 (unsigned int) size);
1152 srm->header.size = htons ((uint16_t) msize);
1153 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1154 srm->type = htonl (type);
1155 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1156 memcpy (&srm[1], data, size);
1157 sc->reply_size = msize;
1158 sc->wh = GNUNET_STREAM_write (sc->socket,
1160 GNUNET_TIME_UNIT_FOREVER_REL,
1161 &write_continuation,
1165 terminate_stream (sc);
1172 * Functions with this signature are called whenever a
1173 * complete query message is received.
1175 * Do not call GNUNET_SERVER_mst_destroy in callback
1177 * @param cls closure with the 'struct StreamClient'
1178 * @param client identification of the client, NULL
1179 * @param message the actual message
1180 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1183 request_cb (void *cls,
1185 const struct GNUNET_MessageHeader *message)
1187 struct StreamClient *sc = cls;
1188 const struct StreamQueryMessage *sqm;
1190 switch (ntohs (message->type))
1192 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1193 if (sizeof (struct StreamQueryMessage) !=
1194 ntohs (message->size))
1196 GNUNET_break_op (0);
1197 terminate_stream_async (sc);
1198 return GNUNET_SYSERR;
1200 sqm = (const struct StreamQueryMessage *) message;
1201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202 "Received query for `%s' via stream\n",
1203 GNUNET_h2s (&sqm->query));
1204 GNUNET_STATISTICS_update (GSF_stats,
1205 gettext_noop ("# queries received via stream"), 1,
1207 refresh_timeout_task (sc);
1208 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1213 GSF_datastore_queue_size,
1214 GNUNET_TIME_UNIT_FOREVER_REL,
1215 &handle_datastore_reply, sc);
1217 continue_reading (sc);
1220 GNUNET_break_op (0);
1221 terminate_stream_async (sc);
1222 return GNUNET_SYSERR;
1228 * Functions of this type are called upon new stream connection from other peers
1229 * or upon binding error which happen when the app_port given in
1230 * GNUNET_STREAM_listen() is already taken.
1232 * @param cls the closure from GNUNET_STREAM_listen
1233 * @param socket the socket representing the stream; NULL on binding error
1234 * @param initiator the identity of the peer who wants to establish a stream
1235 * with us; NULL on binding error
1236 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1237 * stream (the socket will be invalid after the call)
1240 accept_cb (void *cls,
1241 struct GNUNET_STREAM_Socket *socket,
1242 const struct GNUNET_PeerIdentity *initiator)
1244 struct StreamClient *sc;
1247 return GNUNET_SYSERR;
1248 if (sc_count >= sc_count_max)
1250 GNUNET_STATISTICS_update (GSF_stats,
1251 gettext_noop ("# stream client connections rejected"), 1,
1253 return GNUNET_SYSERR;
1255 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256 "Accepting inbound stream connection from `%s'\n",
1257 GNUNET_i2s (initiator));
1258 GNUNET_STATISTICS_update (GSF_stats,
1259 gettext_noop ("# stream connections active"), 1,
1261 sc = GNUNET_malloc (sizeof (struct StreamClient));
1262 sc->socket = socket;
1263 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1265 sc->rh = GNUNET_STREAM_read (sc->socket,
1266 GNUNET_TIME_UNIT_FOREVER_REL,
1269 GNUNET_CONTAINER_DLL_insert (sc_head,
1273 refresh_timeout_task (sc);
1279 * Initialize subsystem for non-anonymous file-sharing.
1284 stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1286 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1288 "MAX_STREAM_CLIENTS",
1291 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1292 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1294 GNUNET_STREAM_OPTION_END);
1300 * Function called on each active stream to shut it down.
1303 * @param key target peer, unused
1304 * @param value the 'struct StreamHandle' to destroy
1305 * @return GNUNET_YES (continue to iterate)
1308 release_streams (void *cls,
1309 const struct GNUNET_HashCode *key,
1312 struct StreamHandle *sh = value;
1314 destroy_stream_handle (sh);
1320 * Shutdown subsystem for non-anonymous file-sharing.
1325 struct StreamClient *sc;
1327 while (NULL != (sc = sc_head))
1328 terminate_stream (sc);
1329 if (NULL != listen_socket)
1331 GNUNET_STREAM_listen_close (listen_socket);
1332 listen_socket = NULL;
1334 GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1337 GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1341 /* end of gnunet-service-fs_stream.c */