2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_stream.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 #include "gnunet_constants.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet-service-fs.h"
33 #include "gnunet-service-fs_indexing.h"
34 #include "gnunet-service-fs_stream.h"
/**
 * After how long do we terminate idle connections?
 */
#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
/**
 * After how long do we reset connections without replies?
 */
#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
/**
 * A message in the queue to be written to the stream.
 * The payload bytes follow the struct in the same allocation.
 */
struct WriteQueueItem
{
  /**
   * Next entry in the doubly-linked write queue.
   */
  struct WriteQueueItem *next;

  /**
   * Previous entry in the doubly-linked write queue.
   */
  struct WriteQueueItem *prev;

  /**
   * Number of bytes of payload, allocated at the end of this struct.
   */
  size_t msize;
};
/**
 * Information we keep around for each active streaming client
 * (a peer that connected to us to request blocks).
 */
struct StreamClient
{
  /**
   * Next client in the 'sc_head'/'sc_tail' DLL.
   */
  struct StreamClient *next;

  /**
   * Previous client in the 'sc_head'/'sc_tail' DLL.
   */
  struct StreamClient *prev;

  /**
   * Socket for communication.
   */
  struct GNUNET_STREAM_Socket *socket;

  /**
   * Handle for active read operation, or NULL.
   */
  struct GNUNET_STREAM_ReadHandle *rh;

  /**
   * Handle for active write operation, or NULL.
   */
  struct GNUNET_STREAM_WriteHandle *wh;

  /**
   * Head of write queue.
   */
  struct WriteQueueItem *wqi_head;

  /**
   * Tail of write queue.
   */
  struct WriteQueueItem *wqi_tail;

  /**
   * Tokenizer for requests.
   */
  struct GNUNET_SERVER_MessageStreamTokenizer *mst;

  /**
   * Current active request to the datastore, if we have one pending.
   */
  struct GNUNET_DATASTORE_QueueEntry *qe;

  /**
   * Task that is scheduled to asynchronously terminate the connection.
   */
  GNUNET_SCHEDULER_TaskIdentifier terminate_task;

  /**
   * Task that is scheduled to terminate idle connections.
   */
  GNUNET_SCHEDULER_TaskIdentifier timeout_task;

  /**
   * Size of the last write that was initiated; compared against the
   * number of bytes reported to the write continuation.
   */
  size_t reply_size;

};
/**
 * Query from one peer, asking the other for CHK-data.
 * All multi-byte fields are in network byte order.
 */
struct StreamQueryMessage
{

  /**
   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
   */
  struct GNUNET_MessageHeader header;

  /**
   * Block type must be DBLOCK or IBLOCK.
   */
  uint32_t type;

  /**
   * Query hash from CHK (hash of encrypted block).
   */
  struct GNUNET_HashCode query;

};
/**
 * Reply to a StreamQueryMessage.
 * All multi-byte fields are in network byte order.
 */
struct StreamReplyMessage
{

  /**
   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
   */
  struct GNUNET_MessageHeader header;

  /**
   * Block type must be DBLOCK or IBLOCK.
   */
  uint32_t type;

  /**
   * Expiration time for the block.
   */
  struct GNUNET_TIME_AbsoluteNBO expiration;

  /* followed by the encrypted block */

};
/**
 * Handle for a stream to another peer (forward declaration; needed
 * because 'struct GSF_StreamRequest' refers back to its stream).
 */
struct StreamHandle;
/**
 * Handle for a request that is going out via stream API.
 */
struct GSF_StreamRequest
{

  /**
   * Next request in the 'pending' DLL of the stream.
   */
  struct GSF_StreamRequest *next;

  /**
   * Previous request in the 'pending' DLL of the stream.
   */
  struct GSF_StreamRequest *prev;

  /**
   * Which stream is this request associated with?
   */
  struct StreamHandle *sh;

  /**
   * Function to call with the result.
   */
  GSF_StreamReplyProcessor proc;

  /**
   * Closure for 'proc'.
   */
  void *proc_cls;

  /**
   * Query to transmit to the other peer.
   */
  struct GNUNET_HashCode query;

  /**
   * Desired type for the reply.
   */
  enum GNUNET_BLOCK_Type type;

  /**
   * Did we transmit this request already? YES if we are
   * in the 'waiting' map, NO if we are in the 'pending' DLL.
   */
  int was_transmitted;
};
/**
 * Handle for a stream to another peer.
 */
