2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_mesh.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - update comments on functions (still matches 'mesh')
28 * - MESH2 API doesn't allow flow control for server yet (needed!)
29 * - likely need to register clean up handler with mesh to handle
30 * client disconnect (likely leaky right now)
31 * - server is optional, currently client code will NPE if we have
32 * no server, again MESH2 API requirement forcing this for now
33 * - message handlers are symmetric for client/server, should be
34 * separated (currently clients can get requests and servers can
35 * handle answers, not good)
36 * - code is entirely untested
37 * - might have overlooked a few possible simplifications
38 * - PORT is set to old application type, unsure if we should keep
39 * it that way (fine for now)
42 #include "gnunet_constants.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet_mesh2_service.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_applications.h"
47 #include "gnunet-service-fs.h"
48 #include "gnunet-service-fs_indexing.h"
49 #include "gnunet-service-fs_mesh.h"
52 * After how long do we termiante idle connections?
54 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
57 * After how long do we reset connections without replies?
59 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
63 * A message in the queue to be written to the mesh.
70 struct WriteQueueItem *next;
75 struct WriteQueueItem *prev;
78 * Number of bytes of payload, allocated at the end of this struct.
85 * Information we keep around for each active meshing client.
92 struct StreamClient *next;
97 struct StreamClient *prev;
100 * Socket for communication.
102 struct GNUNET_MESH_Tunnel *socket;
105 * Handle for active write operation, or NULL.
107 struct GNUNET_MESH_TransmitHandle *wh;
110 * Head of write queue.
112 struct WriteQueueItem *wqi_head;
115 * Tail of write queue.
117 struct WriteQueueItem *wqi_tail;
120 * Current active request to the datastore, if we have one pending.
122 struct GNUNET_DATASTORE_QueueEntry *qe;
125 * Task that is scheduled to asynchronously terminate the connection.
127 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
130 * Task that is scheduled to terminate idle connections.
132 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
135 * Size of the last write that was initiated.
143 * Query from one peer, asking the other for CHK-data.
145 struct StreamQueryMessage
149 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
151 struct GNUNET_MessageHeader header;
154 * Block type must be DBLOCK or IBLOCK.
159 * Query hash from CHK (hash of encrypted block).
161 struct GNUNET_HashCode query;
167 * Reply to a StreamQueryMessage.
169 struct StreamReplyMessage
173 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
175 struct GNUNET_MessageHeader header;
178 * Block type must be DBLOCK or IBLOCK.
183 * Expiration time for the block.
185 struct GNUNET_TIME_AbsoluteNBO expiration;
187 /* followed by the encrypted block */
193 * Handle for a mesh to another peer.
199 * Handle for a request that is going out via mesh API.
201 struct GSF_StreamRequest
207 struct GSF_StreamRequest *next;
212 struct GSF_StreamRequest *prev;
215 * Which mesh is this request associated with?
217 struct StreamHandle *sh;
220 * Function to call with the result.
222 GSF_StreamReplyProcessor proc;
230 * Query to transmit to the other peer.
232 struct GNUNET_HashCode query;
235 * Desired type for the reply.
237 enum GNUNET_BLOCK_Type type;
240 * Did we transmit this request already? YES if we are
241 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
248 * Handle for a mesh to another peer.
253 * Head of DLL of pending requests on this mesh.
255 struct GSF_StreamRequest *pending_head;
258 * Tail of DLL of pending requests on this mesh.
260 struct GSF_StreamRequest *pending_tail;
263 * Map from query to 'struct GSF_StreamRequest's waiting for
266 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
269 * Connection to the other peer.
271 struct GNUNET_MESH_Tunnel *mesh;
274 * Handle for active write operation, or NULL.
276 struct GNUNET_MESH_TransmitHandle *wh;
279 * Which peer does this mesh go to?
281 struct GNUNET_PeerIdentity target;
284 * Task to kill inactive meshs (we keep them around for
285 * a few seconds to give the application a chance to give
288 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
291 * Task to reset meshs that had errors (asynchronously,
292 * as we may not be able to do it immediately during a
293 * callback from the mesh API).
295 GNUNET_SCHEDULER_TaskIdentifier reset_task;
298 * Is this mesh ready for transmission?
306 * Listen socket for incoming requests.
308 static struct GNUNET_MESH_Handle *listen_socket;
311 * Head of DLL of mesh clients.
313 static struct StreamClient *sc_head;
316 * Tail of DLL of mesh clients.
318 static struct StreamClient *sc_tail;
321 * Number of active mesh clients in the 'sc_*'-DLL.
323 static unsigned int sc_count;
326 * Maximum allowed number of mesh clients.
328 static unsigned long long sc_count_max;
331 * Map from peer identities to 'struct StreamHandles' with meshs to
334 static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
337 /* ********************* client-side code ************************* */
340 * Iterator called on each entry in a waiting map to
341 * call the 'proc' continuation and release associated
344 * @param cls the 'struct StreamHandle'
345 * @param key the key of the entry in the map (the query)
346 * @param value the 'struct GSF_StreamRequest' to clean up
347 * @return GNUNET_YES (continue to iterate)
350 free_waiting_entry (void *cls,
351 const struct GNUNET_HashCode *key,
354 struct GSF_StreamRequest *sr = value;
356 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
357 GNUNET_TIME_UNIT_FOREVER_ABS,
359 GSF_mesh_query_cancel (sr);
365 * Destroy a mesh handle.
367 * @param sh mesh to process
370 destroy_mesh_handle (struct StreamHandle *sh)
372 struct GSF_StreamRequest *sr;
374 while (NULL != (sr = sh->pending_head))
376 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
377 GNUNET_TIME_UNIT_FOREVER_ABS,
379 GSF_mesh_query_cancel (sr);
381 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
385 GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
386 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
387 GNUNET_SCHEDULER_cancel (sh->timeout_task);
388 if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
389 GNUNET_SCHEDULER_cancel (sh->reset_task);
390 GNUNET_MESH_tunnel_destroy (sh->mesh);
391 GNUNET_assert (GNUNET_OK ==
392 GNUNET_CONTAINER_multihashmap_remove (mesh_map,
393 &sh->target.hashPubKey,
395 GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
401 * Transmit pending requests via the mesh.
403 * @param sh mesh to process
406 transmit_pending (struct StreamHandle *sh);
410 * Iterator called on each entry in a waiting map to
411 * move it back to the pending list.
413 * @param cls the 'struct StreamHandle'
414 * @param key the key of the entry in the map (the query)
415 * @param value the 'struct GSF_StreamRequest' to move to pending
416 * @return GNUNET_YES (continue to iterate)
419 move_to_pending (void *cls,
420 const struct GNUNET_HashCode *key,
423 struct StreamHandle *sh = cls;
424 struct GSF_StreamRequest *sr = value;
426 GNUNET_assert (GNUNET_YES ==
427 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
430 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
433 sr->was_transmitted = GNUNET_NO;
439 * We had a serious error, tear down and re-create mesh from scratch.
441 * @param sh mesh to reset
444 reset_mesh (struct StreamHandle *sh)
446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
447 "Resetting mesh to %s\n",
448 GNUNET_i2s (&sh->target));
449 GNUNET_MESH_tunnel_destroy (sh->mesh);
450 sh->is_ready = GNUNET_NO;
451 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
454 sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
457 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
462 * Task called when it is time to destroy an inactive mesh.
464 * @param cls the 'struct StreamHandle' to tear down
465 * @param tc scheduler context, unused
468 mesh_timeout (void *cls,
469 const struct GNUNET_SCHEDULER_TaskContext *tc)
471 struct StreamHandle *sh = cls;
473 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
474 "Timeout on mesh to %s\n",
475 GNUNET_i2s (&sh->target));
476 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
477 destroy_mesh_handle (sh);
482 * Task called when it is time to reset an mesh.
484 * @param cls the 'struct StreamHandle' to tear down
485 * @param tc scheduler context, unused
488 reset_mesh_task (void *cls,
489 const struct GNUNET_SCHEDULER_TaskContext *tc)
491 struct StreamHandle *sh = cls;
493 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
499 * We had a serious error, tear down and re-create mesh from scratch,
500 * but do so asynchronously.
502 * @param sh mesh to reset
505 reset_mesh_async (struct StreamHandle *sh)
507 if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
508 GNUNET_SCHEDULER_cancel (sh->reset_task);
509 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
515 * Functions of this signature are called whenever we are ready to transmit
518 * @param cls the struct StreamHandle for which we did the write call
519 * @param size the number of bytes that can be written to 'buf'
520 * @param buf where to write the message
521 * @return number of bytes written to 'buf'
524 transmit_sqm (void *cls,
528 struct StreamHandle *sh = cls;
529 struct StreamQueryMessage sqm;
530 struct GSF_StreamRequest *sr;
538 sr = sh->pending_head;
541 GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
542 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
545 GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
548 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550 "Sending query via mesh to %s\n",
551 GNUNET_i2s (&sh->target));
552 sr->was_transmitted = GNUNET_YES;
553 sqm.header.size = htons (sizeof (sqm));
554 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
555 sqm.type = htonl (sr->type);
556 sqm.query = sr->query;
557 memcpy (buf, &sqm, sizeof (sqm));
558 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559 "Successfully transmitted %u bytes via mesh to %s\n",
561 GNUNET_i2s (&sh->target));
562 transmit_pending (sh);
568 * Transmit pending requests via the mesh.
570 * @param sh mesh to process
573 transmit_pending (struct StreamHandle *sh)
577 sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow cork */,
578 GNUNET_TIME_UNIT_FOREVER_REL,
579 sizeof (struct StreamQueryMessage),
585 * Closure for 'handle_reply'.
587 struct HandleReplyClosure
596 * Expiration time for the block.
598 struct GNUNET_TIME_Absolute expiration;
601 * Number of bytes in 'data'.
608 enum GNUNET_BLOCK_Type type;
611 * Did we have a matching query?
618 * Iterator called on each entry in a waiting map to
621 * @param cls the 'struct HandleReplyClosure'
622 * @param key the key of the entry in the map (the query)
623 * @param value the 'struct GSF_StreamRequest' to handle result for
624 * @return GNUNET_YES (continue to iterate)
627 handle_reply (void *cls,
628 const struct GNUNET_HashCode *key,
631 struct HandleReplyClosure *hrc = cls;
632 struct GSF_StreamRequest *sr = value;
634 sr->proc (sr->proc_cls,
639 GSF_mesh_query_cancel (sr);
640 hrc->found = GNUNET_YES;
646 * Functions with this signature are called whenever a
647 * complete reply is received.
649 * @param cls closure with the 'struct StreamHandle'
650 * @param client identification of the client, NULL
651 * @param message the actual message
652 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
656 struct GNUNET_MESH_Tunnel *tunnel,
658 const struct GNUNET_PeerIdentity *sender,
659 const struct GNUNET_MessageHeader *message)
661 struct StreamHandle *sh = *tunnel_ctx;
662 const struct StreamReplyMessage *srm;
663 struct HandleReplyClosure hrc;
665 enum GNUNET_BLOCK_Type type;
666 struct GNUNET_HashCode query;
668 msize = ntohs (message->size);
669 if (sizeof (struct StreamReplyMessage) > msize)
672 reset_mesh_async (sh);
673 return GNUNET_SYSERR;
675 srm = (const struct StreamReplyMessage *) message;
676 msize -= sizeof (struct StreamReplyMessage);
677 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
679 GNUNET_BLOCK_get_key (GSF_block_ctx,
681 &srm[1], msize, &query))
684 reset_mesh_async (sh);
685 return GNUNET_SYSERR;
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "Received reply `%s' via mesh\n",
689 GNUNET_h2s (&query));
690 GNUNET_STATISTICS_update (GSF_stats,
691 gettext_noop ("# replies received via mesh"), 1,
694 hrc.data_size = msize;
695 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
697 hrc.found = GNUNET_NO;
698 GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
702 if (GNUNET_NO == hrc.found)
704 GNUNET_STATISTICS_update (GSF_stats,
705 gettext_noop ("# replies received via mesh dropped"), 1,
714 * Get (or create) a mesh to talk to the given peer.
716 * @param target peer we want to communicate with
718 static struct StreamHandle *
719 get_mesh (const struct GNUNET_PeerIdentity *target)
721 struct StreamHandle *sh;
723 sh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
724 &target->hashPubKey);
727 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
729 GNUNET_SCHEDULER_cancel (sh->timeout_task);
730 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "Creating mesh to %s\n",
736 GNUNET_i2s (target));
737 sh = GNUNET_malloc (sizeof (struct StreamHandle));
738 sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
741 sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
742 sh->target = *target;
743 sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
746 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
747 GNUNET_assert (GNUNET_OK ==
748 GNUNET_CONTAINER_multihashmap_put (mesh_map,
749 &sh->target.hashPubKey,
751 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
757 * Look for a block by directly contacting a particular peer.
759 * @param target peer that should have the block
760 * @param query hash to query for the block
761 * @param type desired type for the block
762 * @param proc function to call with result
763 * @param proc_cls closure for 'proc'
764 * @return handle to cancel the operation
766 struct GSF_StreamRequest *
767 GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
768 const struct GNUNET_HashCode *query,
769 enum GNUNET_BLOCK_Type type,
770 GSF_StreamReplyProcessor proc, void *proc_cls)
772 struct StreamHandle *sh;
773 struct GSF_StreamRequest *sr;
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776 "Preparing to send query for %s via mesh to %s\n",
778 GNUNET_i2s (target));
779 sh = get_mesh (target);
780 sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
783 sr->proc_cls = proc_cls;
786 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
789 if (GNUNET_YES == sh->is_ready)
790 transmit_pending (sh);
796 * Cancel an active request; must not be called after 'proc'
799 * @param sr request to cancel
802 GSF_mesh_query_cancel (struct GSF_StreamRequest *sr)
804 struct StreamHandle *sh = sr->sh;
806 if (GNUNET_YES == sr->was_transmitted)
807 GNUNET_assert (GNUNET_OK ==
808 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
812 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
816 if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
817 (NULL == sh->pending_head) )
818 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
824 /* ********************* server-side code ************************* */
828 * We're done with a particular client, clean up.
830 * @param sc client to clean up
833 terminate_mesh (struct StreamClient *sc)
835 GNUNET_STATISTICS_update (GSF_stats,
836 gettext_noop ("# mesh connections active"), -1,
838 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
839 GNUNET_SCHEDULER_cancel (sc->terminate_task);
840 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
841 GNUNET_SCHEDULER_cancel (sc->timeout_task);
843 GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
845 GNUNET_DATASTORE_cancel (sc->qe);
846 GNUNET_MESH_tunnel_destroy (sc->socket);
847 struct WriteQueueItem *wqi;
848 while (NULL != (wqi = sc->wqi_head))
850 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
855 GNUNET_CONTAINER_DLL_remove (sc_head,
864 * Task run to asynchronously terminate the mesh due to timeout.
866 * @param cls the 'struct StreamClient'
867 * @param tc scheduler context
870 timeout_mesh_task (void *cls,
871 const struct GNUNET_SCHEDULER_TaskContext *tc)
873 struct StreamClient *sc = cls;
875 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
881 * Reset the timeout for the mesh client (due to activity).
883 * @param sc client handle to reset timeout for
886 refresh_timeout_task (struct StreamClient *sc)
888 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
889 GNUNET_SCHEDULER_cancel (sc->timeout_task);
890 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
897 * We're done handling a request from a client, read the next one.
899 * @param sc client to continue reading requests from
902 continue_reading (struct StreamClient *sc)
904 refresh_timeout_task (sc);
909 * Transmit the next entry from the write queue.
911 * @param sc where to process the write queue
914 continue_writing (struct StreamClient *sc);
918 * Send a reply now, mesh is ready.
920 * @param cls closure with the struct StreamClient which sent the query
921 * @param size number of bytes available in 'buf'
922 * @param buf where to write the message
923 * @return number of bytes written to 'buf'
926 write_continuation (void *cls,
930 struct StreamClient *sc = cls;
931 struct WriteQueueItem *wqi;
935 if (NULL == (wqi = sc->wqi_head))
937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938 "Write queue empty, reading more requests\n");
943 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944 "Transmission of reply failed, terminating mesh\n");
948 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952 "Transmitted %u byte reply via mesh\n",
953 (unsigned int) size);
954 GNUNET_STATISTICS_update (GSF_stats,
955 gettext_noop ("# Blocks transferred via mesh"), 1,
957 memcpy (buf, &wqi[1], ret = wqi->msize);
959 continue_writing (sc);
965 * Transmit the next entry from the write queue.
967 * @param sc where to process the write queue
970 continue_writing (struct StreamClient *sc)
972 struct WriteQueueItem *wqi;
976 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977 "Write pending, waiting for it to complete\n");
978 return; /* write already pending */
980 if (NULL == (wqi = sc->wqi_head))
982 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983 "Write queue empty, reading more requests\n");
984 continue_reading (sc);
987 sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
988 GNUNET_TIME_UNIT_FOREVER_REL,
994 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995 "Write failed; terminating mesh\n");
1003 * Process a datum that was stored in the datastore.
1005 * @param cls closure with the struct StreamClient which sent the query
1006 * @param key key for the content
1007 * @param size number of bytes in data
1008 * @param data content stored
1009 * @param type type of the content
1010 * @param priority priority of the content
1011 * @param anonymity anonymity-level for the content
1012 * @param expiration expiration time for the content
1013 * @param uid unique identifier for the datum;
1014 * maybe 0 if no unique identifier is available
1017 handle_datastore_reply (void *cls,
1018 const struct GNUNET_HashCode * key,
1019 size_t size, const void *data,
1020 enum GNUNET_BLOCK_Type type,
1023 struct GNUNET_TIME_Absolute
1024 expiration, uint64_t uid)
1026 struct StreamClient *sc = cls;
1027 size_t msize = size + sizeof (struct StreamReplyMessage);
1028 struct WriteQueueItem *wqi;
1029 struct StreamReplyMessage *srm;
1032 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1034 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035 "Performing on-demand encoding\n");
1037 GNUNET_FS_handle_on_demand_block (key,
1039 priority, anonymity,
1041 &handle_datastore_reply,
1044 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1045 "On-demand encoding request failed\n");
1046 continue_writing (sc);
1050 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1053 continue_writing (sc);
1056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057 "Starting transmission of %u byte reply for query `%s' via mesh\n",
1058 (unsigned int) size,
1060 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1062 srm = (struct StreamReplyMessage *) &wqi[1];
1063 srm->header.size = htons ((uint16_t) msize);
1064 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1065 srm->type = htonl (type);
1066 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1067 memcpy (&srm[1], data, size);
1068 sc->reply_size = msize;
1069 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1072 continue_writing (sc);
1077 * Functions with this signature are called whenever a
1078 * complete query message is received.
1080 * Do not call GNUNET_SERVER_mst_destroy in callback
1082 * @param cls closure with the 'struct StreamClient'
1083 * @param client identification of the client, NULL
1084 * @param message the actual message
1085 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1088 request_cb (void *cls,
1089 struct GNUNET_MESH_Tunnel *tunnel,
1091 const struct GNUNET_PeerIdentity *sender,
1092 const struct GNUNET_MessageHeader *message)
1094 struct StreamClient *sc = *tunnel_ctx;
1095 const struct StreamQueryMessage *sqm;
1097 sqm = (const struct StreamQueryMessage *) message;
1098 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099 "Received query for `%s' via mesh\n",
1100 GNUNET_h2s (&sqm->query));
1101 GNUNET_STATISTICS_update (GSF_stats,
1102 gettext_noop ("# queries received via mesh"), 1,
1104 refresh_timeout_task (sc);
1105 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1110 GSF_datastore_queue_size,
1111 GNUNET_TIME_UNIT_FOREVER_REL,
1112 &handle_datastore_reply, sc);
1115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116 "Queueing request with datastore failed (queue full?)\n");
1117 continue_writing (sc);
1124 * Functions of this type are called upon new mesh connection from other peers
1125 * or upon binding error which happen when the app_port given in
1126 * GNUNET_STREAM_listen() is already taken.
1128 * @param cls the closure from GNUNET_STREAM_listen
1129 * @param socket the socket representing the mesh
1130 * @param initiator the identity of the peer who wants to establish a mesh
1131 * with us; NULL on binding error
1132 * @return initial tunnel context (our 'struct StreamClient')
1135 accept_cb (void *cls,
1136 struct GNUNET_MESH_Tunnel *socket,
1137 const struct GNUNET_PeerIdentity *initiator,
1140 struct StreamClient *sc;
1142 GNUNET_assert (NULL != socket);
1143 if (sc_count >= sc_count_max)
1145 GNUNET_STATISTICS_update (GSF_stats,
1146 gettext_noop ("# mesh client connections rejected"), 1,
1148 GNUNET_MESH_tunnel_destroy (socket);
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152 "Accepting inbound mesh connection from `%s'\n",
1153 GNUNET_i2s (initiator));
1154 GNUNET_STATISTICS_update (GSF_stats,
1155 gettext_noop ("# mesh connections active"), 1,
1157 sc = GNUNET_malloc (sizeof (struct StreamClient));
1158 sc->socket = socket;
1159 GNUNET_CONTAINER_DLL_insert (sc_head,
1163 refresh_timeout_task (sc);
1169 * Initialize subsystem for non-anonymous file-sharing.
1174 static const struct GNUNET_MESH_MessageHandler handlers[] = {
1175 { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)},
1176 { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
1179 static const uint32_t ports[] = {
1180 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1184 mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1186 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1188 "MAX_STREAM_CLIENTS",
1191 listen_socket = GNUNET_MESH_connect (GSF_cfg,
1194 NULL /* FIXME: have a cleanup callback? */,
1202 * Function called on each active meshs to shut them down.
1205 * @param key target peer, unused
1206 * @param value the 'struct StreamHandle' to destroy
1207 * @return GNUNET_YES (continue to iterate)
1210 release_meshs (void *cls,
1211 const struct GNUNET_HashCode *key,
1214 struct StreamHandle *sh = value;
1216 destroy_mesh_handle (sh);
1222 * Shutdown subsystem for non-anonymous file-sharing.
1227 struct StreamClient *sc;
1229 while (NULL != (sc = sc_head))
1230 terminate_mesh (sc);
1231 if (NULL != listen_socket)
1233 GNUNET_MESH_disconnect (listen_socket);
1234 listen_socket = NULL;
1236 GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
1239 GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
1243 /* end of gnunet-service-fs_mesh.c */