2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief program that provides the file-sharing service
24 * @author Christian Grothoff
27 * - validation of KBLOCKS (almost done)
28 * - validation of SBLOCKS
29 * - validation of KSBLOCKS
30 * - actually DO P2P search upon P2P/CS requests (pass appropriate handler to core
31 * and work through request list there; also update ttl/priority for our client's requests)
32 * - randomly delay processing for improved anonymity (can wait)
33 * - content migration (put in local DS) (can wait)
34 * - check that we decrement PIDs always where necessary (can wait)
35 * - handle some special cases when forwarding replies based on tracked requests (can wait)
36 * - tracking of success correlations for hot-path routing (can wait)
37 * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
38 * core to transmit to another peer; need to make sure this is bounded overall...
39 * - various load-based actions (can wait)
40 * - remove on-demand blocks if they keep failing (can wait)
43 #include "gnunet_core_service.h"
44 #include "gnunet_datastore_service.h"
45 #include "gnunet_peer_lib.h"
46 #include "gnunet_protocols.h"
47 #include "gnunet_signatures.h"
48 #include "gnunet_util_lib.h"
53 * In-memory information about indexed files (also available
60 * This is a linked list.
62 struct IndexInfo *next;
65 * Name of the indexed file. Memory allocated
66 * at the end of this struct (do not free).
71 * Context for transmitting confirmation to client,
72 * NULL if we've done this already.
74 struct GNUNET_SERVER_TransmitContext *tc;
77 * Hash of the contents of the file.
79 GNUNET_HashCode file_id;
85 * Signature of a function that is called whenever a datastore
86 * request can be processed (or an entry put on the queue times out).
89 * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
91 typedef void (*RequestFunction)(void *cls,
96 * Doubly-linked list of our requests for the datastore.
98 struct DatastoreRequestQueue
102 * This is a doubly-linked list.
104 struct DatastoreRequestQueue *next;
107 * This is a doubly-linked list.
109 struct DatastoreRequestQueue *prev;
112 * Function to call (will issue the request).
122 * When should this request time-out because we don't care anymore?
124 struct GNUNET_TIME_Absolute timeout;
127 * ID of task used for signaling timeout.
129 GNUNET_SCHEDULER_TaskIdentifier task;
135 * Closure for processing START_SEARCH messages from a client.
137 struct LocalGetContext
141 * This is a doubly-linked list.
143 struct LocalGetContext *next;
146 * This is a doubly-linked list.
148 struct LocalGetContext *prev;
151 * Client that initiated the search.
153 struct GNUNET_SERVER_Client *client;
156 * Array of results that we've already received
159 GNUNET_HashCode *results;
162 * Bloomfilter over all results (for fast query construction);
163 * NULL if we don't have any results.
165 struct GNUNET_CONTAINER_BloomFilter *results_bf;
168 * DS request associated with this operation.
170 struct DatastoreRequestQueue *req;
173 * Current result message to transmit to client (or NULL).
175 struct ContentMessage *result;
178 * Type of the content that we're looking for.
184 * Desired anonymity level.
186 uint32_t anonymity_level;
189 * Number of results actually stored in the results array.
191 unsigned int results_used;
194 * Size of the results array in memory.
196 unsigned int results_size;
199 * If the request is for a DBLOCK or IBLOCK, this is the identity of
200 * the peer that is known to have a response. Set to all-zeros if
201 * such a target is not known (note that even if OUR anonymity
202 * level is >0 we may happen to know the responder's identity;
203 * nevertheless, we should probably not use it for a DHT-lookup
204 * or similar blunt actions in order to avoid exposing ourselves).
206 struct GNUNET_PeerIdentity target;
209 * If the request is for an SBLOCK, this is the identity of the
210 * pseudonym to which the SBLOCK belongs.
212 GNUNET_HashCode namespace;
215 * Hash of the keyword (aka query) for KBLOCKs; Hash of
216 * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
217 * and hash of the identifier XORed with the target for
218 * SBLOCKS (aka query).
220 GNUNET_HashCode query;
226 * Possible routing policies for an FS-GET request.
231 * Simply drop the request.
233 ROUTING_POLICY_NONE = 0,
236 * Answer it if we can from local datastore.
238 ROUTING_POLICY_ANSWER = 1,
241 * Forward the request to other peers (if possible).
243 ROUTING_POLICY_FORWARD = 2,
246 * Forward to other peers, and ask them to route
247 * the response via ourselves.
249 ROUTING_POLICY_INDIRECT = 6,
252 * Do everything we could possibly do (that would
255 ROUTING_POLICY_ALL = 7
260 * Internal context we use for our initial processing
263 struct ProcessGetContext
266 * The search query (used for datastore lookup).
268 GNUNET_HashCode query;
271 * Which peer we should forward the response to.
273 struct GNUNET_PeerIdentity reply_to;
276 * Namespace for the result (only set for SKS requests)
278 GNUNET_HashCode namespace;
281 * Peer that we should forward the query to if possible
282 * (since that peer likely has the content).
284 struct GNUNET_PeerIdentity prime_target;
287 * When did we receive this request?
289 struct GNUNET_TIME_Absolute start_time;
292 * Our entry in the DRQ (non-NULL while we wait for our
293 * turn to interact with the local database).
295 struct DatastoreRequestQueue *drq;
298 * Filter used to eliminate duplicate
299 * results. Can be NULL if we are
300 * not yet filtering any results.
302 struct GNUNET_CONTAINER_BloomFilter *bf;
305 * Bitmap describing which of the optional
306 * hash codes / peer identities were given to us.
311 * Desired block type.
316 * Priority of the request.
321 * In what ways are we going to process
324 enum RoutingPolicy policy;
327 * Time-to-live for the request (value
333 * Number to mingle hashes for bloom-filter
339 * Number of results that were found so far.
341 unsigned int results_found;
346 * Information we keep for each pending reply.
351 * This is a linked list.
353 struct PendingReply *next;
356 * Size of the reply; actual reply message follows
357 * at the end of this struct.
365 * All requests from a client are
366 * kept in a doubly-linked list.
368 struct ClientRequestList;
372 * Information we keep for each pending request. We should try to
373 * keep this struct as small as possible since its memory consumption
374 * is key to how many requests we can have pending at once.
376 struct PendingRequest
380 * ID of a client making a request, NULL if this entry is for a
383 struct GNUNET_SERVER_Client *client;
386 * If this request was made by a client,
387 * this is our entry in the client request
388 * list; otherwise NULL.
390 struct ClientRequestList *crl_entry;
393 * If this is a namespace query, pointer to the hash of the public
394 * key of the namespace; otherwise NULL.
396 GNUNET_HashCode *namespace;
399 * Bloomfilter we use to filter out replies that we don't care about
400 * (anymore). NULL as long as we are interested in all replies.
402 struct GNUNET_CONTAINER_BloomFilter *bf;
405 * Replies that we have received but were unable to forward yet
406 * (typically non-null only if we have a pending transmission
407 * request with the client or the respective peer).
409 struct PendingReply *replies_pending;
412 * Pending transmission request with the core service for the target
413 * peer (for processing of 'replies_pending').
415 struct GNUNET_CORE_TransmitHandle *cth;
418 * Pending transmission request for the target client (for processing of
419 * 'replies_pending').
421 struct GNUNET_CONNECTION_TransmitHandle *th;
424 * Hash code of all replies that we have seen so far (only valid
425 * if client is not NULL since we only track replies like this for
428 GNUNET_HashCode *replies_seen;
431 * When did we first see this request (from this peer), or, if our
432 * client is initiating, when did we last initiate a search?
434 struct GNUNET_TIME_Absolute start_time;
437 * The query that this request is for.
439 GNUNET_HashCode query;
442 * (Interned) Peer identifier (only valid if "client" is NULL)
443 * that identifies a peer that gave us this request.
445 GNUNET_PEER_Id source_pid;
448 * (Interned) Peer identifier that identifies a preferred target
451 GNUNET_PEER_Id target_pid;
454 * (Interned) Peer identifiers of peers that have already
455 * received our query for this content.
457 GNUNET_PEER_Id *used_pids;
460 * Desired anonymity level; only valid for requests from a local client.
462 uint32_t anonymity_level;
465 * How many entries in "used_pids" are actually valid?
467 unsigned int used_pids_off;
470 * How long is the "used_pids" array?
472 unsigned int used_pids_size;
475 * How many entries in "replies_seen" are actually valid?
477 unsigned int replies_seen_off;
480 * How long is the "replies_seen" array?
482 unsigned int replies_seen_size;
485 * Priority with which this request was made. If one of our clients
486 * made the request, then this is the current priority that we are
487 * using when initiating the request. This value is used when
488 * we decide to reward other peers with trust for providing a reply.
493 * Priority points left for us to spend when forwarding this request
496 uint32_t remaining_priority;
499 * Number to mingle hashes for bloom-filter
505 * TTL with which we saw this request (or, if we initiated, TTL that
506 * we used for the request).
511 * Type of the content that this request is for.
519 * All requests from a client are
520 * kept in a doubly-linked list.
522 struct ClientRequestList
525 * This is a doubly-linked list.
527 struct ClientRequestList *next;
530 * This is a doubly-linked list.
532 struct ClientRequestList *prev;
535 * A request from this client.
537 struct PendingRequest *req;
543 * Linked list of all clients that we are
544 * currently processing requests for.
550 * This is a linked list.
552 struct ClientList *next;
555 * What client is this entry for?
557 struct GNUNET_SERVER_Client* client;
560 * Head of the DLL of requests from this client.
562 struct ClientRequestList *head;
565 * Tail of the DLL of requests from this client.
567 struct ClientRequestList *tail;
573 * Closure for "process_reply" function.
575 struct ProcessReplyClosure
578 * The data for the reply.
583 * When the reply expires.
585 struct GNUNET_TIME_Absolute expiration;
598 * How much was this reply worth to us?
605 * Our connection to the datastore.
607 static struct GNUNET_DATASTORE_Handle *dsh;
612 static struct GNUNET_SCHEDULER_Handle *sched;
617 const struct GNUNET_CONFIGURATION_Handle *cfg;
620 * Handle to the core service (NULL until we've connected to it).
622 struct GNUNET_CORE_Handle *core;
625 * Head of doubly-linked LGC list.
627 static struct LocalGetContext *lgc_head;
630 * Tail of doubly-linked LGC list.
632 static struct LocalGetContext *lgc_tail;
635 * Head of request queue for the datastore, sorted by timeout.
637 static struct DatastoreRequestQueue *drq_head;
640 * Tail of request queue for the datastore.
642 static struct DatastoreRequestQueue *drq_tail;
645 * Linked list of indexed files.
647 static struct IndexInfo *indexed_files;
650 * Maps hash over content of indexed files to the respective filename.
651 * The filenames are pointers into the indexed_files linked list and
652 * do not need to be freed.
654 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
657 * Map of query hash codes to requests.
659 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
662 * Map of peer IDs to requests (for those requests coming
665 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
668 * Linked list of all of our clients and their requests.
670 static struct ClientList *clients;
673 * Heap with the request that will expire next at the top. Contains
674 * pointers of type "struct PendingRequest*"; these will *also* be
675 * aliased from the "requests_by_peer" data structures and the
676 * "requests_by_query" table. Note that requests from our clients
677 * don't expire and are thus NOT in the "requests_by_expiration"
678 * (or the "requests_by_peer" tables).
680 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
683 * FIXME: set from configuration.
685 static uint64_t max_pending_requests = 32;
688 * Write the current index information list to disk.
693 struct GNUNET_BIO_WriteHandle *wh;
695 struct IndexInfo *pos;
698 GNUNET_CONFIGURATION_get_value_filename (cfg,
703 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
704 _("Configuration option `%s' in section `%s' missing.\n"),
709 wh = GNUNET_BIO_write_open (fn);
712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
713 _("Could not open `%s'.\n"),
722 GNUNET_BIO_write (wh,
724 sizeof (GNUNET_HashCode))) ||
726 GNUNET_BIO_write_string (wh,
732 GNUNET_BIO_write_close (wh))
734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
735 _("Error writing `%s'.\n"),
745 * Read index information from disk.
750 struct GNUNET_BIO_ReadHandle *rh;
752 struct IndexInfo *pos;
759 GNUNET_CONFIGURATION_get_value_filename (cfg,
764 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
765 _("Configuration option `%s' in section `%s' missing.\n"),
770 rh = GNUNET_BIO_read_open (fn);
773 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
774 _("Could not open `%s'.\n"),
780 while ( (GNUNET_OK ==
782 "Hash of indexed file",
784 sizeof (GNUNET_HashCode))) &&
786 GNUNET_BIO_read_string (rh,
787 "Name of indexed file",
791 slen = strlen (fname) + 1;
792 pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
794 pos->filename = (const char *) &pos[1];
795 memcpy (&pos[1], fname, slen);
797 GNUNET_CONTAINER_multihashmap_put (ifm,
799 (void*) pos->filename,
800 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
806 pos->next = indexed_files;
811 GNUNET_BIO_read_close (rh, &emsg))
818 * We've validated the hash of the file we're about to
819 * index. Signal success to the client and update
820 * our internal data structures.
822 * @param ii the index info entry for the request
825 signal_index_ok (struct IndexInfo *ii)
828 GNUNET_CONTAINER_multihashmap_put (ifm,
830 (void*) ii->filename,
831 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
833 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
834 _("Index request received for file `%s' is indexed as `%s'. Permitting anyway.\n"),
836 (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
838 GNUNET_SERVER_transmit_context_append (ii->tc,
840 GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
841 GNUNET_SERVER_transmit_context_run (ii->tc,
842 GNUNET_TIME_UNIT_MINUTES);
846 ii->next = indexed_files;
849 GNUNET_SERVER_transmit_context_append (ii->tc,
851 GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
852 GNUNET_SERVER_transmit_context_run (ii->tc,
853 GNUNET_TIME_UNIT_MINUTES);
859 * Function called once the hash computation over an
860 * indexed file has completed.
862 * @param cls closure, our publishing context
863 * @param res resulting hash, NULL on error
866 hash_for_index_val (void *cls,
867 const GNUNET_HashCode *
870 struct IndexInfo *ii = cls;
872 if ( (res == NULL) ||
875 sizeof(GNUNET_HashCode))) )
877 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
878 _("Hash mismatch trying to index file `%s'\n"),
880 GNUNET_SERVER_transmit_context_append (ii->tc,
882 GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
883 GNUNET_SERVER_transmit_context_run (ii->tc,
884 GNUNET_TIME_UNIT_MINUTES);
888 signal_index_ok (ii);
893 * Handle INDEX_START-message.
896 * @param client identification of the client
897 * @param message the actual message
900 handle_index_start (void *cls,
901 struct GNUNET_SERVER_Client *client,
902 const struct GNUNET_MessageHeader *message)
904 const struct IndexStartMessage *ism;
907 struct IndexInfo *ii;
914 msize = ntohs(message->size);
915 if ( (msize <= sizeof (struct IndexStartMessage)) ||
916 ( ((const char *)message)[msize-1] != '\0') )
919 GNUNET_SERVER_receive_done (client,
923 ism = (const struct IndexStartMessage*) message;
924 fn = (const char*) &ism[1];
925 dev = ntohl (ism->device);
926 ino = GNUNET_ntohll (ism->inode);
927 ism = (const struct IndexStartMessage*) message;
928 slen = strlen (fn) + 1;
929 ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
930 ii->filename = (const char*) &ii[1];
931 memcpy (&ii[1], fn, slen);
932 ii->file_id = ism->file_id;
933 ii->tc = GNUNET_SERVER_transmit_context_create (client);
936 (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
942 /* fast validation OK! */
943 signal_index_ok (ii);
946 /* slow validation, need to hash full file (again) */
947 GNUNET_CRYPTO_hash_file (sched,
948 GNUNET_SCHEDULER_PRIORITY_IDLE,
958 * Handle INDEX_LIST_GET-message.
961 * @param client identification of the client
962 * @param message the actual message
965 handle_index_list_get (void *cls,
966 struct GNUNET_SERVER_Client *client,
967 const struct GNUNET_MessageHeader *message)
969 struct GNUNET_SERVER_TransmitContext *tc;
970 struct IndexInfoMessage *iim;
971 char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
974 struct GNUNET_MessageHeader *msg;
975 struct IndexInfo *pos;
977 tc = GNUNET_SERVER_transmit_context_create (client);
978 iim = (struct IndexInfoMessage*) buf;
984 iim->file_id = pos->file_id;
986 slen = strlen (fn) + 1;
987 if (slen + sizeof (struct IndexInfoMessage) >
988 GNUNET_SERVER_MAX_MESSAGE_SIZE)
993 memcpy (&iim[1], fn, slen);
994 GNUNET_SERVER_transmit_context_append
997 sizeof (struct IndexInfoMessage)
998 - sizeof (struct GNUNET_MessageHeader) + slen,
999 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1002 GNUNET_SERVER_transmit_context_append (tc,
1004 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1005 GNUNET_SERVER_transmit_context_run (tc,
1006 GNUNET_TIME_UNIT_MINUTES);
1011 * Handle UNINDEX-message.
1013 * @param cls closure
1014 * @param client identification of the client
1015 * @param message the actual message
1018 handle_unindex (void *cls,
1019 struct GNUNET_SERVER_Client *client,
1020 const struct GNUNET_MessageHeader *message)
1022 const struct UnindexMessage *um;
1023 struct IndexInfo *pos;
1024 struct IndexInfo *prev;
1025 struct IndexInfo *next;
1026 struct GNUNET_SERVER_TransmitContext *tc;
1029 um = (const struct UnindexMessage*) message;
1032 pos = indexed_files;
1036 if (0 == memcmp (&pos->file_id,
1038 sizeof (GNUNET_HashCode)))
1041 indexed_files = pos->next;
1043 prev->next = pos->next;
1053 if (GNUNET_YES == found)
1054 write_index_list ();
1055 tc = GNUNET_SERVER_transmit_context_create (client);
1056 GNUNET_SERVER_transmit_context_append (tc,
1058 GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1059 GNUNET_SERVER_transmit_context_run (tc,
1060 GNUNET_TIME_UNIT_MINUTES);
1065 * Run the next DS request in our
1066 * queue, we're done with the current one.
1071 struct DatastoreRequestQueue *e;
1073 while (NULL != (e = drq_head))
1075 if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1077 if (e->task != GNUNET_SCHEDULER_NO_TASK)
1078 GNUNET_SCHEDULER_cancel (sched, e->task);
1079 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1080 e->req (e->req_cls, GNUNET_NO);
1085 if (e->task != GNUNET_SCHEDULER_NO_TASK)
1086 GNUNET_SCHEDULER_cancel (sched, e->task);
1087 e->task = GNUNET_SCHEDULER_NO_TASK;
1088 e->req (e->req_cls, GNUNET_YES);
1089 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1095 * A datastore request had to be timed out.
1097 * @param cls closure (of type "struct DatastoreRequestQueue*")
1098 * @param tc task context, unused
1101 timeout_ds_request (void *cls,
1102 const struct GNUNET_SCHEDULER_TaskContext *tc)
1104 struct DatastoreRequestQueue *e = cls;
1106 e->task = GNUNET_SCHEDULER_NO_TASK;
1107 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1108 e->req (e->req_cls, GNUNET_NO);
1114 * Queue a request for the datastore.
1116 * @param deadline by when the request should run
1117 * @param fun function to call once the request can be run
1118 * @param fun_cls closure for fun
1120 static struct DatastoreRequestQueue *
1121 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1122 RequestFunction fun,
1125 struct DatastoreRequestQueue *e;
1126 struct DatastoreRequestQueue *bef;
1128 if (drq_head == NULL)
1130 /* no other requests pending, run immediately */
1131 fun (fun_cls, GNUNET_OK);
1134 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1135 e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1137 e->req_cls = fun_cls;
1138 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1140 /* local request, highest prio, put at head of queue
1141 regardless of deadline */
1147 while ( (NULL != bef) &&
1148 (e->timeout.value < bef->timeout.value) )
1151 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1152 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1154 e->task = GNUNET_SCHEDULER_add_delayed (sched,
1156 GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1157 GNUNET_SCHEDULER_NO_TASK,
1159 &timeout_ds_request,
1166 * Free the state associated with a local get context.
1168 * @param lgc the lgc to free
1171 local_get_context_free (struct LocalGetContext *lgc)
1173 GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1174 GNUNET_SERVER_client_drop (lgc->client);
1175 GNUNET_free_non_null (lgc->results);
1176 if (lgc->results_bf != NULL)
1177 GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1178 if (lgc->req != NULL)
1180 if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1181 GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1182 GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1183 GNUNET_free (lgc->req);
1190 * We're able to transmit the next (local) result to the client.
1191 * Do it and ask the datastore for more. Or, on error, tell
1192 * the datastore to stop giving us more.
1194 * @param cls our closure (struct LocalGetContext)
1195 * @param max maximum number of bytes we can transmit
1196 * @param buf where to copy our message
1197 * @return number of bytes copied to buf
1200 transmit_local_result (void *cls,
1204 struct LocalGetContext *lgc = cls;
1210 GNUNET_free (lgc->result);
1212 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1215 msize = ntohs (lgc->result->header.size);
1216 GNUNET_assert (max >= msize);
1217 memcpy (buf, lgc->result, msize);
1218 GNUNET_free (lgc->result);
1220 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1226 * Continuation called from datastore's remove
1230 * @param success did the deletion work?
1231 * @param msg error message
1234 remove_cont (void *cls,
1238 if (GNUNET_OK != success)
1239 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1240 _("Failed to delete bogus block: %s\n"),
1242 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1247 * Mingle hash with the mingle_number to
1248 * produce different bits.
1251 mingle_hash (const GNUNET_HashCode * in,
1252 int32_t mingle_number,
1253 GNUNET_HashCode * hc)
1257 GNUNET_CRYPTO_hash (&mingle_number,
1260 GNUNET_CRYPTO_hash_xor (&m, in, hc);
1265 * We've received an on-demand encoded block
1266 * from the datastore. Attempt to do on-demand
1267 * encoding and (if successful), call the
1268 * continuation with the resulting block. On
1269 * error, clean up and ask the datastore for
1272 * @param key key for the content
1273 * @param size number of bytes in data
1274 * @param data content stored
1275 * @param type type of the content
1276 * @param priority priority of the content
1277 * @param anonymity anonymity-level for the content
1278 * @param expiration expiration time for the content
1279 * @param uid unique identifier for the datum;
1280 * may be 0 if no unique identifier is available
1281 * @param cont function to call with the actual block
1282 * @param cont_cls closure for cont
1285 handle_on_demand_block (const GNUNET_HashCode * key,
1291 struct GNUNET_TIME_Absolute
1292 expiration, uint64_t uid,
1293 GNUNET_DATASTORE_Iterator cont,
1296 const struct OnDemandBlock *odb;
1297 GNUNET_HashCode nkey;
1298 struct GNUNET_CRYPTO_AesSessionKey skey;
1299 struct GNUNET_CRYPTO_AesInitializationVector iv;
1300 GNUNET_HashCode query;
1302 char ndata[DBLOCK_SIZE];
1303 char edata[DBLOCK_SIZE];
1305 struct GNUNET_DISK_FileHandle *fh;
1308 if (size != sizeof (struct OnDemandBlock))
1311 GNUNET_DATASTORE_remove (dsh,
1317 GNUNET_TIME_UNIT_FOREVER_REL);
1320 odb = (const struct OnDemandBlock*) data;
1321 off = GNUNET_ntohll (odb->offset);
1322 fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1325 if ( (NULL == fn) ||
1326 (NULL == (fh = GNUNET_DISK_file_open (fn,
1327 GNUNET_DISK_OPEN_READ))) ||
1329 GNUNET_DISK_file_seek (fh,
1331 GNUNET_DISK_SEEK_SET)) ||
1333 (nsize = GNUNET_DISK_file_read (fh,
1337 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1338 _("Could not access indexed file `%s' at offset %llu: %s\n"),
1339 GNUNET_h2s (&odb->file_id),
1340 (unsigned long long) off,
1343 GNUNET_DISK_file_close (fh);
1344 /* FIXME: if this happens often, we need
1345 to remove the OnDemand block from the DS! */
1346 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1349 GNUNET_DISK_file_close (fh);
1350 GNUNET_CRYPTO_hash (ndata,
1353 GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1354 GNUNET_CRYPTO_aes_encrypt (ndata,
1359 GNUNET_CRYPTO_hash (edata,
1362 if (0 != memcmp (&query,
1364 sizeof (GNUNET_HashCode)))
1366 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1367 _("Indexed file `%s' changed at offset %llu\n"),
1369 (unsigned long long) off);
1370 /* FIXME: if this happens often, we need
1371 to remove the OnDemand block from the DS! */
1372 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1379 GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1388 * How many bytes should a bloomfilter be if we have already seen
1389 * entry_count responses? Note that BLOOMFILTER_K gives us the number
1390 * of bits set per entry. Furthermore, we should not re-size the
1391 * filter too often (to keep it cheap).
1393 * Since other peers will also add entries but not resize the filter,
1394 * we should generally pick a slightly larger size than what the
1395 * strict math would suggest.
1397 * @return must be a power of two and smaller than or equal to 2^15.
1400 compute_bloomfilter_size (unsigned int entry_count)
1403 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1404 uint16_t max = 1 << 15;
1406 if (entry_count > max)
1409 while ((size < max) && (size < ideal))
1418 * Recalculate our bloom filter for filtering replies.
1420 * @param count number of entries we are filtering right now
1421 * @param mingle set to our new mingling value
1422 * @param entries the entries to filter
1423 * @return updated bloomfilter, NULL for none
1425 static struct GNUNET_CONTAINER_BloomFilter *
1426 refresh_bloomfilter (unsigned int count,
1428 const GNUNET_HashCode *entries)
1430 struct GNUNET_CONTAINER_BloomFilter *bf;
1433 GNUNET_HashCode mhash;
1437 nsize = compute_bloomfilter_size (count);
1438 *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1439 bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
1442 for (i=0;i<count;i++)
1444 mingle_hash (&entries[i], *mingle, &mhash);
1445 GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1452 * We're processing (local) results for a search request
1453 * from a (local) client. Pass applicable results to the
1454 * client and if we are done either clean up (operation
1455 * complete) or switch to P2P search (more results possible).
1457 * @param cls our closure (struct LocalGetContext)
1458 * @param key key for the content
1459 * @param size number of bytes in data
1460 * @param data content stored
1461 * @param type type of the content
1462 * @param priority priority of the content
1463 * @param anonymity anonymity-level for the content
1464 * @param expiration expiration time for the content
1465 * @param uid unique identifier for the datum;
1466 * may be 0 if no unique identifier is available
1469 process_local_get_result (void *cls,
1470 const GNUNET_HashCode * key,
1476 struct GNUNET_TIME_Absolute
1480 struct LocalGetContext *lgc = cls;
1481 struct PendingRequest *pr;
1482 struct ClientRequestList *crl;
1483 struct ClientList *cl;
1489 /* no further results from datastore; continue
1490 processing further requests from the client and
1491 allow the next task to use the datastore; also,
1492 switch to P2P requests or clean up our state. */
1494 GNUNET_SERVER_receive_done (lgc->client,
1496 if ( (lgc->results_used == 0) ||
1497 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1498 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1499 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1502 while ( (NULL != cl) &&
1503 (cl->client != lgc->client) )
1507 cl = GNUNET_malloc (sizeof (struct ClientList));
1508 cl->client = lgc->client;
1512 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1513 GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1514 pr = GNUNET_malloc (sizeof (struct PendingRequest));
1515 pr->client = lgc->client;
1516 GNUNET_SERVER_client_keep (pr->client);
1517 pr->crl_entry = crl;
1519 if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1521 pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1522 *pr->namespace = lgc->namespace;
1524 pr->replies_seen = lgc->results;
1525 lgc->results = NULL;
1526 pr->start_time = GNUNET_TIME_absolute_get ();
1527 pr->query = lgc->query;
1528 pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1529 pr->replies_seen_off = lgc->results_used;
1530 pr->replies_seen_size = lgc->results_size;
1531 lgc->results_size = 0;
1532 pr->type = lgc->type;
1533 pr->anonymity_level = lgc->anonymity_level;
1534 pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1537 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1540 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1542 // FIXME: trigger some processing NOW!
1543 local_get_context_free (lgc);
1546 /* got all possible results, clean up! */
1547 local_get_context_free (lgc);
1550 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1552 handle_on_demand_block (key, size, data, type, priority,
1553 anonymity, expiration, uid,
1554 &process_local_get_result,
1558 if (type != lgc->type)
1560 /* this should be virtually impossible to reach (DBLOCK
1561 query hash being identical to KBLOCK/SBLOCK query hash);
1562 nevertheless, if it happens, the correct thing is to
1563 simply skip the result. */
1564 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1567 /* check if this is a result we've already
1569 for (i=0;i<lgc->results_used;i++)
1570 if (0 == memcmp (key,
1572 sizeof (GNUNET_HashCode)))
1574 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1577 if (lgc->results_used == lgc->results_size)
1578 GNUNET_array_grow (lgc->results,
1580 lgc->results_size * 2 + 2);
1581 GNUNET_CRYPTO_hash (data,
1583 &lgc->results[lgc->results_used++]);
1584 msize = size + sizeof (struct ContentMessage);
1585 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1586 lgc->result = GNUNET_malloc (msize);
1587 lgc->result->header.size = htons (msize);
1588 lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1589 lgc->result->type = htonl (type);
1590 lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1591 memcpy (&lgc->result[1],
1594 GNUNET_SERVER_notify_transmit_ready (lgc->client,
1596 GNUNET_TIME_UNIT_FOREVER_REL,
1597 &transmit_local_result,
1603 * We're processing a search request from a local
1604 * client. Now it is our turn to query the datastore.
1606 * @param cls our closure (struct LocalGetContext)
/* Scheduler task (queued by transmit_local_get_ready) that issues the
   datastore lookup for a local client's search; cls is the LocalGetContext. */
1610 transmit_local_get (void *cls,
1611 const struct GNUNET_SCHEDULER_TaskContext *tc)
1613 struct LocalGetContext *lgc = cls;
/* A DBLOCK lookup is widened to ANY so that on-demand blocks are returned too. */
1617 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1618 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
/* Results are handed to process_local_get_result; the lookup uses an
   unlimited (FOREVER) timeout. */
1619 GNUNET_DATASTORE_get (dsh,
1622 &process_local_get_result,
1624 GNUNET_TIME_UNIT_FOREVER_REL);
1629 * We're processing a search request from a local
1630 * client. Now it is our turn to query the datastore.
1632 * @param cls our closure (struct LocalGetContext)
1633 * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
/* Callback registered with queue_ds_request() by handle_start_search:
   defers the actual datastore query to transmit_local_get via the scheduler. */
1636 transmit_local_get_ready (void *cls,
1639 struct LocalGetContext *lgc = cls;
/* Not getting the datastore slot is treated as an invariant violation. */
1641 GNUNET_assert (GNUNET_OK == ok);
1642 GNUNET_SCHEDULER_add_continuation (sched,
1644 &transmit_local_get,
1646 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1651 * Handle START_SEARCH-message (search request from client).
1653 * @param cls closure
1654 * @param client identification of the client
1655 * @param message the actual message
/* Client handler for START_SEARCH: validates the message, builds a
   LocalGetContext from it and queues a datastore request for it. */
1658 handle_start_search (void *cls,
1659 struct GNUNET_SERVER_Client *client,
1660 const struct GNUNET_MessageHeader *message)
1662 const struct SearchMessage *sm;
1663 struct LocalGetContext *lgc;
1667 msize = ntohs (message->size);
/* The bytes following the fixed SearchMessage must be a whole number of
   hash codes; anything else is rejected. */
1668 if ( (msize < sizeof (struct SearchMessage)) ||
1669 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1672 GNUNET_SERVER_receive_done (client,
/* sc = number of hash codes appended to the message. */
1676 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1677 sm = (const struct SearchMessage*) message;
1678 GNUNET_SERVER_client_keep (client);
1679 lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
/* The context's result list starts out holding sc entries copied from the
   message (presumably the hashes of results the client already has). */
1682 lgc->results_used = sc;
1683 GNUNET_array_grow (lgc->results,
1686 memcpy (lgc->results,
1688 sc * sizeof (GNUNET_HashCode));
1690 lgc->client = client;
1691 lgc->type = ntohl (sm->type);
1692 lgc->anonymity_level = ntohl (sm->anonymity_level);
/* sm->target is interpreted per block type: stored as a peer hash for
   DBLOCK/IBLOCK and as the namespace for SBLOCK. */
1695 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1696 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1697 lgc->target.hashPubKey = sm->target;
1699 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1700 lgc->namespace = sm->target;
1705 lgc->query = sm->query;
/* Track the context in the global lgc list and queue for datastore access;
   transmit_local_get_ready is the continuation. */
1706 GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1707 lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1708 &transmit_local_get_ready,
1714 * List of handlers for the messages understood by this
/* Dispatch table for client messages, registered in run() through
   GNUNET_SERVER_add_handlers.  Each entry names a callback, a NULL closure,
   a message type and a size value.
   NOTE(review): a size of 0 presumably means "variable length" -- confirm
   against the GNUNET_SERVER_MessageHandler documentation. */
1717 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1718 {&handle_index_start, NULL,
1719 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1720 {&handle_index_list_get, NULL,
1721 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1722 {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
1723 sizeof (struct UnindexMessage) },
1724 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
1731 * Clean up the memory used by the PendingRequest
1732 * structure (except for the client or peer list
1733 * that the request may be part of).
1735 * @param pr request to clean up
/* Releases everything owned by a PendingRequest: its entry in
   requests_by_query, its bloom filter, pending transmissions, queued
   replies, peer reference counts and the heap-allocated arrays. */
1738 destroy_pending_request (struct PendingRequest *pr)
1740 struct PendingReply *reply;
1742 GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1745 // FIXME: not sure how this can work (efficiently)
1746 // also, what does the return value mean?
/* NOTE(review): forward_get_request inserts peer-originated requests into
   requests_by_expiration, yet the heap removal here is guarded by
   pr->client != NULL -- check whether this condition is inverted. */
1747 if (pr->client != NULL)
1748 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
1751 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
/* Cancel any transmission callbacks that still reference pr. */
1752 if (NULL != pr->cth)
1753 GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
1755 GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
/* Drop replies that were queued but never transmitted. */
1756 while (NULL != (reply = pr->replies_pending))
1758 pr->replies_pending = reply->next;
1759 GNUNET_free (reply);
/* Give back the peer-ID references taken for source, target and used peers. */
1761 GNUNET_PEER_change_rc (pr->source_pid, -1);
1762 GNUNET_PEER_change_rc (pr->target_pid, -1);
1763 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1764 GNUNET_free_non_null (pr->used_pids);
1765 GNUNET_free_non_null (pr->replies_seen);
1766 GNUNET_free_non_null (pr->namespace);
1772 * A client disconnected. Remove all of its pending queries.
1774 * @param cls closure, NULL
1775 * @param client identification of the client
/* Disconnect notification for a local client: frees a LocalGetContext that
   belongs to the client, unlinks the client's ClientList entry and destroys
   every pending request attached to it. */
1778 handle_client_disconnect (void *cls,
1779 struct GNUNET_SERVER_Client
1782 struct LocalGetContext *lgc;
1783 struct ClientList *cpos;
1784 struct ClientList *cprev;
1785 struct ClientRequestList *rl;
/* Find a local-get context owned by the disconnecting client. */
1788 while ( (NULL != lgc) &&
1789 (lgc->client != client) )
1792 local_get_context_free (lgc);
/* Walk the client list until the entry for this client is found.  The test
   must look at the current position (cpos); comparing against the list head
   (clients->client) would only ever match the first entry. */
1795 while ( (NULL != cpos) &&
1796 (cpos->client != client) )
/* Unlink the entry: either it was the head, or cprev precedes it. */
1804 clients = cpos->next;
1806 cprev->next = cpos->next;
/* Destroy all requests the client still had pending. */
1807 while (NULL != (rl = cpos->head))
1809 cpos->head = rl->next;
1810 destroy_pending_request (rl->req);
1819 * Task run during shutdown.
/* Shutdown task (scheduled from run()): disconnects from core and the
   datastore, destroys the request maps and heap, and walks the list of
   indexed files. */
1825 shutdown_task (void *cls,
1826 const struct GNUNET_SCHEDULER_TaskContext *tc)
1828 struct IndexInfo *pos;
1831 GNUNET_CORE_disconnect (core);
1832 GNUNET_DATASTORE_disconnect (dsh,
/* Only the containers are destroyed here; per the FIXME, the entries they
   hold are not freed first. */
1835 // FIXME: iterate over maps to free entries!
1836 GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
1837 requests_by_query = NULL;
1838 GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
1839 requests_by_peer = NULL;
1840 GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
1841 requests_by_expiration = NULL;
1842 GNUNET_CONTAINER_multihashmap_destroy (ifm);
/* Pop entries off the indexed_files list one at a time. */
1844 while (NULL != (pos = indexed_files))
1846 indexed_files = pos->next;
1853 * Free (each) request made by the peer.
1855 * @param cls closure, points to peer that the request belongs to
1856 * @param key current key code
1857 * @param value value in the hash map
1858 * @return GNUNET_YES (we should continue to iterate)
/* Hash-map iterator used by peer_disconnect_handler: removes one request
   from requests_by_peer and destroys it.  cls is the peer's identity,
   value the PendingRequest. */
1861 destroy_request (void *cls,
1862 const GNUNET_HashCode * key,
1865 const struct GNUNET_PeerIdentity * peer = cls;
1866 struct PendingRequest *pr = value;
1868 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1871 destroy_pending_request (pr);
1877 * Method called whenever a peer disconnects.
1879 * @param cls closure, not used
1880 * @param peer peer identity this notification is about
/* Core callback for a peer disconnect (passed to GNUNET_CORE_connect in
   core_connect_task): iterates over the peer's entries in requests_by_peer. */
1883 peer_disconnect_handler (void *cls,
1885 GNUNET_PeerIdentity * peer)
1887 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1895 * We're processing a GET request from
1896 * another peer and have decided to forward
1897 * it to other peers.
1899 * @param cls our "struct ProcessGetContext *"
/* Scheduler task: turns a ProcessGetContext (a GET received from another
   peer) into a tracked PendingRequest, and evicts the oldest tracked request
   if the table has grown past max_pending_requests. */
1903 forward_get_request (void *cls,
1904 const struct GNUNET_SCHEDULER_TaskContext *tc)
1906 struct ProcessGetContext *pgc = cls;
1907 struct PendingRequest *pr;
1908 struct PendingRequest *eer;
1909 struct GNUNET_PeerIdentity target;
1911 pr = GNUNET_malloc (sizeof (struct PendingRequest));
/* The namespace is only copied when the request's bitmap says one was sent. */
1912 if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
1914 pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
1915 *pr->namespace = pgc->namespace;
1919 pr->start_time = pgc->start_time;
1920 pr->query = pgc->query;
/* Peers are stored as interned IDs; destroy_pending_request drops these
   references again via GNUNET_PEER_change_rc. */
1921 pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
1922 if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
1923 pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
1924 pr->anonymity_level = 1; /* default */
1925 pr->priority = pgc->priority;
1926 pr->remaining_priority = pr->priority;
1927 pr->mingle = pgc->mingle;
1929 pr->type = pgc->type;
/* Index the request by query, by requesting peer, and by expiration
   (start time plus TTL). */
1930 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1933 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1934 GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
1935 &pgc->reply_to.hashPubKey,
1937 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1938 GNUNET_CONTAINER_heap_insert (requests_by_expiration,
1940 pr->start_time.value + pr->ttl);
/* Over capacity: take the heap's top entry (a MIN-ordered heap, so the
   earliest expiration), drop it from requests_by_peer and destroy it. */
1941 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
1943 /* expire oldest request! */
1944 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
1945 GNUNET_PEER_resolve (eer->source_pid,
1947 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1950 destroy_pending_request (eer);
1952 // FIXME: trigger actual forwarding NOW!
1958 * Transmit the given message by copying it to
1959 * the target buffer "buf". "buf" will be
1960 * NULL and "size" zero if the socket was closed for
1961 * writing in the meantime. In that case, only
1965 * @param cls closure, pointer to the message
1966 * @param size number of bytes available in buf
1967 * @param buf where the callee should write the message
1968 * @return number of bytes written to buf
/* Transmit-ready callback: copies the message passed as closure into the
   buffer supplied by the caller. */
1971 transmit_message (void *cls,
1972 size_t size, void *buf)
1974 struct GNUNET_MessageHeader *msg = cls;
1980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981 "Dropping reply, core too busy.\n");
/* The message length comes from its own header (network byte order); the
   offered buffer must be large enough to take it whole. */
1986 msize = ntohs (msg->size);
1987 GNUNET_assert (size >= msize);
1988 memcpy (buf, msg, msize);
1995 * Test if the load on this peer is too high
1996 * to even consider processing the query at
1999 * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
/* Load check used before processing queries; currently a stub that always
   reports "not too high". */
2002 test_load_too_high ()
2004 return GNUNET_NO; // FIXME
2009 * We're processing (local) results for a search request
2010 * from another peer. Pass applicable results to the
2011 * peer and if we are done either clean up (operation
2012 * complete) or forward to other peers (more results possible).
2014 * @param cls our closure (struct LocalGetContext)
2015 * @param key key for the content
2016 * @param size number of bytes in data
2017 * @param data content stored
2018 * @param type type of the content
2019 * @param priority priority of the content
2020 * @param anonymity anonymity-level for the content
2021 * @param expiration expiration time for the content
2022 * @param uid unique identifier for the datum;
2023 * maybe 0 if no unique identifier is available
/* Datastore iterator for a GET received from another peer: filters
   duplicates, sends each accepted result back as a PutMessage through core,
   and decides whether to keep iterating or to forward the request. */
2026 process_p2p_get_result (void *cls,
2027 const GNUNET_HashCode * key,
2033 struct GNUNET_TIME_Absolute
2037 struct ProcessGetContext *pgc = cls;
2038 GNUNET_HashCode dhash;
2039 GNUNET_HashCode mhash;
2040 struct PutMessage *reply;
/* End of iteration: forward the request if the policy allows it and either
   nothing was found locally or the block type is KBLOCK/SBLOCK/SKBLOCK. */
2044 /* no more results */
2045 if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) &&
2046 ( (0 == pgc->results_found) ||
2047 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2048 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2049 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2051 GNUNET_SCHEDULER_add_continuation (sched,
2053 &forward_get_request,
2055 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2059 if (pgc->bf != NULL)
2060 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
/* On-demand blocks are passed to handle_on_demand_block together with this
   function as the callback. */
2066 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2068 handle_on_demand_block (key, size, data, type, priority,
2069 anonymity, expiration, uid,
2070 &process_p2p_get_result,
/* The content hash is mingled before being tested against the request's
   bloom filter; a hit skips this result and moves to the next one. */
2074 /* check for duplicates */
2075 GNUNET_CRYPTO_hash (data, size, &dhash);
2076 mingle_hash (&dhash,
2079 if ( (pgc->bf != NULL) &&
2081 GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2085 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2086 "Result from datastore filtered by bloomfilter.\n");
2088 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2091 pgc->results_found++;
/* For KBLOCK/SBLOCK/SKBLOCK queries the result is recorded in the bloom
   filter, which is created on demand. */
2092 if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2093 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2094 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2096 if (pgc->bf == NULL)
2097 pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2100 GNUNET_CONTAINER_bloomfilter_add (pgc->bf,
/* Build the PutMessage: header and fields in network byte order, expiration
   converted to the remaining relative time, payload directly after it. */
2104 reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2105 reply->header.size = htons (sizeof (struct PutMessage) + size);
2106 reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2107 reply->type = htonl (type);
2108 reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2109 memcpy (&reply[1], data, size);
2110 GNUNET_CORE_notify_transmit_ready (core,
2112 ACCEPTABLE_REPLY_DELAY,
2114 sizeof (struct PutMessage) + size,
/* Stop iterating (and no longer forward) when the load is too high or more
   than 5 + 2 * priority results were found; otherwise fetch the next one. */
2117 if ( (GNUNET_YES == test_load_too_high()) ||
2118 (pgc->results_found > 5 + 2 * pgc->priority) )
2120 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2121 pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2124 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2129 * We're processing a GET request from another peer. Give it to our
2132 * @param cls our "struct ProcessGetContext"
2133 * @param ok did we get a datastore slice or not?
/* Callback for a queued datastore request on behalf of a peer's GET: starts
   the datastore lookup whose results go to process_p2p_get_result. */
2136 ds_get_request (void *cls,
2139 struct ProcessGetContext *pgc = cls;
2141 struct GNUNET_TIME_Relative timeout;
2143 if (GNUNET_OK != ok)
2145 /* no point in doing P2P stuff if we can't even do local */
2150 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2151 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
/* The lookup timeout scales with the request's priority:
   BASIC_DATASTORE_REQUEST_DELAY * (priority + 1). */
2152 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2153 (pgc->priority + 1));
2154 GNUNET_DATASTORE_get (dsh,
2157 &process_p2p_get_result,
2164 * The priority level imposes a bound on the maximum
2165 * value for the ttl that can be requested.
2167 * @param ttl_in requested ttl
2168 * @param priority given priority
2169 * @return ttl_in if ttl_in is below the limit,
2170 * otherwise the ttl-limit for the given priority
/* Limits a requested TTL to a ceiling derived from the request's priority. */
2173 bound_ttl (int32_t ttl_in, uint32_t prio)
2175 unsigned long long allowed;
/* Ceiling = prio * TTL_DECREMENT / 1000, computed in unsigned long long. */
2179 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
/* NOTE(review): ttl_in is signed while allowed is unsigned long long, so
   this comparison is carried out unsigned; a negative ttl_in would compare
   as a huge value unless it is handled by a check not visible here. */
2180 if (ttl_in > allowed)
2182 if (allowed >= (1 << 30))
2191 * We've received a request with the specified
2192 * priority. Bound it according to how much
2193 * we trust the given peer.
2195 * @param prio_in requested priority
2196 * @param peer the peer making the request
2197 * @return effective priority
/* Maps a requested priority to the effective priority granted to `peer`;
   handle_p2p_get stores the result in pgc->priority.  (Only the signature
   is visible in this listing.) */
2200 bound_priority (uint32_t prio_in,
2201 const struct GNUNET_PeerIdentity *peer)
2208 * Handle P2P "GET" request.
2210 * @param cls closure, always NULL
2211 * @param peer the other peer involved (sender or receiver, NULL
2212 * for loopback messages where we are both sender and receiver)
2213 * @param message the actual message
2214 * @return GNUNET_OK to keep the connection open,
2215 * GNUNET_SYSERR to close it (signal serious error)
/* Core handler for a P2P GET: validates the message, extracts the optional
   hash codes and the trailing bloom filter, chooses a routing policy from
   the (currently hard-coded) network load, bounds priority and TTL, and then
   either queues a local datastore lookup or schedules forwarding.
   Returns GNUNET_SYSERR for malformed messages, GNUNET_OK otherwise. */
2218 handle_p2p_get (void *cls,
2219 const struct GNUNET_PeerIdentity *other,
2220 const struct GNUNET_MessageHeader *message)
2223 const struct GetMessage *gm;
2225 const GNUNET_HashCode *opt;
2226 struct ProcessGetContext *pgc;
2229 uint32_t ttl_decrement;
2234 msize = ntohs(message->size);
2235 if (msize < sizeof (struct GetMessage))
2237 GNUNET_break_op (0);
2238 return GNUNET_SYSERR;
2240 gm = (const struct GetMessage*) message;
2241 bm = ntohl (gm->hash_bitmap);
/* The message must be long enough to hold every optional hash code that
   the bitmap announces. */
2249 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2251 GNUNET_break_op (0);
2252 return GNUNET_SYSERR;
2254 opt = (const GNUNET_HashCode*) &gm[1];
/* Whatever follows the header and the optional hash codes is bloom filter
   data, so both sizes are subtracted from the message size (the check above
   guarantees this cannot go negative). */
2255 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
2256 pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
/* The filter bytes live in the received message right after the optional
   hash codes; pgc was allocated without any trailing space, so &pgc[1]
   would point past the allocation. */
2258 pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2261 pgc->type = ntohl (gm->type);
2262 pgc->bm = ntohl (gm->hash_bitmap);
2263 pgc->mingle = gm->filter_mutator;
/* Optional fields are consumed from opt[] in bitmap order:
   RETURN_TO, SKS_NAMESPACE, TRANSMIT_TO. */
2265 if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2266 pgc->reply_to.hashPubKey = opt[bits++];
2268 pgc->reply_to = *other;
2269 if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2270 pgc->namespace = opt[bits++];
/* An SBLOCK query without a namespace is a protocol violation. */
2271 else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2273 GNUNET_break_op (0);
2275 return GNUNET_SYSERR;
2277 if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2278 pgc->prime_target.hashPubKey = opt[bits++];
2279 /* note that we can really only check load here since otherwise
2280 peers could find out that we are overloaded by being disconnected
2281 after sending us a malformed query... */
2282 if (GNUNET_YES == test_load_too_high ())
2284 if (NULL != pgc->bf)
2285 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2289 "Dropping query from `%s', this peer is too busy.\n",
2290 GNUNET_h2s (other));
/* Routing policy: everything is allowed below the idle threshold;
   otherwise the priority decides what is still permitted. */
2294 net_load_up = 50; // FIXME
2295 net_load_down = 50; // FIXME
2296 pgc->policy = ROUTING_POLICY_NONE;
2297 if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2298 (net_load_down < IDLE_LOAD_THRESHOLD) )
2300 pgc->policy |= ROUTING_POLICY_ALL;
2301 pgc->priority = 0; /* no charge */
2305 pgc->priority = bound_priority (ntohl (gm->priority), other);
2307 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2309 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2311 pgc->policy |= ROUTING_POLICY_ALL;
2315 // FIXME: is this sound?
2316 if (net_load_up < 90 + 10 * pgc->priority)
2317 pgc->policy |= ROUTING_POLICY_FORWARD;
2318 if (net_load_down < 90 + 10 * pgc->priority)
2319 pgc->policy |= ROUTING_POLICY_ANSWER;
2322 if (pgc->policy == ROUTING_POLICY_NONE)
2325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326 "Dropping query from `%s', network saturated.\n",
2327 GNUNET_h2s (other));
2329 if (NULL != pgc->bf)
2330 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2332 return GNUNET_OK; /* drop */
2334 if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2335 pgc->priority = 0; /* kill the priority (we cannot benefit) */
2336 pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2337 /* decrement ttl (always) */
2338 ttl_decrement = 2 * TTL_DECREMENT +
2339 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
/* Underflow check: ttl_decrement is unsigned, so the difference has to be
   converted back to a signed 32-bit value before testing its sign;
   otherwise "> 0" holds for practically every negative TTL.
   (Assumes pgc->ttl is a 32-bit signed field -- confirm against
   struct ProcessGetContext.) */
2341 if ( (pgc->ttl < 0) &&
2342 (((int32_t) (pgc->ttl - ttl_decrement)) > 0) )
2345 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2346 "Dropping query from `%s' due to TTL underflow.\n",
2347 GNUNET_h2s (other));
2349 /* integer underflow => drop (should be very rare)! */
2350 if (NULL != pgc->bf)
2351 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2355 pgc->ttl -= ttl_decrement;
2356 pgc->start_time = GNUNET_TIME_absolute_get ();
/* The preference reported to core is the priority, but never less than
   QUERY_BANDWIDTH_VALUE. */
2357 preference = (double) pgc->priority;
2358 if (preference < QUERY_BANDWIDTH_VALUE)
2359 preference = QUERY_BANDWIDTH_VALUE;
2360 // FIXME: also reserve bandwidth for reply?
2361 GNUNET_CORE_peer_configure (core,
2363 GNUNET_TIME_UNIT_FOREVER_REL,
2364 0, 0, preference, NULL, NULL);
/* Answering locally goes through the datastore queue; otherwise the request
   is handed straight to forward_get_request. */
2365 if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2366 pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2370 GNUNET_SCHEDULER_add_continuation (sched,
2372 &forward_get_request,
2374 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2380 * Function called to notify us that we can now transmit a reply to a
2381 * client or peer. "buf" will be NULL and "size" zero if the socket was
2382 * closed for writing in the meantime.
2384 * @param cls closure, points to a "struct PendingRequest*" with
2385 * one or more pending replies
2386 * @param size number of bytes available in buf
2387 * @param buf where the callee should write the message
2388 * @return number of bytes written to buf
/* Transmit-ready callback for a PendingRequest: packs as many queued replies
   as fit into the supplied buffer. */
2391 transmit_result (void *cls,
2395 struct PendingRequest *pr = cls;
2397 struct PendingReply *reply;
2401 while (NULL != (reply = pr->replies_pending))
/* Two conditions guard each copy: the running total must not wrap around,
   and the next reply must still fit into the buffer. */
2403 if ( (reply->msize + ret < ret) ||
2404 (reply->msize + ret > size) )
/* Unlink the reply and append its message (stored right after the
   PendingReply header) at offset ret. */
2406 pr->replies_pending = reply->next;
2407 memcpy (&cbuf[ret], &reply[1], reply->msize);
2408 ret += reply->msize;
2416 * Iterator over pending requests.
2418 * @param cls response (struct ProcessReplyClosure)
2419 * @param key our query
2420 * @param value value in the hash map (meta-info about the query)
2421 * @return GNUNET_YES (we should continue to iterate)
/* Hash-map iterator run by handle_p2p_put for every pending request that
   matches an incoming reply: validates the block against the request,
   filters duplicates, and queues the reply for transmission to the
   requester.  cls is the ProcessReplyClosure, value the PendingRequest. */
2424 process_reply (void *cls,
2425 const GNUNET_HashCode * key,
2428 struct ProcessReplyClosure *prq = cls;
2429 struct PendingRequest *pr = value;
2430 struct PendingRequest *eer;
2431 struct PendingReply *reply;
2432 struct PutMessage *pm;
2433 struct ContentMessage *cm;
2434 GNUNET_HashCode chash;
2435 GNUNET_HashCode mhash;
2436 struct GNUNET_PeerIdentity target;
2439 struct GNUNET_TIME_Relative max_delay;
2441 GNUNET_CRYPTO_hash (prq->data,
/* For DBLOCK/IBLOCK the content hash is compared directly; a mismatch is
   reported as a protocol violation. */
2446 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2447 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2448 if (0 != memcmp (&chash,
2450 sizeof (GNUNET_HashCode)))
2452 GNUNET_break_op (0);
/* KBLOCK and SBLOCK replies are de-duplicated through the request's bloom
   filter, using the content hash mingled with pr->mingle. */
2456 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2457 // FIXME: validate KBlock!
2460 mingle_hash (&chash, pr->mingle, &mhash);
2461 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2463 return GNUNET_YES; /* duplicate */
2464 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2468 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2469 // FIXME: validate SBlock!
2472 mingle_hash (&chash, pr->mingle, &mhash);
2473 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2475 return GNUNET_YES; /* duplicate */
2476 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2480 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2481 // FIXME: validate SKBlock!
/* The request's remaining priority is credited to the reply exactly once. */
2484 prio = pr->priority;
2485 prq->priority += pr->remaining_priority;
2486 pr->remaining_priority = 0;
/* Client requests additionally remember the hash of every reply seen. */
2487 if (pr->client != NULL)
2489 if (pr->replies_seen_size == pr->replies_seen_off)
2490 GNUNET_array_grow (pr->replies_seen,
2491 pr->replies_seen_size,
2492 pr->replies_seen_size * 2 + 4);
2493 pr->replies_seen[pr->replies_seen_off++] = chash;
2494 // FIXME: possibly recalculate BF!
/* NOTE(review): this branch (no local client, transmitted through core)
   builds a ContentMessage / FS_CONTENT, while the client branch further
   down builds a PutMessage / FS_PUT.  p2p_handlers registers FS_PUT for
   peers, so the two message formats look swapped -- confirm. */
2496 if (pr->client == NULL)
2498 msize = sizeof (struct ContentMessage) + prq->size;
2499 reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
2500 reply->msize = msize;
2501 cm = (struct ContentMessage*) &reply[1];
2502 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2503 cm->header.size = htons (msize);
2504 cm->type = htonl (prq->type);
2505 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2506 reply->next = pr->replies_pending;
2507 pr->replies_pending = reply;
/* The payload goes after the message header; copying to &reply[1] would
   overwrite the header fields that were just filled in. */
2508 memcpy (&cm[1], prq->data, prq->size);
2509 if (pr->cth != NULL)
2511 max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2512 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2514 /* estimate expiration time from time difference between
2515 first request that will be discarded and this request */
2516 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2517 max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2520 GNUNET_PEER_resolve (pr->source_pid,
2522 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2529 if (NULL == pr->cth)
2531 // FIXME: now what? discard?
2536 msize = sizeof (struct PutMessage) + prq->size;
2537 reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
2538 reply->msize = msize;
2539 reply->next = pr->replies_pending;
2540 pm = (struct PutMessage*) &reply[1];
2541 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2542 pm->header.size = htons (msize);
2543 pm->type = htonl (prq->type);
2544 pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2545 pr->replies_pending = reply;
/* Same as above: the payload follows the PutMessage header. */
2546 memcpy (&pm[1], prq->data, prq->size);
2549 pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2551 GNUNET_TIME_UNIT_FOREVER_REL,
2556 // FIXME: need to try again later (not much
2557 // to do here specifically, but we need to
2558 // check somewhere else to handle this case!)
2561 // FIXME: implement hot-path routing statistics keeping!
2567 * Handle P2P "PUT" request.
2569 * @param cls closure, always NULL
2570 * @param peer the other peer involved (sender or receiver, NULL
2571 * for loopback messages where we are both sender and receiver)
2572 * @param message the actual message
2573 * @return GNUNET_OK to keep the connection open,
2574 * GNUNET_SYSERR to close it (signal serious error)
/* Core handler for a P2P PUT (a reply carrying content): validates the block
   by type, derives the query hash and runs process_reply on every pending
   request stored under that query. */
2577 handle_p2p_put (void *cls,
2578 const struct GNUNET_PeerIdentity *other,
2579 const struct GNUNET_MessageHeader *message)
2581 const struct PutMessage *put;
2585 struct GNUNET_TIME_Absolute expiration;
2586 GNUNET_HashCode query;
2587 const struct KBlock *kb;
2588 struct ProcessReplyClosure prq;
2590 msize = ntohs (message->size);
2591 if (msize < sizeof (struct PutMessage))
2594 return GNUNET_SYSERR;
2596 put = (const struct PutMessage*) message;
/* dsize = payload bytes following the PutMessage header; the relative
   expiration from the wire is converted to an absolute time. */
2597 dsize = msize - sizeof (struct PutMessage);
2598 type = ntohl (put->type);
2599 expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2601 /* first, validate! */
/* DBLOCK/IBLOCK: the query is simply the hash of the payload. */
2604 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2605 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2606 GNUNET_CRYPTO_hash (&put[1], dsize, &query);
/* KBLOCK: size and signature are checked (marked broken by the FIXME),
   and the query is the hash of the block's keyspace public key. */
2608 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2609 if (dsize < sizeof (struct KBlock))
2611 GNUNET_break_op (0);
2612 return GNUNET_SYSERR;
2614 kb = (const struct KBlock*) &put[1];
2615 // FIXME -- validation code below broken...
2616 if ( (dsize != ntohs (kb->purpose.size) + 42) ||
2618 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2623 GNUNET_break_op (0);
2624 return GNUNET_SYSERR;
2626 GNUNET_CRYPTO_hash (&kb->keyspace,
2627 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2630 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2631 // FIXME -- validate SBLOCK!
2634 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2635 // FIXME -- validate SKBLOCK!
2639 /* unknown block type */
2640 GNUNET_break_op (0);
2641 return GNUNET_SYSERR;
/* prq carries the payload and expiration to process_reply, which is invoked
   for each request in requests_by_query matching the query. */
2644 /* now, lookup 'query' */
2645 prq.data = (const void*) &put[1];
2648 prq.expiration = expiration;
2650 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
2654 // FIXME: if migration is on and load is low,
2655 // queue to store data in datastore;
2656 // use "prq.priority" for that!
2662 * List of handlers for P2P messages
2663 * that we care about.
/* Dispatch table for peer-to-peer messages: entries for FS_GET and FS_PUT,
   each with a size value of 0.
   NOTE(review): 0 presumably means "variable length" -- confirm against
   the GNUNET_CORE_MessageHandler documentation. */
2665 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2668 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2670 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2676 * Task that will try to initiate a connection with the
/* Forward declaration: the definition of core_connect_task appears further
   down in the file, after core_start_cb. */
2683 core_connect_task (void *cls,
2684 const struct GNUNET_SCHEDULER_TaskContext *tc);
2688 * Function called by the core after we've
/* Callback invoked by core once the connection attempt has completed.  The
   visible part schedules a high-priority task with a one-second delay
   (presumably a retry of core_connect_task on failure -- the task argument
   is not visible in this listing). */
2692 core_start_cb (void *cls,
2693 struct GNUNET_CORE_Handle * server,
2694 const struct GNUNET_PeerIdentity *
2697 GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
2702 GNUNET_SCHEDULER_add_delayed (sched,
2704 GNUNET_SCHEDULER_PRIORITY_HIGH,
2705 GNUNET_SCHEDULER_NO_TASK,
2706 GNUNET_TIME_UNIT_SECONDS,
2716 * Task that will try to initiate a connection with the
/* Task that starts the connection to the core service; called directly from
   run().  peer_disconnect_handler is among the callbacks registered, and the
   connect itself uses an unlimited (FOREVER) timeout. */
2723 core_connect_task (void *cls,
2724 const struct GNUNET_SCHEDULER_TaskContext *tc)
2726 GNUNET_CORE_connect (sched,
2728 GNUNET_TIME_UNIT_FOREVER_REL,
2732 &peer_disconnect_handler,
2741 * Process fs requests.
2743 * @param cls closure
2744 * @param sched scheduler to use
2745 * @param server the initialized server
2746 * @param cfg configuration to use
/* Service initialisation (passed to GNUNET_SERVICE_run from main): creates
   the request tables, connects to the datastore, registers the client
   handlers, starts the core connection and schedules the shutdown task. */
2750 struct GNUNET_SCHEDULER_Handle *s,
2751 struct GNUNET_SERVER_Handle *server,
2752 const struct GNUNET_CONFIGURATION_Handle *c)
/* Three hash maps with an initial size of 128, plus a MIN-ordered heap
   that forward_get_request keys by expiration. */
2757 ifm = GNUNET_CONTAINER_multihashmap_create (128);
2758 requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2759 requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2760 requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2762 dsh = GNUNET_DATASTORE_connect (cfg,
2766 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2767 _("Failed to connect to datastore service.\n"));
2770 GNUNET_SERVER_disconnect_notify (server,
2771 &handle_client_disconnect,
2773 GNUNET_SERVER_add_handlers (server, handlers);
/* The core connection is started synchronously by calling the task
   function directly. */
2774 core_connect_task (NULL, NULL);
/* An idle-priority task with a FOREVER delay (presumably shutdown_task --
   the task argument is not visible in this listing). */
2775 GNUNET_SCHEDULER_add_delayed (sched,
2777 GNUNET_SCHEDULER_PRIORITY_IDLE,
2778 GNUNET_SCHEDULER_NO_TASK,
2779 GNUNET_TIME_UNIT_FOREVER_REL,
2786 * The main function for the fs service.
2788 * @param argc number of arguments from the command line
2789 * @param argv command line arguments
2790 * @return 0 ok, 1 on error
/* Program entry point: runs the "fs" service with run() as its initialiser
   and maps GNUNET_OK to exit code 0, anything else to 1. */
2793 main (int argc, char *const *argv)
2795 return (GNUNET_OK ==
2796 GNUNET_SERVICE_run (argc,
2798 "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
2801 /* end of gnunet-service-fs.c */