struct StreamHandle
{
  /**
   * Head of DLL of pending requests on this stream.
   */
  struct GSF_StreamRequest *pending_head;

  /**
   * Tail of DLL of pending requests on this stream.
   */
  struct GSF_StreamRequest *pending_tail;

  /**
   * Map from query to 'struct GSF_StreamRequest's waiting for
   * a reply.
   */
  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;

  /**
   * Connection to the other peer.
   */
  struct GNUNET_STREAM_Socket *stream;

  /**
   * Handle for active read operation, or NULL.
   */
  struct GNUNET_STREAM_ReadHandle *rh;

  /**
   * Handle for active write operation, or NULL.
   */
  struct GNUNET_STREAM_WriteHandle *wh;

  /**
   * Tokenizer for replies.
   */
  struct GNUNET_SERVER_MessageStreamTokenizer *mst;

  /**
   * Which peer does this stream go to?
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Task to kill inactive streams (we keep them around for
   * a few seconds to give the application a chance to give
   * us another query).
   */
  GNUNET_SCHEDULER_TaskIdentifier timeout_task;

  /**
   * Task to reset streams that had errors (asynchronously,
   * as we may not be able to do it immediately during a
   * callback from the stream API).
   */
  GNUNET_SCHEDULER_TaskIdentifier reset_task;

  /**
   * Is this stream ready for transmission?
   */
  int is_ready;

};
/**
 * Listen socket for incoming requests.
 */
static struct GNUNET_STREAM_ListenSocket *listen_socket;

/**
 * Head of DLL of stream clients.
 */
static struct StreamClient *sc_head;

/**
 * Tail of DLL of stream clients.
 */
static struct StreamClient *sc_tail;

/**
 * Number of active stream clients in the 'sc_*'-DLL.
 */
static unsigned int sc_count;

/**
 * Maximum allowed number of stream clients.
 */
static unsigned long long sc_count_max;

/**
 * Map from peer identities to 'struct StreamHandles' with streams to
 * those peers.
 */
static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
342 /* ********************* client-side code ************************* */
345 * Iterator called on each entry in a waiting map to
346 * call the 'proc' continuation and release associated
349 * @param cls the 'struct StreamHandle'
350 * @param key the key of the entry in the map (the query)
351 * @param value the 'struct GSF_StreamRequest' to clean up
352 * @return GNUNET_YES (continue to iterate)
355 free_waiting_entry (void *cls,
356 const struct GNUNET_HashCode *key,
359 struct GSF_StreamRequest *sr = value;
361 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
362 GNUNET_TIME_UNIT_FOREVER_ABS,
364 GSF_stream_query_cancel (sr);
370 * Destroy a stream handle.
372 * @param sh stream to process
375 destroy_stream_handle (struct StreamHandle *sh)
377 struct GSF_StreamRequest *sr;
379 while (NULL != (sr = sh->pending_head))
381 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
382 GNUNET_TIME_UNIT_FOREVER_ABS,
384 GSF_stream_query_cancel (sr);
386 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
390 GNUNET_STREAM_write_cancel (sh->wh);
392 GNUNET_STREAM_read_cancel (sh->rh);
393 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
394 GNUNET_SCHEDULER_cancel (sh->timeout_task);
395 if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
396 GNUNET_SCHEDULER_cancel (sh->reset_task);
397 GNUNET_STREAM_close (sh->stream);
398 GNUNET_assert (GNUNET_OK ==
399 GNUNET_CONTAINER_multihashmap_remove (stream_map,
400 &sh->target.hashPubKey,
402 GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
/**
 * Transmit pending requests via the stream (forward declaration, the
 * definition follows further down in this file).
 *
 * @param sh stream to process
 */
static void
transmit_pending (struct StreamHandle *sh);
417 * Function called once the stream is ready for transmission.
419 * @param cls the 'struct StreamHandle'
420 * @param socket stream socket handle
423 stream_ready_cb (void *cls,
424 struct GNUNET_STREAM_Socket *socket)
426 struct StreamHandle *sh = cls;
428 sh->is_ready = GNUNET_YES;
429 transmit_pending (sh);
434 * Iterator called on each entry in a waiting map to
435 * move it back to the pending list.
437 * @param cls the 'struct StreamHandle'
438 * @param key the key of the entry in the map (the query)
439 * @param value the 'struct GSF_StreamRequest' to move to pending
440 * @return GNUNET_YES (continue to iterate)
443 move_to_pending (void *cls,
444 const struct GNUNET_HashCode *key,
447 struct StreamHandle *sh = cls;
448 struct GSF_StreamRequest *sr = value;
450 GNUNET_assert (GNUNET_YES ==
451 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
454 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
457 sr->was_transmitted = GNUNET_NO;
463 * We had a serious error, tear down and re-create stream from scratch.
465 * @param sh stream to reset
468 reset_stream (struct StreamHandle *sh)
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "Resetting stream to %s\n",
472 GNUNET_i2s (&sh->target));
475 GNUNET_STREAM_read_cancel (sh->rh);
478 GNUNET_STREAM_close (sh->stream);
479 sh->is_ready = GNUNET_NO;
480 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
483 sh->stream = GNUNET_STREAM_open (GSF_cfg,
485 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
486 &stream_ready_cb, sh,
487 GNUNET_STREAM_OPTION_END);
492 * Task called when it is time to destroy an inactive stream.
494 * @param cls the 'struct StreamHandle' to tear down
495 * @param tc scheduler context, unused
498 stream_timeout (void *cls,
499 const struct GNUNET_SCHEDULER_TaskContext *tc)
501 struct StreamHandle *sh = cls;
503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504 "Timeout on stream to %s\n",
505 GNUNET_i2s (&sh->target));
506 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
507 destroy_stream_handle (sh);
512 * Task called when it is time to reset an stream.
514 * @param cls the 'struct StreamHandle' to tear down
515 * @param tc scheduler context, unused
518 reset_stream_task (void *cls,
519 const struct GNUNET_SCHEDULER_TaskContext *tc)
521 struct StreamHandle *sh = cls;
523 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
529 * We had a serious error, tear down and re-create stream from scratch,
530 * but do so asynchronously.
532 * @param sh stream to reset
535 reset_stream_async (struct StreamHandle *sh)
537 if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
538 GNUNET_SCHEDULER_cancel (sh->reset_task);
539 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
545 * We got a reply from the stream. Process it.
547 * @param cls the struct StreamHandle
548 * @param status the status of the stream at the time this function is called
549 * @param data traffic from the other side
550 * @param size the number of bytes available in data read; will be 0 on timeout
551 * @return number of bytes of processed from 'data' (any data remaining should be
552 * given to the next time the read processor is called).
555 handle_stream_reply (void *cls,
556 enum GNUNET_STREAM_Status status,
560 struct StreamHandle *sh = cls;
563 GNUNET_SCHEDULER_cancel (sh->reset_task);
564 sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568 "Received %u bytes from stream to %s\n",
570 GNUNET_i2s (&sh->target));
572 GNUNET_SERVER_mst_receive (sh->mst,
575 GNUNET_NO, GNUNET_NO))
578 reset_stream_async (sh);
582 sh->rh = GNUNET_STREAM_read (sh->stream,
583 GNUNET_TIME_UNIT_FOREVER_REL,
584 &handle_stream_reply,
591 * Functions of this signature are called whenever we transmitted a
592 * query via a stream.
594 * @param cls the struct StreamHandle for which we did the write call
595 * @param status the status of the stream at the time this function is called;
596 * GNUNET_OK if writing to stream was completed successfully,
597 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
599 * @param size the number of bytes written
602 query_write_continuation (void *cls,
603 enum GNUNET_STREAM_Status status,
606 struct StreamHandle *sh = cls;
609 if ( (GNUNET_STREAM_OK != status) ||
610 (sizeof (struct StreamQueryMessage) != size) )
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616 "Successfully transmitted %u bytes via stream to %s\n",
618 GNUNET_i2s (&sh->target));
620 sh->rh = GNUNET_STREAM_read (sh->stream,
621 GNUNET_TIME_UNIT_FOREVER_REL,
622 &handle_stream_reply,
624 transmit_pending (sh);
629 * Transmit pending requests via the stream.
631 * @param sh stream to process
634 transmit_pending (struct StreamHandle *sh)
636 struct StreamQueryMessage sqm;
637 struct GSF_StreamRequest *sr;
641 sr = sh->pending_head;
644 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
647 GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
650 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652 "Sending query via stream to %s\n",
653 GNUNET_i2s (&sh->target));
654 sr->was_transmitted = GNUNET_YES;
655 sqm.header.size = htons (sizeof (sqm));
656 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
657 sqm.type = htonl (sr->type);
658 sqm.query = sr->query;
659 sh->wh = GNUNET_STREAM_write (sh->stream,
661 GNUNET_TIME_UNIT_FOREVER_REL,
662 &query_write_continuation,
/**
 * Closure for 'handle_reply'.
 */
struct HandleReplyClosure
{

  /**
   * Reply payload.
   */
  const void *data;

  /**
   * Expiration time for the block.
   */
  struct GNUNET_TIME_Absolute expiration;

  /**
   * Number of bytes in 'data'.
   */
  size_t data_size;

  /**
   * Type of the block.
   */
  enum GNUNET_BLOCK_Type type;

  /**
   * Did we have a matching query?
   */
  int found;
};
701 * Iterator called on each entry in a waiting map to
704 * @param cls the 'struct HandleReplyClosure'
705 * @param key the key of the entry in the map (the query)
706 * @param value the 'struct GSF_StreamRequest' to handle result for
707 * @return GNUNET_YES (continue to iterate)
710 handle_reply (void *cls,
711 const struct GNUNET_HashCode *key,
714 struct HandleReplyClosure *hrc = cls;
715 struct GSF_StreamRequest *sr = value;
717 sr->proc (sr->proc_cls,
722 GSF_stream_query_cancel (sr);
723 hrc->found = GNUNET_YES;
729 * Functions with this signature are called whenever a
730 * complete reply is received.
732 * Do not call GNUNET_SERVER_mst_destroy in callback
734 * @param cls closure with the 'struct StreamHandle'
735 * @param client identification of the client, NULL
736 * @param message the actual message
737 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
742 const struct GNUNET_MessageHeader *message)
744 struct StreamHandle *sh = cls;
745 const struct StreamReplyMessage *srm;
746 struct HandleReplyClosure hrc;
748 enum GNUNET_BLOCK_Type type;
749 struct GNUNET_HashCode query;
751 msize = ntohs (message->size);
752 switch (ntohs (message->type))
754 case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
755 if (sizeof (struct StreamReplyMessage) > msize)
758 reset_stream_async (sh);
759 return GNUNET_SYSERR;
761 srm = (const struct StreamReplyMessage *) message;
762 msize -= sizeof (struct StreamReplyMessage);
763 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
765 GNUNET_BLOCK_get_key (GSF_block_ctx,
767 &srm[1], msize, &query))
770 reset_stream_async (sh);
771 return GNUNET_SYSERR;
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "Received reply `%s' via stream\n",
775 GNUNET_h2s (&query));
776 GNUNET_STATISTICS_update (GSF_stats,
777 gettext_noop ("# replies received via stream"), 1,
780 hrc.data_size = msize;
781 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
783 hrc.found = GNUNET_NO;
784 GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
788 if (GNUNET_NO == hrc.found)
790 GNUNET_STATISTICS_update (GSF_stats,
791 gettext_noop ("# replies received via stream dropped"), 1,
798 reset_stream_async (sh);
799 return GNUNET_SYSERR;
805 * Get (or create) a stream to talk to the given peer.
807 * @param target peer we want to communicate with
809 static struct StreamHandle *
810 get_stream (const struct GNUNET_PeerIdentity *target)
812 struct StreamHandle *sh;
814 sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
815 &target->hashPubKey);
818 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
820 GNUNET_SCHEDULER_cancel (sh->timeout_task);
821 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
825 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
826 "Creating stream to %s\n",
827 GNUNET_i2s (target));
828 sh = GNUNET_malloc (sizeof (struct StreamHandle));
829 sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
832 sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
834 sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
835 sh->target = *target;
836 sh->stream = GNUNET_STREAM_open (GSF_cfg,
838 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
839 &stream_ready_cb, sh,
840 GNUNET_STREAM_OPTION_END);
841 GNUNET_assert (GNUNET_OK ==
842 GNUNET_CONTAINER_multihashmap_put (stream_map,
843 &sh->target.hashPubKey,
845 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
851 * Look for a block by directly contacting a particular peer.
853 * @param target peer that should have the block
854 * @param query hash to query for the block
855 * @param type desired type for the block
856 * @param proc function to call with result
857 * @param proc_cls closure for 'proc'
858 * @return handle to cancel the operation
860 struct GSF_StreamRequest *
861 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
862 const struct GNUNET_HashCode *query,
863 enum GNUNET_BLOCK_Type type,
864 GSF_StreamReplyProcessor proc, void *proc_cls)
866 struct StreamHandle *sh;
867 struct GSF_StreamRequest *sr;
869 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870 "Preparing to send query for %s via stream to %s\n",
872 GNUNET_i2s (target));
873 sh = get_stream (target);
874 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
877 sr->proc_cls = proc_cls;
880 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
883 if (GNUNET_YES == sh->is_ready)
884 transmit_pending (sh);
890 * Cancel an active request; must not be called after 'proc'
893 * @param sr request to cancel
896 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
898 struct StreamHandle *sh = sr->sh;
900 if (GNUNET_YES == sr->was_transmitted)
901 GNUNET_assert (GNUNET_OK ==
902 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
906 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
910 if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
911 (NULL == sh->pending_head) )
912 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
918 /* ********************* server-side code ************************* */
922 * We're done with a particular client, clean up.
924 * @param sc client to clean up
927 terminate_stream (struct StreamClient *sc)
929 GNUNET_STATISTICS_update (GSF_stats,
930 gettext_noop ("# stream connections active"), -1,
932 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
933 GNUNET_SCHEDULER_cancel (sc->terminate_task);
934 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
935 GNUNET_SCHEDULER_cancel (sc->timeout_task);
937 GNUNET_STREAM_read_cancel (sc->rh);
939 GNUNET_STREAM_write_cancel (sc->wh);
941 GNUNET_DATASTORE_cancel (sc->qe);
942 GNUNET_SERVER_mst_destroy (sc->mst);
943 GNUNET_STREAM_close (sc->socket);
944 struct WriteQueueItem *wqi;
945 while (NULL != (wqi = sc->wqi_head))
947 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
954 GNUNET_CONTAINER_DLL_remove (sc_head,
963 * Task run to asynchronously terminate the stream.
965 * @param cls the 'struct StreamClient'
966 * @param tc scheduler context
969 terminate_stream_task (void *cls,
970 const struct GNUNET_SCHEDULER_TaskContext *tc)
972 struct StreamClient *sc = cls;
974 sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
975 terminate_stream (sc);
980 * Task run to asynchronously terminate the stream due to timeout.
982 * @param cls the 'struct StreamClient'
983 * @param tc scheduler context
986 timeout_stream_task (void *cls,
987 const struct GNUNET_SCHEDULER_TaskContext *tc)
989 struct StreamClient *sc = cls;
991 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
992 terminate_stream (sc);
997 * Reset the timeout for the stream client (due to activity).
999 * @param sc client handle to reset timeout for
1002 refresh_timeout_task (struct StreamClient *sc)
1004 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
1005 GNUNET_SCHEDULER_cancel (sc->timeout_task);
1006 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
1007 &timeout_stream_task,
1013 * We had a serious error, termiante stream,
1014 * but do so asynchronously.
1016 * @param sc stream to reset
1019 terminate_stream_async (struct StreamClient *sc)
1021 if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
1022 sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
/**
 * Read callback for client sockets (forward declaration, the
 * definition follows further down in this file).
 *
 * @param cls the closure from GNUNET_STREAM_read
 * @param status the status of the stream at the time this function is called
 * @param data traffic from the other side
 * @param size the number of bytes available in data read; will be 0 on timeout
 * @return number of bytes of processed from 'data' (any data remaining should be
 *         given to the next time the read processor is called).
 */
static size_t
process_request (void *cls,
                 enum GNUNET_STREAM_Status status,
                 const void *data,
                 size_t size);
1046 * We're done handling a request from a client, read the next one.
1048 * @param sc client to continue reading requests from
1051 continue_reading (struct StreamClient *sc)
1056 GNUNET_SERVER_mst_receive (sc->mst,
1059 GNUNET_NO, GNUNET_NO);
1060 if (GNUNET_NO == ret)
1062 refresh_timeout_task (sc);
1065 sc->rh = GNUNET_STREAM_read (sc->socket,
1066 GNUNET_TIME_UNIT_FOREVER_REL,
/**
 * Transmit the next entry from the write queue (forward declaration,
 * the definition follows further down in this file).
 *
 * @param sc where to process the write queue
 */
static void
continue_writing (struct StreamClient *sc);
1082 * Functions of this signature are called whenever data is available from the
1085 * @param cls the closure from GNUNET_STREAM_read
1086 * @param status the status of the stream at the time this function is called
1087 * @param data traffic from the other side
1088 * @param size the number of bytes available in data read; will be 0 on timeout
1089 * @return number of bytes of processed from 'data' (any data remaining should be
1090 * given to the next time the read processor is called).
1093 process_request (void *cls,
1094 enum GNUNET_STREAM_Status status,
1098 struct StreamClient *sc = cls;
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "Received %u byte query via stream\n",
1104 (unsigned int) size);
1107 case GNUNET_STREAM_OK:
1109 GNUNET_SERVER_mst_receive (sc->mst,
1112 GNUNET_NO, GNUNET_NO);
1113 if (GNUNET_NO == ret)
1114 return size; /* more messages in MST */
1115 if (GNUNET_SYSERR == ret)
1117 GNUNET_break_op (0);
1118 terminate_stream_async (sc);
1122 case GNUNET_STREAM_TIMEOUT:
1123 case GNUNET_STREAM_SHUTDOWN:
1124 case GNUNET_STREAM_SYSERR:
1125 terminate_stream_async (sc);
1131 continue_writing (sc);
1137 * Sending a reply was completed, continue processing.
1139 * @param cls closure with the struct StreamClient which sent the query
1140 * @param status result code for the operation
1141 * @param size number of bytes that were transmitted
1144 write_continuation (void *cls,
1145 enum GNUNET_STREAM_Status status,
1148 struct StreamClient *sc = cls;
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152 "Write continuation called on 'server' side with status %d\n",
1154 if ( (GNUNET_STREAM_OK != status) ||
1155 (size != sc->reply_size) )
1157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158 "Transmission of reply failed, terminating stream\n");
1159 terminate_stream (sc);
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "Transmitted %u byte reply via stream\n",
1164 (unsigned int) size);
1165 GNUNET_STATISTICS_update (GSF_stats,
1166 gettext_noop ("# Blocks transferred via stream"), 1,
1168 continue_writing (sc);
1173 * Transmit the next entry from the write queue.
1175 * @param sc where to process the write queue
1178 continue_writing (struct StreamClient *sc)
1180 struct WriteQueueItem *wqi;
1184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185 "Write pending, waiting for it to complete\n");
1186 return; /* write already pending */
1188 if (NULL == (wqi = sc->wqi_head))
1190 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191 "Write queue empty, reading more requests\n");
1192 continue_reading (sc);
1195 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
1198 sc->wh = GNUNET_STREAM_write (sc->socket,
1199 &wqi[1], wqi->msize,
1200 GNUNET_TIME_UNIT_FOREVER_REL,
1201 &write_continuation,
1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205 "Gave %u bytes for stream for transmission\n",
1206 (unsigned int) wqi->msize);
1210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211 "Write failed; terminating stream\n");
1212 terminate_stream (sc);
1219 * Process a datum that was stored in the datastore.
1221 * @param cls closure with the struct StreamClient which sent the query
1222 * @param key key for the content
1223 * @param size number of bytes in data
1224 * @param data content stored
1225 * @param type type of the content
1226 * @param priority priority of the content
1227 * @param anonymity anonymity-level for the content
1228 * @param expiration expiration time for the content
1229 * @param uid unique identifier for the datum;
1230 * maybe 0 if no unique identifier is available
1233 handle_datastore_reply (void *cls,
1234 const struct GNUNET_HashCode * key,
1235 size_t size, const void *data,
1236 enum GNUNET_BLOCK_Type type,
1239 struct GNUNET_TIME_Absolute
1240 expiration, uint64_t uid)
1242 struct StreamClient *sc = cls;
1243 size_t msize = size + sizeof (struct StreamReplyMessage);
1244 struct WriteQueueItem *wqi;
1245 struct StreamReplyMessage *srm;
1248 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1250 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1251 "Performing on-demand encoding\n");
1253 GNUNET_FS_handle_on_demand_block (key,
1255 priority, anonymity,
1257 &handle_datastore_reply,
1260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261 "On-demand encoding request failed\n");
1262 continue_writing (sc);
1266 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1269 continue_writing (sc);
1272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273 "Starting transmission of %u byte reply for query `%s' via stream\n",
1274 (unsigned int) size,
1276 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1278 srm = (struct StreamReplyMessage *) &wqi[1];
1279 srm->header.size = htons ((uint16_t) msize);
1280 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1281 srm->type = htonl (type);
1282 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1283 memcpy (&srm[1], data, size);
1284 sc->reply_size = msize;
1285 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1288 continue_writing (sc);
1293 * Functions with this signature are called whenever a
1294 * complete query message is received.
1296 * Do not call GNUNET_SERVER_mst_destroy in callback
1298 * @param cls closure with the 'struct StreamClient'
1299 * @param client identification of the client, NULL
1300 * @param message the actual message
1301 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1304 request_cb (void *cls,
1306 const struct GNUNET_MessageHeader *message)
1308 struct StreamClient *sc = cls;
1309 const struct StreamQueryMessage *sqm;
1311 switch (ntohs (message->type))
1313 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1314 if (sizeof (struct StreamQueryMessage) !=
1315 ntohs (message->size))
1317 GNUNET_break_op (0);
1318 terminate_stream_async (sc);
1319 return GNUNET_SYSERR;
1321 sqm = (const struct StreamQueryMessage *) message;
1322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323 "Received query for `%s' via stream\n",
1324 GNUNET_h2s (&sqm->query));
1325 GNUNET_STATISTICS_update (GSF_stats,
1326 gettext_noop ("# queries received via stream"), 1,
1328 refresh_timeout_task (sc);
1329 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1334 GSF_datastore_queue_size,
1335 GNUNET_TIME_UNIT_FOREVER_REL,
1336 &handle_datastore_reply, sc);
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340 "Queueing request with datastore failed (queue full?)\n");
1341 continue_writing (sc);
1345 GNUNET_break_op (0);
1346 terminate_stream_async (sc);
1347 return GNUNET_SYSERR;
1353 * Functions of this type are called upon new stream connection from other peers
1354 * or upon binding error which happen when the app_port given in
1355 * GNUNET_STREAM_listen() is already taken.
1357 * @param cls the closure from GNUNET_STREAM_listen
1358 * @param socket the socket representing the stream; NULL on binding error
1359 * @param initiator the identity of the peer who wants to establish a stream
1360 * with us; NULL on binding error
1361 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1362 * stream (the socket will be invalid after the call)
1365 accept_cb (void *cls,
1366 struct GNUNET_STREAM_Socket *socket,
1367 const struct GNUNET_PeerIdentity *initiator)
1369 struct StreamClient *sc;
1372 return GNUNET_SYSERR;
1373 if (sc_count >= sc_count_max)
1375 GNUNET_STATISTICS_update (GSF_stats,
1376 gettext_noop ("# stream client connections rejected"), 1,
1378 return GNUNET_SYSERR;
1380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381 "Accepting inbound stream connection from `%s'\n",
1382 GNUNET_i2s (initiator));
1383 GNUNET_STATISTICS_update (GSF_stats,
1384 gettext_noop ("# stream connections active"), 1,
1386 sc = GNUNET_malloc (sizeof (struct StreamClient));
1387 sc->socket = socket;
1388 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1390 sc->rh = GNUNET_STREAM_read (sc->socket,
1391 GNUNET_TIME_UNIT_FOREVER_REL,
1394 GNUNET_CONTAINER_DLL_insert (sc_head,
1398 refresh_timeout_task (sc);
1404 * Initialize subsystem for non-anonymous file-sharing.
1409 stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1411 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1413 "MAX_STREAM_CLIENTS",
1416 listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1417 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1419 GNUNET_STREAM_OPTION_END);
1425 * Function called on each active streams to shut them down.
1428 * @param key target peer, unused
1429 * @param value the 'struct StreamHandle' to destroy
1430 * @return GNUNET_YES (continue to iterate)
1433 release_streams (void *cls,
1434 const struct GNUNET_HashCode *key,
1437 struct StreamHandle *sh = value;
1439 destroy_stream_handle (sh);
1445 * Shutdown subsystem for non-anonymous file-sharing.
1450 struct StreamClient *sc;
1452 while (NULL != (sc = sc_head))
1453 terminate_stream (sc);
1454 if (NULL != listen_socket)
1456 GNUNET_STREAM_listen_close (listen_socket);
1457 listen_socket = NULL;
1459 GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1462 GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1466 /* end of gnunet-service-fs_stream.c */