2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief program that provides the file-sharing service
24 * @author Christian Grothoff
27 * - fix the gazillion minor FIXMEs
28 * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29 * core to transmit to another peer; need to make sure this is bounded overall...
30 * - randomly delay processing for improved anonymity (can wait)
31 * - content migration (put in local DS) (can wait)
32 * - handle some special cases when forwarding replies based on tracked requests (can wait)
33 * - tracking of success correlations for hot-path routing (can wait)
34 * - various load-based actions (can wait)
35 * - validation of KSBLOCKS (can wait)
36 * - remove on-demand blocks if they keep failing (can wait)
37 * - check that we decrement PIDs always where necessary (can wait)
38 * - find out how to use core-pulling instead of pushing (at least for some cases)
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
50 #define DEBUG_FS GNUNET_NO
54 * In-memory information about indexed files (also available
61 * This is a linked list.
63 struct IndexInfo *next;
66 * Name of the indexed file. Memory allocated
67 * at the end of this struct (do not free).
72 * Context for transmitting confirmation to client,
73 * NULL if we've done this already.
75 struct GNUNET_SERVER_TransmitContext *tc;
78 * Hash of the contents of the file.
80 GNUNET_HashCode file_id;
86 * Signature of a function that is called whenever a datastore
87 * request can be processed (or an entry put on the queue times out).
90 * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
92 typedef void (*RequestFunction)(void *cls,
97 * Doubly-linked list of our requests for the datastore.
99 struct DatastoreRequestQueue
103 * This is a doubly-linked list.
105 struct DatastoreRequestQueue *next;
108 * This is a doubly-linked list.
110 struct DatastoreRequestQueue *prev;
113 * Function to call (will issue the request).
123 * When should this request time-out because we don't care anymore?
125 struct GNUNET_TIME_Absolute timeout;
128 * ID of task used for signaling timeout.
130 GNUNET_SCHEDULER_TaskIdentifier task;
136 * Closure for processing START_SEARCH messages from a client.
138 struct LocalGetContext
142 * This is a doubly-linked list.
144 struct LocalGetContext *next;
147 * This is a doubly-linked list.
149 struct LocalGetContext *prev;
152 * Client that initiated the search.
154 struct GNUNET_SERVER_Client *client;
157 * Array of results that we've already received
160 GNUNET_HashCode *results;
163 * Bloomfilter over all results (for fast query construction);
164 * NULL if we don't have any results.
166 * FIXME: this member is not used, is that OK? If so, it should
169 struct GNUNET_CONTAINER_BloomFilter *results_bf;
172 * DS request associated with this operation.
174 struct DatastoreRequestQueue *req;
177 * Current result message to transmit to client (or NULL).
179 struct ContentMessage *result;
182 * Type of the content that we're looking for.
188 * Desired anonymity level.
190 uint32_t anonymity_level;
193 * Number of results actually stored in the results array.
195 unsigned int results_used;
198 * Size of the results array in memory.
200 unsigned int results_size;
203 * Size (in bytes) of the 'results_bf' bloomfilter.
205 * FIXME: this member is not used, is that OK? If so, it should
208 size_t results_bf_size;
211 * If the request is for a DBLOCK or IBLOCK, this is the identity of
212 * the peer that is known to have a response. Set to all-zeros if
213 * such a target is not known (note that even if OUR anonymity
214 * level is >0 we may happen to know the responder's identity;
215 * nevertheless, we should probably not use it for a DHT-lookup
216 * or similar blunt actions in order to avoid exposing ourselves).
218 struct GNUNET_PeerIdentity target;
221 * If the request is for an SBLOCK, this is the identity of the
222 * pseudonym to which the SBLOCK belongs.
224 GNUNET_HashCode namespace;
227 * Hash of the keyword (aka query) for KBLOCKs; Hash of
228 * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
229 * and hash of the identifier XORed with the target for
230 * SBLOCKS (aka query).
232 GNUNET_HashCode query;
238 * Possible routing policies for an FS-GET request.
243 * Simply drop the request.
245 ROUTING_POLICY_NONE = 0,
248 * Answer it if we can from local datastore.
250 ROUTING_POLICY_ANSWER = 1,
253 * Forward the request to other peers (if possible).
255 ROUTING_POLICY_FORWARD = 2,
258 * Forward to other peers, and ask them to route
259 * the response via ourselves.
261 ROUTING_POLICY_INDIRECT = 6,
264 * Do everything we could possibly do (that would
267 ROUTING_POLICY_ALL = 7
272 * Internal context we use for our initial processing
275 struct ProcessGetContext
278 * The search query (used for datastore lookup).
280 GNUNET_HashCode query;
283 * Which peer we should forward the response to.
285 struct GNUNET_PeerIdentity reply_to;
288 * Namespace for the result (only set for SKS requests)
290 GNUNET_HashCode namespace;
293 * Peer that we should forward the query to if possible
294 * (since that peer likely has the content).
296 struct GNUNET_PeerIdentity prime_target;
299 * When did we receive this request?
301 struct GNUNET_TIME_Absolute start_time;
304 * Our entry in the DRQ (non-NULL while we wait for our
305 * turn to interact with the local database).
307 struct DatastoreRequestQueue *drq;
310 * Filter used to eliminate duplicate results. Can be NULL if we
311 * are not yet filtering any results.
313 struct GNUNET_CONTAINER_BloomFilter *bf;
316 * Bitmap describing which of the optional
317 * hash codes / peer identities were given to us.
322 * Desired block type.
327 * Priority of the request.
332 * Size of the 'bf' (in bytes).
337 * In what ways are we going to process
340 enum RoutingPolicy policy;
343 * Time-to-live for the request (value
349 * Number to mingle hashes for bloom-filter
355 * Number of results that were found so far.
357 unsigned int results_found;
362 * Information we keep for each pending reply.
367 * This is a linked list.
369 struct PendingReply *next;
372 * Size of the reply; actual reply message follows
373 * at the end of this struct.
381 * All requests from a client are kept in a doubly-linked list.
383 struct ClientRequestList;
387 * Information we keep for each pending request. We should try to
388 * keep this struct as small as possible since its memory consumption
389 * is key to how many requests we can have pending at once.
391 struct PendingRequest
395 * ID of a client making a request, NULL if this entry is for a
398 struct GNUNET_SERVER_Client *client;
401 * If this request was made by a client,
402 * this is our entry in the client request
403 * list; otherwise NULL.
405 struct ClientRequestList *crl_entry;
408 * If this is a namespace query, pointer to the hash of the public
409 * key of the namespace; otherwise NULL.
411 GNUNET_HashCode *namespace;
414 * Bloomfilter we use to filter out replies that we don't care about
415 * (anymore). NULL as long as we are interested in all replies.
417 struct GNUNET_CONTAINER_BloomFilter *bf;
420 * Replies that we have received but were unable to forward yet
421 * (typically non-null only if we have a pending transmission
422 * request with the client or the respective peer).
424 struct PendingReply *replies_pending;
427 * Pending transmission request with the core service for the target
428 * peer (for processing of 'replies_pending') or Handle for a
429 * pending query-request for P2P-transmission with the core service.
430 * If non-NULL, this request must be cancelled should this struct be
433 struct GNUNET_CORE_TransmitHandle *cth;
436 * Pending transmission request for the target client (for processing of
437 * 'replies_pending').
439 struct GNUNET_CONNECTION_TransmitHandle *th;
442 * Hash codes of all replies that we have seen so far (only valid
443 * if client is not NULL since we only track replies like this for
446 GNUNET_HashCode *replies_seen;
449 * When did we first see this request (from this peer), or, if our
450 * client is initiating, when did we last initiate a search?
452 struct GNUNET_TIME_Absolute start_time;
455 * The query that this request is for.
457 GNUNET_HashCode query;
460 * The task responsible for transmitting queries
463 GNUNET_SCHEDULER_TaskIdentifier task;
466 * (Interned) Peer identifier (only valid if "client" is NULL)
467 * that identifies a peer that gave us this request.
469 GNUNET_PEER_Id source_pid;
472 * (Interned) Peer identifier that identifies a preferred target
475 GNUNET_PEER_Id target_pid;
478 * (Interned) Peer identifiers of peers that have already
479 * received our query for this content.
481 GNUNET_PEER_Id *used_pids;
484 * Size of the 'bf' (in bytes).
489 * Desired anonymity level; only valid for requests from a local client.
491 uint32_t anonymity_level;
494 * How many entries in "used_pids" are actually valid?
496 unsigned int used_pids_off;
499 * How long is the "used_pids" array?
501 unsigned int used_pids_size;
504 * How many entries in "replies_seen" are actually valid?
506 unsigned int replies_seen_off;
509 * How long is the "replies_seen" array?
511 unsigned int replies_seen_size;
514 * Priority with which this request was made. If one of our clients
515 * made the request, then this is the current priority that we are
516 * using when initiating the request. This value is used when
517 * we decide to reward other peers with trust for providing a reply.
522 * Priority points left for us to spend when forwarding this request
525 uint32_t remaining_priority;
528 * Number to mingle hashes for bloom-filter
534 * TTL with which we saw this request (or, if we initiated, TTL that
535 * we used for the request).
540 * Type of the content that this request is for.
548 * All requests from a client are kept in a doubly-linked list.
550 struct ClientRequestList
553 * This is a doubly-linked list.
555 struct ClientRequestList *next;
558 * This is a doubly-linked list.
560 struct ClientRequestList *prev;
563 * A request from this client.
565 struct PendingRequest *req;
568 * Client list with the head and tail of this DLL.
570 struct ClientList *cl;
575 * Linked list of all clients that we are
576 * currently processing requests for.
582 * This is a linked list.
584 struct ClientList *next;
587 * What client is this entry for?
589 struct GNUNET_SERVER_Client* client;
592 * Head of the DLL of requests from this client.
594 struct ClientRequestList *head;
597 * Tail of the DLL of requests from this client.
599 struct ClientRequestList *tail;
605 * Closure for "process_reply" function.
607 struct ProcessReplyClosure
610 * The data for the reply.
615 * When the reply expires.
617 struct GNUNET_TIME_Absolute expiration;
625 * Namespace that this reply belongs to
626 * (if it is of type SBLOCK).
628 GNUNET_HashCode namespace;
636 * How much was this reply worth to us?
643 * Information about a peer that we are connected to.
644 * We track data that is useful for determining which
645 * peers should receive our requests.
651 * List of the last clients for which this peer
652 * successfully answered a query.
654 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
657 * List of the last PIDs for which
658 * this peer successfully answered a query;
659 * We use 0 to indicate no successful reply.
661 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
664 * Average delay between sending the peer a request and
665 * getting a reply (only calculated over the requests for
666 * which we actually got a reply). Calculated
667 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
669 struct GNUNET_TIME_Relative avg_delay;
672 * Average priority of successful replies. Calculated
673 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
678 * The peer's identity.
683 * Number of requests we have currently pending
684 * with this peer (that is, requests that were
685 * transmitted so recently that we would not retransmit
688 unsigned int pending_requests;
691 * Which offset in "last_p2p_replies" will be updated next?
692 * (we go round-robin).
694 unsigned int last_p2p_replies_woff;
697 * Which offset in "last_client_replies" will be updated next?
698 * (we go round-robin).
700 unsigned int last_client_replies_woff;
706 * Our connection to the datastore.
708 static struct GNUNET_DATASTORE_Handle *dsh;
713 static struct GNUNET_SCHEDULER_Handle *sched;
718 const struct GNUNET_CONFIGURATION_Handle *cfg;
721 * Handle to the core service (NULL until we've connected to it).
723 struct GNUNET_CORE_Handle *core;
726 * Head of doubly-linked LGC list.
728 static struct LocalGetContext *lgc_head;
731 * Tail of doubly-linked LGC list.
733 static struct LocalGetContext *lgc_tail;
736 * Head of request queue for the datastore, sorted by timeout.
738 static struct DatastoreRequestQueue *drq_head;
741 * Tail of request queue for the datastore.
743 static struct DatastoreRequestQueue *drq_tail;
746 * Linked list of indexed files.
748 static struct IndexInfo *indexed_files;
751 * Maps hash over content of indexed files to the respective filename.
752 * The filenames are pointers into the indexed_files linked list and
753 * do not need to be freed.
755 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
758 * Map of query hash codes to requests.
760 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
763 * Map of peer IDs to requests (for those requests coming
766 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
769 * Linked list of all of our clients and their requests.
771 static struct ClientList *clients;
774 * Heap with the request that will expire next at the top. Contains
775 * pointers of type "struct PendingRequest*"; these will *also* be
776 * aliased from the "requests_by_peer" data structures and the
777 * "requests_by_query" table. Note that requests from our clients
778 * don't expire and are thus NOT in the "requests_by_expiration"
779 * (or the "requests_by_peer" tables).
781 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
784 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
786 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
789 * Maximum number of requests (from other peers) that we're
790 * willing to have pending at any given point in time.
791 * FIXME: set from configuration (and 32 is a tiny value for testing only).
793 static uint64_t max_pending_requests = 32;
796 * Write the current index information list to disk.
801 struct GNUNET_BIO_WriteHandle *wh;
803 struct IndexInfo *pos;
806 GNUNET_CONFIGURATION_get_value_filename (cfg,
811 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
812 _("Configuration option `%s' in section `%s' missing.\n"),
817 wh = GNUNET_BIO_write_open (fn);
820 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
821 _("Could not open `%s'.\n"),
830 GNUNET_BIO_write (wh,
832 sizeof (GNUNET_HashCode))) ||
834 GNUNET_BIO_write_string (wh,
840 GNUNET_BIO_write_close (wh))
842 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
843 _("Error writing `%s'.\n"),
853 * Read index information from disk.
858 struct GNUNET_BIO_ReadHandle *rh;
860 struct IndexInfo *pos;
867 GNUNET_CONFIGURATION_get_value_filename (cfg,
872 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
873 _("Configuration option `%s' in section `%s' missing.\n"),
878 if (GNUNET_NO == GNUNET_DISK_file_test (fn))
880 /* no index info yet */
884 rh = GNUNET_BIO_read_open (fn);
887 GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
888 _("Could not open `%s'.\n"),
894 while ( (GNUNET_OK ==
896 "Hash of indexed file",
898 sizeof (GNUNET_HashCode))) &&
900 GNUNET_BIO_read_string (rh,
901 "Name of indexed file",
905 slen = strlen (fname) + 1;
906 pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
908 pos->filename = (const char *) &pos[1];
909 memcpy (&pos[1], fname, slen);
911 GNUNET_CONTAINER_multihashmap_put (ifm,
913 (void*) pos->filename,
914 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
920 pos->next = indexed_files;
926 GNUNET_BIO_read_close (rh, &emsg))
933 * We've validated the hash of the file we're about to
934 * index. Signal success to the client and update
935 * our internal data structures.
937 * @param ii the index info entry for the request
940 signal_index_ok (struct IndexInfo *ii)
943 GNUNET_CONTAINER_multihashmap_put (ifm,
945 (void*) ii->filename,
946 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
948 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
949 _("Index request received for file `%s' is indexed as `%s'. Permitting anyway.\n"),
951 (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
953 GNUNET_SERVER_transmit_context_append (ii->tc,
955 GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
956 GNUNET_SERVER_transmit_context_run (ii->tc,
957 GNUNET_TIME_UNIT_MINUTES);
961 ii->next = indexed_files;
964 GNUNET_SERVER_transmit_context_append (ii->tc,
966 GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
967 GNUNET_SERVER_transmit_context_run (ii->tc,
968 GNUNET_TIME_UNIT_MINUTES);
974 * Function called once the hash computation over an
975 * indexed file has completed.
977 * @param cls closure, our publishing context
978 * @param res resulting hash, NULL on error
981 hash_for_index_val (void *cls,
982 const GNUNET_HashCode *
985 struct IndexInfo *ii = cls;
987 if ( (res == NULL) ||
990 sizeof(GNUNET_HashCode))) )
992 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
993 _("Hash mismatch trying to index file `%s' which has hash `%s'\n"),
997 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
999 GNUNET_h2s (&ii->file_id));
1001 GNUNET_SERVER_transmit_context_append (ii->tc,
1003 GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
1004 GNUNET_SERVER_transmit_context_run (ii->tc,
1005 GNUNET_TIME_UNIT_MINUTES);
1009 signal_index_ok (ii);
1014 * Handle INDEX_START-message.
1016 * @param cls closure
1017 * @param client identification of the client
1018 * @param message the actual message
1021 handle_index_start (void *cls,
1022 struct GNUNET_SERVER_Client *client,
1023 const struct GNUNET_MessageHeader *message)
1025 const struct IndexStartMessage *ism;
1028 struct IndexInfo *ii;
1035 msize = ntohs(message->size);
1036 if ( (msize <= sizeof (struct IndexStartMessage)) ||
1037 ( ((const char *)message)[msize-1] != '\0') )
1040 GNUNET_SERVER_receive_done (client,
1044 ism = (const struct IndexStartMessage*) message;
1045 fn = (const char*) &ism[1];
1046 dev = ntohl (ism->device);
1047 ino = GNUNET_ntohll (ism->inode);
1048 ism = (const struct IndexStartMessage*) message;
1049 slen = strlen (fn) + 1;
1050 ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1051 ii->filename = (const char*) &ii[1];
1052 memcpy (&ii[1], fn, slen);
1053 ii->file_id = ism->file_id;
1054 ii->tc = GNUNET_SERVER_transmit_context_create (client);
1055 if ( ( (dev != 0) ||
1057 (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1063 /* fast validation OK! */
1064 signal_index_ok (ii);
1068 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069 "Mismatch in file identifiers (%llu != %llu or %u != %u), need to hash.\n",
1070 (unsigned long long) ino,
1071 (unsigned long long) myino,
1073 (unsigned int) mydev);
1075 /* slow validation, need to hash full file (again) */
1076 GNUNET_CRYPTO_hash_file (sched,
1077 GNUNET_SCHEDULER_PRIORITY_IDLE,
1081 &hash_for_index_val,
1087 * Handle INDEX_LIST_GET-message.
1089 * @param cls closure
1090 * @param client identification of the client
1091 * @param message the actual message
1094 handle_index_list_get (void *cls,
1095 struct GNUNET_SERVER_Client *client,
1096 const struct GNUNET_MessageHeader *message)
1098 struct GNUNET_SERVER_TransmitContext *tc;
1099 struct IndexInfoMessage *iim;
1100 char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1103 struct GNUNET_MessageHeader *msg;
1104 struct IndexInfo *pos;
1106 tc = GNUNET_SERVER_transmit_context_create (client);
1107 iim = (struct IndexInfoMessage*) buf;
1109 pos = indexed_files;
1113 iim->file_id = pos->file_id;
1115 slen = strlen (fn) + 1;
1116 if (slen + sizeof (struct IndexInfoMessage) >
1117 GNUNET_SERVER_MAX_MESSAGE_SIZE)
1122 memcpy (&iim[1], fn, slen);
1123 GNUNET_SERVER_transmit_context_append
1126 sizeof (struct IndexInfoMessage)
1127 - sizeof (struct GNUNET_MessageHeader) + slen,
1128 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1131 GNUNET_SERVER_transmit_context_append (tc,
1133 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1134 GNUNET_SERVER_transmit_context_run (tc,
1135 GNUNET_TIME_UNIT_MINUTES);
1140 * Handle UNINDEX-message.
1142 * @param cls closure
1143 * @param client identification of the client
1144 * @param message the actual message
1147 handle_unindex (void *cls,
1148 struct GNUNET_SERVER_Client *client,
1149 const struct GNUNET_MessageHeader *message)
1151 const struct UnindexMessage *um;
1152 struct IndexInfo *pos;
1153 struct IndexInfo *prev;
1154 struct IndexInfo *next;
1155 struct GNUNET_SERVER_TransmitContext *tc;
1158 um = (const struct UnindexMessage*) message;
1161 pos = indexed_files;
1165 if (0 == memcmp (&pos->file_id,
1167 sizeof (GNUNET_HashCode)))
1170 indexed_files = pos->next;
1172 prev->next = pos->next;
1183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184 "Client requested unindexing of file `%s': %s\n",
1185 GNUNET_h2s (&um->file_id),
1186 found ? "found" : "not found");
1188 if (GNUNET_YES == found)
1189 write_index_list ();
1190 tc = GNUNET_SERVER_transmit_context_create (client);
1191 GNUNET_SERVER_transmit_context_append (tc,
1193 GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1194 GNUNET_SERVER_transmit_context_run (tc,
1195 GNUNET_TIME_UNIT_MINUTES);
1200 * Run the next DS request in our
1201 * queue, we're done with the current one.
1206 struct DatastoreRequestQueue *e;
1208 while (NULL != (e = drq_head))
1210 if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1212 if (e->task != GNUNET_SCHEDULER_NO_TASK)
1213 GNUNET_SCHEDULER_cancel (sched, e->task);
1214 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1215 e->req (e->req_cls, GNUNET_NO);
1220 if (e->task != GNUNET_SCHEDULER_NO_TASK)
1221 GNUNET_SCHEDULER_cancel (sched, e->task);
1222 e->task = GNUNET_SCHEDULER_NO_TASK;
1223 e->req (e->req_cls, GNUNET_YES);
1224 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1230 * A datastore request had to be timed out.
1232 * @param cls closure (of type "struct DatastoreRequestQueue*")
1233 * @param tc task context, unused
1236 timeout_ds_request (void *cls,
1237 const struct GNUNET_SCHEDULER_TaskContext *tc)
1239 struct DatastoreRequestQueue *e = cls;
1241 e->task = GNUNET_SCHEDULER_NO_TASK;
1242 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1243 e->req (e->req_cls, GNUNET_NO);
1249 * Queue a request for the datastore.
1251 * @param deadline by when the request should run
1252 * @param fun function to call once the request can be run
1253 * @param fun_cls closure for fun
1255 static struct DatastoreRequestQueue *
1256 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1257 RequestFunction fun,
1260 struct DatastoreRequestQueue *e;
1261 struct DatastoreRequestQueue *bef;
1263 if (drq_head == NULL)
1265 /* no other requests pending, run immediately */
1266 fun (fun_cls, GNUNET_OK);
1269 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1270 e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1272 e->req_cls = fun_cls;
1273 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1275 /* local request, highest prio, put at head of queue
1276 regardless of deadline */
1282 while ( (NULL != bef) &&
1283 (e->timeout.value < bef->timeout.value) )
1286 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1287 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1289 e->task = GNUNET_SCHEDULER_add_delayed (sched,
1291 GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1292 GNUNET_SCHEDULER_NO_TASK,
1294 &timeout_ds_request,
1301 * Free the state associated with a local get context.
1303 * @param lgc the lgc to free
1306 local_get_context_free (struct LocalGetContext *lgc)
1308 GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); /* unlink lgc from the LGC list (once) */
1309 GNUNET_SERVER_client_drop (lgc->client);
1310 GNUNET_free_non_null (lgc->results);
1311 if (lgc->results_bf != NULL)
1312 GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1313 if (lgc->req != NULL)
1315 if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1316 GNUNET_SCHEDULER_cancel (sched, lgc->req->task); /* stop the pending timeout task of the DS request */
1317 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, lgc->req); /* unlink the DS request from the datastore request queue before freeing it; lgc itself was already unlinked above */
1318 GNUNET_free (lgc->req);
1325 * We're able to transmit the next (local) result to the client.
1326 * Do it and ask the datastore for more. Or, on error, tell
1327 * the datastore to stop giving us more.
1329 * @param cls our closure (struct LocalGetContext)
1330 * @param max maximum number of bytes we can transmit
1331 * @param buf where to copy our message
1332 * @return number of bytes copied to buf
1335 transmit_local_result (void *cls,
1339 struct LocalGetContext *lgc = cls;
1345 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1346 "Failed to transmit result to local client, aborting datastore iteration.\n");
1349 GNUNET_free (lgc->result);
1351 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1354 msize = ntohs (lgc->result->header.size);
1356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357 "Transmitting %u bytes of result to local client.\n",
1360 GNUNET_assert (max >= msize);
1361 memcpy (buf, lgc->result, msize);
1362 GNUNET_free (lgc->result);
1364 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1370 * Continuation called from datastore's remove
1374 * @param success did the deletion work?
1375 * @param msg error message
1378 remove_cont (void *cls,
1382 if (GNUNET_OK != success)
1383 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1384 _("Failed to delete bogus block: %s\n"),
1386 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1391 * Mingle hash with the mingle_number to
1392 * produce different bits.
1395 mingle_hash (const GNUNET_HashCode * in,
1396 int32_t mingle_number,
1397 GNUNET_HashCode * hc)
1401 GNUNET_CRYPTO_hash (&mingle_number,
1404 GNUNET_CRYPTO_hash_xor (&m, in, hc);
1409 * We've received an on-demand encoded block
1410 * from the datastore. Attempt to do on-demand
1411 * encoding and (if successful), call the
1412 * continuation with the resulting block. On
1413 * error, clean up and ask the datastore for
1416 * @param key key for the content
1417 * @param size number of bytes in data
1418 * @param data content stored
1419 * @param type type of the content
1420 * @param priority priority of the content
1421 * @param anonymity anonymity-level for the content
1422 * @param expiration expiration time for the content
1423 * @param uid unique identifier for the datum;
1424 * maybe 0 if no unique identifier is available
1425 * @param cont function to call with the actual block
1426 * @param cont_cls closure for cont
1429 handle_on_demand_block (const GNUNET_HashCode * key,
1435 struct GNUNET_TIME_Absolute
1436 expiration, uint64_t uid,
1437 GNUNET_DATASTORE_Iterator cont,
1440 const struct OnDemandBlock *odb;
1441 GNUNET_HashCode nkey;
1442 struct GNUNET_CRYPTO_AesSessionKey skey;
1443 struct GNUNET_CRYPTO_AesInitializationVector iv;
1444 GNUNET_HashCode query;
1446 char ndata[DBLOCK_SIZE];
1447 char edata[DBLOCK_SIZE];
1449 struct GNUNET_DISK_FileHandle *fh;
1452 if (size != sizeof (struct OnDemandBlock))
1455 GNUNET_DATASTORE_remove (dsh,
1461 GNUNET_TIME_UNIT_FOREVER_REL);
1464 odb = (const struct OnDemandBlock*) data;
1465 off = GNUNET_ntohll (odb->offset);
1466 fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1469 if ( (NULL == fn) ||
1470 (NULL == (fh = GNUNET_DISK_file_open (fn,
1471 GNUNET_DISK_OPEN_READ,
1472 GNUNET_DISK_PERM_NONE))) ||
1474 GNUNET_DISK_file_seek (fh,
1476 GNUNET_DISK_SEEK_SET)) ||
1478 (nsize = GNUNET_DISK_file_read (fh,
1482 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1483 _("Could not access indexed file `%s' at offset %llu: %s\n"),
1484 GNUNET_h2s (&odb->file_id),
1485 (unsigned long long) off,
1488 GNUNET_DISK_file_close (fh);
1489 /* FIXME: if this happens often, we need
1490 to remove the OnDemand block from the DS! */
1491 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1494 GNUNET_DISK_file_close (fh);
1495 GNUNET_CRYPTO_hash (ndata,
1498 GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1499 GNUNET_CRYPTO_aes_encrypt (ndata,
1504 GNUNET_CRYPTO_hash (edata,
1507 if (0 != memcmp (&query,
1509 sizeof (GNUNET_HashCode)))
1511 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1512 _("Indexed file `%s' changed at offset %llu\n"),
1514 (unsigned long long) off);
1515 /* FIXME: if this happens often, we need
1516 to remove the OnDemand block from the DS! */
1517 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1524 GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1533 * How many bytes should a bloomfilter be if we have already seen
1534 * entry_count responses? Note that BLOOMFILTER_K gives us the number
1535 * of bits set per entry. Furthermore, we should not re-size the
1536 * filter too often (to keep it cheap).
1538 * Since other peers will also add entries but not resize the filter,
1539 * we should generally pick a slightly larger size than what the
1540 * strict math would suggest.
1542 * @return must be a power of two and less than or equal to 2^15.
1545 compute_bloomfilter_size (unsigned int entry_count)
1548 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1549 uint16_t max = 1 << 15;
1551 if (entry_count > max)
1554 while ((size < max) && (size < ideal))
1563 * Recalculate our bloom filter for filtering replies.
1565 * @param count number of entries we are filtering right now
1566 * @param mingle set to our new mingling value
1567 * @param bf_size set to the size of the bloomfilter
1568 * @param entries the entries to filter
1569 * @return updated bloomfilter, NULL for none
1571 static struct GNUNET_CONTAINER_BloomFilter *
1572 refresh_bloomfilter (unsigned int count,
1575 const GNUNET_HashCode *entries)
1577 struct GNUNET_CONTAINER_BloomFilter *bf;
1580 GNUNET_HashCode mhash;
1584 nsize = compute_bloomfilter_size (count);
1585 *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1587 bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
1590 for (i=0;i<count;i++)
1592 mingle_hash (&entries[i], *mingle, &mhash);
1593 GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1600 * Closure used for "target_peer_select_cb".
1602 struct PeerSelectionContext
1605 * The request for which we are selecting
1608 struct PendingRequest *pr;
1611 * Current "prime" target.
1613 struct GNUNET_PeerIdentity target;
1616 * How much do we like this target?
1618 double target_score;
1624 * Function called for each connected peer to determine
1625 * which one(s) would make good targets for forwarding.
1627 * @param cls closure (struct PeerSelectionContext)
1628 * @param key current key code (peer identity)
1629 * @param value value in the hash map (struct ConnectedPeer)
1630 * @return GNUNET_YES if we should continue to
/* Per-peer callback: skip peers this request was already sent to, score the
   remaining ones, and remember the best-scoring peer in the closure. */
1635 target_peer_select_cb (void *cls,
1636 const GNUNET_HashCode * key,
1639 struct PeerSelectionContext *psc = cls;
1640 struct ConnectedPeer *cp = value;
1641 struct PendingRequest *pr = psc->pr;
1645 /* 1) check if we have already (recently) forwarded to this peer */
1646 for (i=0;i<pr->used_pids_off;i++)
1647 if (pr->used_pids[i] == cp->pid)
1648 return GNUNET_YES; /* skip */
1649 // 2) calculate how much we'd like to forward to this peer
/* NOTE(review): the score is a constant placeholder, so all candidates tie;
   whether any peer is selected depends entirely on the initial target_score
   chosen by the caller */
1650 score = 0; // FIXME!
1652 /* store best-fit in closure */
1653 if (score > psc->target_score)
1655 psc->target_score = score;
/* the map key is documented as the peer identity, so it becomes the target */
1656 psc->target.hashPubKey = *key;
1663 * We use a random delay to make the timing of requests
1664 * less predictable. This function returns such a random
1667 * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
/* Produce a relative time of a randomly chosen number of milliseconds. */
1669 static struct GNUNET_TIME_Relative
1670 get_processing_delay ()
/* the multiplier is drawn from the weak-quality RNG; its upper bound
   argument is not visible in this listing */
1672 return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1673 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1679 * Task that is run for each request with the
1680 * goal of forwarding the associated query to
1681 * other peers. The task should re-schedule
1682 * itself to be re-run once the TTL has expired.
1683 * (or at a later time if more peers should
1684 * be queried earlier).
1686 * @param cls the requests "struct PendingRequest*"
1687 * @param tc task context (unused)
/* Forward declaration: the callbacks defined before the task's definition
   (transmit_request_cb, target_reservation_cb) pass it to the scheduler. */
1690 forward_request_task (void *cls,
1691 const struct GNUNET_SCHEDULER_TaskContext *tc);
1695 * We've selected a peer for forwarding of a query.
1696 * Construct the message and then re-schedule the
1697 * task to forward again to (other) peers.
1699 * @param cls closure
1700 * @param size number of bytes available in buf
1701 * @param buf where the callee should write the message
1702 * @return number of bytes written to buf
/* Transmit-ready callback for a forwarded query: on timeout re-run the
   forwarding task at once, otherwise serialise a GetMessage into buf and
   schedule the forwarding task again after a random delay. */
1705 transmit_request_cb (void *cls,
1709 struct PendingRequest *pr = cls;
1710 struct GetMessage *gm;
1711 GNUNET_HashCode *ext;
1717 /* (1) check for timeout */
1720 /* timeout, try another peer immediately again */
1721 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1723 GNUNET_SCHEDULER_PRIORITY_IDLE,
1724 GNUNET_SCHEDULER_NO_TASK,
1725 GNUNET_TIME_UNIT_ZERO,
1726 &forward_request_task,
1730 /* (2) build query message */
1731 k = 0; // FIXME: count hash codes!
/* wire layout: GetMessage header, then k optional hash codes, then the raw
   bloom filter bytes */
1732 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1733 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
/* NOTE(review): no check that `size` is at least msize is visible in this
   listing before buf is written -- confirm one exists in the missing lines */
1734 gm = (struct GetMessage*) buf;
1735 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1736 gm->header.size = htons (msize);
1737 gm->type = htonl (pr->type);
/* each transmission spends half of the priority that is still left */
1738 pr->remaining_priority /= 2;
1739 gm->priority = htonl (pr->remaining_priority);
1740 gm->ttl = htonl (pr->ttl);
1741 gm->filter_mutator = htonl(pr->mingle);
/* NOTE(review): 42 looks like a placeholder; with k fixed at 0 no optional
   hash codes are written, so the bitmap does not describe the payload */
1742 gm->hash_bitmap = htonl (42);
1743 gm->query = pr->query;
/* optional hash codes start right behind the fixed-size header */
1744 ext = (GNUNET_HashCode*) &gm[1];
1745 // FIXME: setup "ext[0]..[k-1]"
/* bloom filter bytes follow the k optional hash codes */
1746 bfdata = (char *) &ext[k];
1748 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1752 /* (3) schedule job to do it again (or another peer, etc.) */
1753 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1755 GNUNET_SCHEDULER_PRIORITY_IDLE,
1756 GNUNET_SCHEDULER_NO_TASK,
1757 get_processing_delay (), // FIXME!
1758 &forward_request_task,
1766 * Function called after we've tried to reserve
1767 * a certain amount of bandwidth for a reply.
1768 * Check if we succeeded and if so send our query.
1770 * @param cls the requests "struct PendingRequest*"
1771 * @param peer identifies the peer
1772 * @param latency current latency estimate, "FOREVER" if we have been
1774 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1775 * @param bpm_out set to the current bandwidth limit (sending) for this peer
1776 * @param amount set to the amount that was actually reserved or unreserved
1777 * @param preference current traffic preference for the given peer
/* Continuation of the bandwidth reservation: if the reservation did not
   yield DBLOCK_SIZE, retry the forwarding task later; otherwise ask core for
   a transmission slot and retry later if core refuses. */
1780 target_reservation_cb (void *cls,
1782 GNUNET_PeerIdentity * peer,
1783 unsigned int bpm_in,
1784 unsigned int bpm_out,
1785 struct GNUNET_TIME_Relative
1786 latency, int amount,
1787 unsigned long long preference)
1789 struct PendingRequest *pr = cls;
1792 struct GNUNET_TIME_Relative maxdelay;
1794 GNUNET_assert (peer != NULL);
/* anything other than a full DBLOCK_SIZE reservation counts as failure (the
   rest of the condition is not visible in this listing) */
1795 if ( (amount != DBLOCK_SIZE) ||
1798 /* try again later; FIXME: we may need to un-reserve "amount"? */
1799 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1801 GNUNET_SCHEDULER_PRIORITY_IDLE,
1802 GNUNET_SCHEDULER_NO_TASK,
1803 get_processing_delay (), // FIXME: longer?
1804 &forward_request_task,
1808 // (2) transmit, update ttl/priority
1809 // FIXME: calculate priority, maxdelay, size properly!
1812 maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
/* the handle is kept in pr->cth; forward_request_task and
   destroy_pending_request both test that field */
1813 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1818 &transmit_request_cb,
/* NULL handle: core did not accept the transmission request */
1820 if (pr->cth == NULL)
1822 /* try again later */
1823 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1825 GNUNET_SCHEDULER_PRIORITY_IDLE,
1826 GNUNET_SCHEDULER_NO_TASK,
1827 get_processing_delay (), // FIXME: longer?
1828 &forward_request_task,
1835 * Task that is run for each request with the
1836 * goal of forwarding the associated query to
1837 * other peers. The task should re-schedule
1838 * itself to be re-run once the TTL has expired.
1839 * (or at a later time if more peers should
1840 * be queried earlier).
1842 * @param cls the requests "struct PendingRequest*"
1843 * @param tc task context (unused)
/* Scheduler task that tries to forward one pending request: wait if a
   transmission is still in flight, otherwise pick the best connected peer
   and ask core to reserve reply bandwidth for it; the reservation callback
   continues the work. Re-schedules itself whenever it cannot proceed. */
1846 forward_request_task (void *cls,
1847 const struct GNUNET_SCHEDULER_TaskContext *tc)
1849 struct PendingRequest *pr = cls;
1850 struct PeerSelectionContext psc;
/* this task is running now, so no scheduled task is pending any more */
1852 pr->task = GNUNET_SCHEDULER_NO_TASK;
1853 if (pr->cth != NULL)
1855 /* we're busy transmitting a result, wait a bit */
1856 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1858 GNUNET_SCHEDULER_PRIORITY_IDLE,
1859 GNUNET_SCHEDULER_NO_TASK,
1860 get_processing_delay (),
1861 &forward_request_task,
1865 /* (1) select target */
/* Start below every possible score. DBL_MIN is the smallest POSITIVE
   double, so with it as the sentinel a candidate scoring 0 could never win
   the strict "score > target_score" comparison in target_peer_select_cb and
   no peer would ever be chosen; -DBL_MAX is the most negative finite value. */
1867 psc.target_score = -DBL_MAX;
1868 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1869 &target_peer_select_cb,
/* sentinel untouched: the iteration accepted no peer */
1871 if (psc.target_score == -DBL_MAX)
1873 /* no possible target found, wait some time */
1874 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1876 GNUNET_SCHEDULER_PRIORITY_IDLE,
1877 GNUNET_SCHEDULER_NO_TASK,
1878 get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1879 &forward_request_task,
1883 /* (2) reserve reply bandwidth */
1884 // FIXME: need a way to cancel; this
1885 // async operation is problematic (segv-problematic)
1886 // if "pr" is destroyed while it happens!
1887 GNUNET_CORE_peer_configure (core,
1889 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1891 DBLOCK_SIZE, // FIXME: make dependent on type?
1893 &target_reservation_cb,
1899 * We're processing (local) results for a search request
1900 * from a (local) client. Pass applicable results to the
1901 * client and if we are done either clean up (operation
1902 * complete) or switch to P2P search (more results possible).
1904 * @param cls our closure (struct LocalGetContext)
1905 * @param key key for the content
1906 * @param size number of bytes in data
1907 * @param data content stored
1908 * @param type type of the content
1909 * @param priority priority of the content
1910 * @param anonymity anonymity-level for the content
1911 * @param expiration expiration time for the content
1912 * @param uid unique identifier for the datum;
1913 * maybe 0 if no unique identifier is available
/* Datastore iterator for a local client's search: at the end of the result
   stream either hand the query over to P2P forwarding or clean up; for each
   result, resolve on-demand blocks, drop wrong-type and duplicate results,
   and queue new results for transmission to the client. */
1916 process_local_get_result (void *cls,
1917 const GNUNET_HashCode * key,
1923 struct GNUNET_TIME_Absolute
1927 struct LocalGetContext *lgc = cls;
1928 struct PendingRequest *pr;
1929 struct ClientRequestList *crl;
1930 struct ClientList *cl;
/* end-of-results branch (its condition is not visible in this listing) */
1937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1938 "Received last result for `%s' from local datastore, deciding what to do next.\n",
1939 GNUNET_h2s (&lgc->query));
1941 /* no further results from datastore; continue
1942 processing further requests from the client and
1943 allow the next task to use the datastore; also,
1944 switch to P2P requests or clean up our state. */
1946 GNUNET_SERVER_receive_done (lgc->client,
/* go to the network when nothing was found locally, or for the block types
   listed here regardless of how many results were found */
1948 if ( (lgc->results_used == 0) ||
1949 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1950 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1951 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1954 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955 "Forwarding query for `%s' to network.\n",
1956 GNUNET_h2s (&lgc->query));
/* search the client list for an entry belonging to this client */
1959 while ( (NULL != cl) &&
1960 (cl->client != lgc->client) )
/* presumably only reached when no entry exists yet -- the guarding
   condition is not visible here */
1964 cl = GNUNET_malloc (sizeof (struct ClientList));
1965 cl->client = lgc->client;
1969 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1971 GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1972 pr = GNUNET_malloc (sizeof (struct PendingRequest));
1973 pr->client = lgc->client;
1974 GNUNET_SERVER_client_keep (pr->client);
1975 pr->crl_entry = crl;
1977 if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1979 pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1980 *pr->namespace = lgc->namespace;
/* the array of seen results moves from the local context to the pending
   request; the context's pointer and size are cleared so it is not freed
   twice */
1982 pr->replies_seen = lgc->results;
1983 lgc->results = NULL;
1984 pr->start_time = GNUNET_TIME_absolute_get ();
1985 pr->query = lgc->query;
1986 pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1987 pr->replies_seen_off = lgc->results_used;
1988 pr->replies_seen_size = lgc->results_size;
1989 lgc->results_size = 0;
1990 pr->type = lgc->type;
1991 pr->anonymity_level = lgc->anonymity_level;
/* the bloom filter is built over the replies the client has already seen */
1992 pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1996 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1999 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2000 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2002 GNUNET_SCHEDULER_PRIORITY_IDLE,
2003 GNUNET_SCHEDULER_NO_TASK,
2004 get_processing_delay (),
2005 &forward_request_task,
2007 local_get_context_free (lgc);
2010 /* got all possible results, clean up! */
2012 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013 "Found all possible results for query for `%s', done!\n",
2014 GNUNET_h2s (&lgc->query));
2016 local_get_context_free (lgc);
/* on-demand blocks are passed to handle_on_demand_block, with this function
   given as the callback */
2019 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2022 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023 "Received on-demand block for `%s' from local datastore, fetching data.\n",
2024 GNUNET_h2s (&lgc->query));
2026 handle_on_demand_block (key, size, data, type, priority,
2027 anonymity, expiration, uid,
2028 &process_local_get_result,
2032 if ( (type != lgc->type) &&
2033 (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
2035 /* this should be virtually impossible to reach (DBLOCK
2036 query hash being identical to KBLOCK/SBLOCK query hash);
2037 nevertheless, if it happens, the correct thing is to
2038 simply skip the result. */
2040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2041 "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
2044 GNUNET_h2s (&lgc->query));
2046 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2049 /* check if this is a result we've already
2051 for (i=0;i<lgc->results_used;i++)
/* NOTE(review): the comparison uses `key`, while the entries stored further
   down are hashes of the result data -- confirm both denote the same value,
   otherwise duplicates are never detected */
2052 if (0 == memcmp (key,
2054 sizeof (GNUNET_HashCode)))
2057 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058 "Received duplicate result for `%s' from local datastore, ignoring.\n",
2059 GNUNET_h2s (&lgc->query));
2061 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
/* array full: grow it to twice the size plus two before appending */
2064 if (lgc->results_used == lgc->results_size)
2065 GNUNET_array_grow (lgc->results,
2067 lgc->results_size * 2 + 2);
/* remember the hash of this result's data in the next free slot */
2068 GNUNET_CRYPTO_hash (data,
2070 &lgc->results[lgc->results_used++]);
2071 msize = size + sizeof (struct ContentMessage);
2072 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2073 lgc->result = GNUNET_malloc (msize);
2074 lgc->result->header.size = htons (msize);
2075 lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2076 lgc->result->type = htonl (type);
2077 lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
/* payload is placed directly behind the ContentMessage header */
2078 memcpy (&lgc->result[1],
2082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2083 "Received new result for `%s' from local datastore, passing to client.\n",
2084 GNUNET_h2s (&lgc->query));
2086 GNUNET_SERVER_notify_transmit_ready (lgc->client,
2088 GNUNET_TIME_UNIT_FOREVER_REL,
2089 &transmit_local_result,
2095 * We're processing a search request from a local
2096 * client. Now it is our turn to query the datastore.
2098 * @param cls our closure (struct LocalGetContext)
/* Scheduler task that issues the datastore lookup for a local client's
   search; results are delivered to process_local_get_result. */
2102 transmit_local_get (void *cls,
2103 const struct GNUNET_SCHEDULER_TaskContext *tc)
2105 struct LocalGetContext *lgc = cls;
/* DBLOCK queries are widened to ANY so that on-demand entries match too */
2109 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2110 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2111 GNUNET_DATASTORE_get (dsh,
2114 &process_local_get_result,
2116 GNUNET_TIME_UNIT_FOREVER_REL);
2121 * We're processing a search request from a local
2122 * client. Now it is our turn to query the datastore.
2124 * @param cls our closure (struct LocalGetContext)
2125 * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
/* Called when it is this request's turn at the datastore: asserts success
   and runs transmit_local_get as a scheduler continuation. */
2128 transmit_local_get_ready (void *cls,
2131 struct LocalGetContext *lgc = cls;
/* anything but GNUNET_OK aborts here */
2133 GNUNET_assert (GNUNET_OK == ok);
2134 GNUNET_SCHEDULER_add_continuation (sched,
2136 &transmit_local_get,
2138 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2143 * Handle START_SEARCH-message (search request from client).
2145 * @param cls closure
2146 * @param client identification of the client
2147 * @param message the actual message
/* Handle a START_SEARCH message from a client: validate the size, copy the
   trailing hash codes into a new LocalGetContext as results already known,
   and queue a datastore request for the query. */
2150 handle_start_search (void *cls,
2151 struct GNUNET_SERVER_Client *client,
2152 const struct GNUNET_MessageHeader *message)
2154 const struct SearchMessage *sm;
2155 struct LocalGetContext *lgc;
2159 msize = ntohs (message->size);
/* the message must hold the fixed header plus a whole number of hash codes */
2160 if ( (msize < sizeof (struct SearchMessage)) ||
2161 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2164 GNUNET_SERVER_receive_done (client,
/* sc = number of hash codes that follow the fixed header */
2168 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2169 sm = (const struct SearchMessage*) message;
2170 GNUNET_SERVER_client_keep (client);
2171 lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
/* the hash codes sent by the client pre-populate the results array */
2174 lgc->results_used = sc;
2175 GNUNET_array_grow (lgc->results,
2178 memcpy (lgc->results,
2180 sc * sizeof (GNUNET_HashCode));
2182 lgc->client = client;
2183 lgc->type = ntohl (sm->type);
2184 lgc->anonymity_level = ntohl (sm->anonymity_level);
/* sm->target is stored as a peer identity for DBLOCK/IBLOCK and as a
   namespace for SBLOCK */
2187 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2188 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2189 lgc->target.hashPubKey = sm->target;
2191 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2192 lgc->namespace = sm->target;
2197 lgc->query = sm->query;
2198 GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
/* the handle is stored in lgc->req; transmit_local_get_ready is the
   callback given to the queue */
2199 lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2200 &transmit_local_get_ready,
2206 * List of handlers for the messages understood by this
/* Dispatch table for client messages: each entry names the handler, a
   closure (NULL here), the message type and a size value. */
2209 static struct GNUNET_SERVER_MessageHandler handlers[] = {
/* size 0 here and for START_SEARCH-style variable messages is presumably
   "any size" -- TODO confirm against the server API */
2210 {&handle_index_start, NULL,
2211 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2212 {&handle_index_list_get, NULL,
2213 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2214 {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
2215 sizeof (struct UnindexMessage) },
2216 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
2223 * Clean up the memory used by the PendingRequest structure (except
2224 * for the client or peer list that the request may be part of).
2226 * @param pr request to clean up
/* Release everything owned by a pending request: remove it from the lookup
   structures, cancel scheduled work and transmissions, free queued replies,
   and drop the peer-ID references it holds. */
2229 destroy_pending_request (struct PendingRequest *pr)
2231 struct PendingReply *reply;
2232 struct ClientList *cl;
2234 GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2237 // FIXME: not sure how this can work (efficiently)
2238 // also, what does the return value mean?
/* requests without a client are the ones kept in the expiration heap */
2239 if (pr->client == NULL)
2241 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
/* client requests are unlinked from their client's request list instead */
2246 cl = pr->crl_entry->cl;
2247 GNUNET_CONTAINER_DLL_remove (cl->head,
2251 if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2252 GNUNET_SCHEDULER_cancel (sched, pr->task);
2253 if (NULL != pr->cth)
2254 GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
/* NOTE(review): no guard is visible on the next two calls in this listing;
   confirm pr->bf and pr->th are checked for NULL in the missing lines */
2256 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2258 GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
/* free every reply still queued for this request */
2259 while (NULL != (reply = pr->replies_pending))
2261 pr->replies_pending = reply->next;
2262 GNUNET_free (reply);
/* drop the references taken on the interned peer IDs */
2264 GNUNET_PEER_change_rc (pr->source_pid, -1);
2265 GNUNET_PEER_change_rc (pr->target_pid, -1);
2266 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2267 GNUNET_free_non_null (pr->used_pids);
2268 GNUNET_free_non_null (pr->replies_seen);
2269 GNUNET_free_non_null (pr->namespace);
2275 * A client disconnected. Remove all of its pending queries.
2277 * @param cls closure, NULL
2278 * @param client identification of the client
/* A client disconnected: free its local-get context, if one exists, then
   find its entry in the client list, unlink it and destroy every pending
   request that entry still references. */
2281 handle_client_disconnect (void *cls,
2282 struct GNUNET_SERVER_Client
2285 struct LocalGetContext *lgc;
2286 struct ClientList *cpos;
2287 struct ClientList *cprev;
2288 struct ClientRequestList *rl;
/* look for a local-get context that belongs to this client */
2291 while ( (NULL != lgc) &&
2292 (lgc->client != client) )
2295 local_get_context_free (lgc);
/* Walk the client list until the entry for `client` is found. The test must
   look at the entry under the cursor (cpos); testing the list head
   (clients->client) made the walk run off the end for every client that is
   not first in the list, so its requests were never released. */
2298 while ( (NULL != cpos) &&
2299 (cpos->client != client) )
/* unlink: either the entry is the list head or it follows cprev */
2307 clients = cpos->next;
2309 cprev->next = cpos->next;
/* destroy all requests still attached to this client */
2310 while (NULL != (rl = cpos->head))
2312 cpos->head = rl->next;
2313 destroy_pending_request (rl->req);
2322 * Iterator over entries in the "requests_by_query" map
2323 * that frees all the entries.
2325 * @param cls closure, NULL
2326 * @param key current key code (the query, unused)
2327 * @param value value in the hash map, of type "struct PendingRequest*"
2328 * @return GNUNET_YES (we should continue to iterate)
/* Map-iterator adapter: destroys the pending request stored as the value. */
2331 destroy_pending_request_cb (void *cls,
2332 const GNUNET_HashCode * key,
2335 struct PendingRequest *pr = value;
/* NOTE(review): destroy_pending_request removes the entry from
   requests_by_query, the map shutdown_task iterates with this callback --
   confirm the map tolerates removal during iteration */
2337 destroy_pending_request (pr);
2343 * Task run during shutdown.
/* Shutdown task: disconnect from core and the datastore, destroy all pending
   requests and client state, then tear down the global containers and the
   indexed-file list. */
2349 shutdown_task (void *cls,
2350 const struct GNUNET_SCHEDULER_TaskContext *tc)
2352 struct IndexInfo *pos;
2355 GNUNET_CORE_disconnect (core);
2356 GNUNET_DATASTORE_disconnect (dsh,
2359 GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2360 &destroy_pending_request_cb,
/* client entries are released by simulating a disconnect for each of them */
2362 while (clients != NULL)
2363 handle_client_disconnect (NULL,
/* containers are set to NULL after destruction */
2365 GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2366 requests_by_query = NULL;
2367 GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2368 requests_by_peer = NULL;
2369 GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2370 requests_by_expiration = NULL;
2371 // FIXME: iterate over entries and free individually?
2372 // (or do we get disconnect notifications?)
2373 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2374 connected_peers = NULL;
2375 GNUNET_CONTAINER_multihashmap_destroy (ifm);
/* pop entries off the indexed-file list one at a time */
2377 while (NULL != (pos = indexed_files))
2379 indexed_files = pos->next;
2386 * Free (each) request made by the peer.
2388 * @param cls closure, points to peer that the request belongs to
2389 * @param key current key code
2390 * @param value value in the hash map
2391 * @return GNUNET_YES (we should continue to iterate)
/* Map-iterator callback used when a peer goes away: removes one request from
   the per-peer map and destroys it. The closure is the peer's identity. */
2394 destroy_request (void *cls,
2395 const GNUNET_HashCode * key,
2398 const struct GNUNET_PeerIdentity * peer = cls;
2399 struct PendingRequest *pr = value;
/* the per-peer map entry is removed here; destroy_pending_request takes
   care of requests_by_query */
2401 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2404 destroy_pending_request (pr);
2411 * Method called whenever a given peer connects.
2413 * @param cls closure, not used
2414 * @param peer peer identity this notification is about
/* A peer connected: allocate its ConnectedPeer record, intern its identity
   and register the record in the connected_peers map. */
2417 peer_connect_handler (void *cls,
2419 GNUNET_PeerIdentity * peer)
2421 struct ConnectedPeer *cp;
2423 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
/* the interned ID is released again in peer_disconnect_handler */
2424 cp->pid = GNUNET_PEER_intern (peer);
/* UNIQUE_ONLY: at most one record per peer is expected in the map */
2425 GNUNET_CONTAINER_multihashmap_put (connected_peers,
2428 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2433 * Method called whenever a peer disconnects.
2435 * @param cls closure, not used
2436 * @param peer peer identity this notification is about
/* A peer disconnected: release the peer-ID references held by its record and
   visit all requests stored for that peer in requests_by_peer. */
2439 peer_disconnect_handler (void *cls,
2441 GNUNET_PeerIdentity * peer)
2443 struct ConnectedPeer *cp;
/* NOTE(review): the lookup result is dereferenced on the very next visible
   line without a NULL check -- confirm a record always exists for the peer */
2445 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2447 GNUNET_PEER_change_rc (cp->pid, -1);
2448 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2450 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2458 * We're processing a GET request from
2459 * another peer and have decided to forward
2460 * it to other peers.
2462 * @param cls our "struct ProcessGetContext *"
/* Turn a processed P2P GET into a PendingRequest: copy the query state from
   the ProcessGetContext, register the request in the query map, per-peer map
   and expiration heap, expire the oldest request when over the limit, and
   schedule the forwarding task. */
2466 forward_get_request (void *cls,
2467 const struct GNUNET_SCHEDULER_TaskContext *tc)
2469 struct ProcessGetContext *pgc = cls;
2470 struct PendingRequest *pr;
2471 struct PendingRequest *eer;
2472 struct GNUNET_PeerIdentity target;
2474 pr = GNUNET_malloc (sizeof (struct PendingRequest));
/* the namespace is copied only when the bitmap says the message carried one */
2475 if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2477 pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2478 *pr->namespace = pgc->namespace;
2481 pr->bf_size = pgc->bf_size;
2483 pr->start_time = pgc->start_time;
2484 pr->query = pgc->query;
/* replies go to the peer recorded as reply_to */
2485 pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2486 if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2487 pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2488 pr->anonymity_level = 1; /* default */
2489 pr->priority = pgc->priority;
/* nothing of the priority has been spent on forwarding yet */
2490 pr->remaining_priority = pr->priority;
2491 pr->mingle = pgc->mingle;
2493 pr->type = pgc->type;
2494 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2497 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2498 GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2499 &pgc->reply_to.hashPubKey,
2501 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* heap cost is the absolute expiration time: start time plus TTL */
2502 GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2504 pr->start_time.value + pr->ttl);
2505 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2507 /* expire oldest request! */
/* NOTE(review): the heap root could be the request inserted just above --
   confirm `pr` is not used after `eer` is destroyed in that case */
2508 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2509 GNUNET_PEER_resolve (eer->source_pid,
2511 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2514 destroy_pending_request (eer);
2516 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2518 GNUNET_SCHEDULER_PRIORITY_IDLE,
2519 GNUNET_SCHEDULER_NO_TASK,
2520 get_processing_delay (),
2521 &forward_request_task,
2526 * Transmit the given message by copying it to
2527 * the target buffer "buf". "buf" will be
2528 * NULL and "size" zero if the socket was closed for
2529 * writing in the meantime. In that case, only
2533 * @param cls closure, pointer to the message
2534 * @param size number of bytes available in buf
2535 * @param buf where the callee should write the message
2536 * @return number of bytes written to buf
/* Transmit-ready callback that copies a prepared message (the closure) into
   the supplied buffer; logs and drops the reply when core cannot take it. */
2539 transmit_message (void *cls,
2540 size_t size, void *buf)
2542 struct GNUNET_MessageHeader *msg = cls;
/* drop path (its condition is not visible in this listing) */
2548 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2549 "Dropping reply, core too busy.\n");
/* the message length comes from its own header, in network byte order */
2554 msize = ntohs (msg->size);
2555 GNUNET_assert (size >= msize);
2556 memcpy (buf, msg, msize);
2563 * Test if the load on this peer is too high
2564 * to even consider processing the query at
2567 * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
/* Load check placeholder: currently always reports that the load is fine. */
2570 test_load_too_high ()
2572 return GNUNET_NO; // FIXME
2577 * We're processing (local) results for a search request
2578 * from another peer. Pass applicable results to the
2579 * peer and if we are done either clean up (operation
2580 * complete) or forward to other peers (more results possible).
2582 * @param cls our closure (struct LocalGetContext)
2583 * @param key key for the content
2584 * @param size number of bytes in data
2585 * @param data content stored
2586 * @param type type of the content
2587 * @param priority priority of the content
2588 * @param anonymity anonymity-level for the content
2589 * @param expiration expiration time for the content
2590 * @param uid unique identifier for the datum;
2591 * maybe 0 if no unique identifier is available
/* Datastore iterator for a GET from another peer: at the end of the results
   optionally continue with forwarding; for each result resolve on-demand
   blocks, filter duplicates through the bloom filter, send the block back as
   a PutMessage and decide whether to ask for more results. */
2594 process_p2p_get_result (void *cls,
2595 const GNUNET_HashCode * key,
2601 struct GNUNET_TIME_Absolute
2605 struct ProcessGetContext *pgc = cls;
2606 GNUNET_HashCode dhash;
2607 GNUNET_HashCode mhash;
2608 struct PutMessage *reply;
2612 /* no more results */
/* forward only if policy allows it and either nothing was found or the
   block type is one of those listed here */
2613 if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) &&
2614 ( (0 == pgc->results_found) ||
2615 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2616 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2617 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2619 GNUNET_SCHEDULER_add_continuation (sched,
2621 &forward_get_request,
2623 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2627 if (pgc->bf != NULL)
2628 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
/* on-demand blocks are passed to handle_on_demand_block, with this function
   given as the callback */
2634 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2636 handle_on_demand_block (key, size, data, type, priority,
2637 anonymity, expiration, uid,
2638 &process_p2p_get_result,
2642 /* check for duplicates */
/* the data hash is mingled before the bloom filter is consulted */
2643 GNUNET_CRYPTO_hash (data, size, &dhash);
2644 mingle_hash (&dhash,
2647 if ( (pgc->bf != NULL) &&
2649 GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2654 "Result from datastore filtered by bloomfilter.\n");
2656 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2659 pgc->results_found++;
/* for these block types the result is also added to the bloom filter,
   creating the filter first if the request came without one */
2660 if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2661 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2662 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2664 if (pgc->bf == NULL)
2667 pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2671 GNUNET_CONTAINER_bloomfilter_add (pgc->bf,
/* NOTE(review): the total length is stored in a 16-bit field without a
   visible bound check on `size` -- confirm the datastore limits block size */
2675 reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2676 reply->header.size = htons (sizeof (struct PutMessage) + size);
2677 reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2678 reply->type = htonl (type);
/* expiration is sent as time remaining, not as an absolute timestamp */
2679 reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2680 memcpy (&reply[1], data, size);
2681 GNUNET_CORE_notify_transmit_ready (core,
2683 ACCEPTABLE_REPLY_DELAY,
2685 sizeof (struct PutMessage) + size,
/* stop early under load or once enough results were sent for this
   priority; in that case forwarding is switched off as well */
2688 if ( (GNUNET_YES == test_load_too_high()) ||
2689 (pgc->results_found > 5 + 2 * pgc->priority) )
2691 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2692 pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2695 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2700 * We're processing a GET request from another peer. Give it to our
2703 * @param cls our "struct ProcessGetContext"
2704 * @param ok did we get a datastore slice or not?
/* Datastore-queue callback for a P2P GET: if a slot was granted, start the
   datastore lookup with process_p2p_get_result as the result iterator. */
2707 ds_get_request (void *cls,
2710 struct ProcessGetContext *pgc = cls;
2712 struct GNUNET_TIME_Relative timeout;
2714 if (GNUNET_OK != ok)
2716 /* no point in doing P2P stuff if we can't even do local */
/* DBLOCK queries are widened to ANY so that on-demand entries match too */
2721 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2722 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
/* higher-priority requests are given proportionally more time */
2723 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2724 (pgc->priority + 1));
2725 GNUNET_DATASTORE_get (dsh,
2728 &process_p2p_get_result,
2735 * The priority level imposes a bound on the maximum
2736 * value for the ttl that can be requested.
2738 * @param ttl_in requested ttl
2739 * @param prio given priority
2740 * @return ttl_in if ttl_in is below the limit,
2741 * otherwise the ttl-limit for the given priority
/* Limit a requested TTL by what the given priority allows. */
2744 bound_ttl (int32_t ttl_in, uint32_t prio)
2746 unsigned long long allowed;
/* limit grows linearly with the priority; computed in 64 bits so the
   multiplication cannot wrap */
2750 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
/* NOTE(review): ttl_in is signed and `allowed` unsigned, so a negative
   ttl_in compares as a huge value here -- confirm negative TTLs are handled
   in the lines missing from this listing */
2751 if (ttl_in > allowed)
/* the limit itself is clamped at 2^30 */
2753 if (allowed >= (1 << 30))
2762 * We've received a request with the specified
2763 * priority. Bound it according to how much
2764 * we trust the given peer.
2766 * @param prio_in requested priority
2767 * @param peer the peer making the request
2768 * @return effective priority
/* Map a requested priority to the effective priority granted to `peer`; the
   body is not visible in this listing. */
2771 bound_priority (uint32_t prio_in,
2772 const struct GNUNET_PeerIdentity *peer)
2779 * Handle P2P "GET" request.
2781 * @param cls closure, always NULL
2782 * @param other the other peer involved (sender or receiver, NULL
2783 * for loopback messages where we are both sender and receiver)
2784 * @param message the actual message
2785 * @return GNUNET_OK to keep the connection open,
2786 * GNUNET_SYSERR to close it (signal serious error)
/* Handle a GET message from another peer: validate the size, decode the
   optional hash codes and the bloom filter that follow the fixed header,
   choose a routing policy from the current load, bound priority and TTL,
   then either queue a local datastore lookup or go straight to forwarding.
   Returns GNUNET_SYSERR for malformed messages and GNUNET_OK when the query
   is dropped because the network is saturated. */
2789 handle_p2p_get (void *cls,
2790 const struct GNUNET_PeerIdentity *other,
2791 const struct GNUNET_MessageHeader *message)
2794 const struct GetMessage *gm;
2796 const GNUNET_HashCode *opt;
2797 struct ProcessGetContext *pgc;
2800 uint32_t ttl_decrement;
2805 msize = ntohs(message->size);
2806 if (msize < sizeof (struct GetMessage))
2808 GNUNET_break_op (0);
2809 return GNUNET_SYSERR;
2811 gm = (const struct GetMessage*) message;
2812 bm = ntohl (gm->hash_bitmap);
/* the message must be large enough for the `bits` optional hash codes */
2820 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2822 GNUNET_break_op (0);
2823 return GNUNET_SYSERR;
2825 opt = (const GNUNET_HashCode*) &gm[1];
/* Whatever follows the header and the optional hash codes is bloom filter
   data, so the hash codes are SUBTRACTED from the payload size (adding them
   overstated the filter size by twice their length). */
2826 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
2827 pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
/* The filter bytes live in the received message right after the optional
   hash codes; &pgc[1] pointed past the end of the allocation made above. */
2830 pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2833 pgc->bf_size = bfsize;
2835 pgc->type = ntohl (gm->type);
2836 pgc->bm = ntohl (gm->hash_bitmap);
/* the sender stores the mutator with htonl, so convert it back; otherwise
   the mingled hashes would not match the filter on little-endian hosts */
2837 pgc->mingle = ntohl (gm->filter_mutator);
/* optional hash codes are consumed in bitmap order: return-to, namespace,
   transmit-to */
2839 if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2840 pgc->reply_to.hashPubKey = opt[bits++];
2842 pgc->reply_to = *other;
2843 if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2844 pgc->namespace = opt[bits++];
/* an SBLOCK query without a namespace is malformed */
2845 else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2847 GNUNET_break_op (0);
2849 return GNUNET_SYSERR;
2851 if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2852 pgc->prime_target.hashPubKey = opt[bits++];
2853 /* note that we can really only check load here since otherwise
2854 peers could find out that we are overloaded by being disconnected
2855 after sending us a malformed query... */
2856 if (GNUNET_YES == test_load_too_high ())
2858 if (NULL != pgc->bf)
2859 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2862 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2863 "Dropping query from `%s', this peer is too busy.\n",
2864 GNUNET_i2s (other));
/* load figures are hard-coded placeholders for now */
2868 net_load_up = 50; // FIXME
2869 net_load_down = 50; // FIXME
2870 pgc->policy = ROUTING_POLICY_NONE;
/* idle network: do everything and charge nothing */
2871 if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2872 (net_load_down < IDLE_LOAD_THRESHOLD) )
2874 pgc->policy |= ROUTING_POLICY_ALL;
2875 pgc->priority = 0; /* no charge */
2879 pgc->priority = bound_priority (ntohl (gm->priority), other);
2881 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2883 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2885 pgc->policy |= ROUTING_POLICY_ALL;
2889 // FIXME: is this sound?
2890 if (net_load_up < 90 + 10 * pgc->priority)
2891 pgc->policy |= ROUTING_POLICY_FORWARD;
2892 if (net_load_down < 90 + 10 * pgc->priority)
2893 pgc->policy |= ROUTING_POLICY_ANSWER;
2896 if (pgc->policy == ROUTING_POLICY_NONE)
2899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2900 "Dropping query from `%s', network saturated.\n",
2901 GNUNET_i2s (other));
2903 if (NULL != pgc->bf)
2904 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2906 return GNUNET_OK; /* drop */
2908 if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2909 pgc->priority = 0; /* kill the priority (we cannot benefit) */
2910 pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2911 /* decrement ttl (always) */
2912 ttl_decrement = 2 * TTL_DECREMENT +
2913 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
/* NOTE(review): ttl_decrement is unsigned, so the subtraction below is
   evaluated as unsigned and is positive for almost every negative TTL; the
   intended signed-underflow test probably needs a signed comparison --
   left unchanged because the declared type of pgc->ttl is not visible here */
2915 if ( (pgc->ttl < 0) &&
2916 (pgc->ttl - ttl_decrement > 0) )
2919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2920 "Dropping query from `%s' due to TTL underflow.\n",
2921 GNUNET_i2s (other));
2923 /* integer underflow => drop (should be very rare)! */
2924 if (NULL != pgc->bf)
2925 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2929 pgc->ttl -= ttl_decrement;
2930 pgc->start_time = GNUNET_TIME_absolute_get ();
/* preference passed to core is the priority, but never below
   QUERY_BANDWIDTH_VALUE */
2931 preference = (double) pgc->priority;
2932 if (preference < QUERY_BANDWIDTH_VALUE)
2933 preference = QUERY_BANDWIDTH_VALUE;
2934 // FIXME: also reserve bandwidth for reply?
2935 GNUNET_CORE_peer_configure (core,
2937 GNUNET_TIME_UNIT_FOREVER_REL,
2938 0, 0, preference, NULL, NULL);
/* answer locally first when allowed; otherwise continue with forwarding */
2939 if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2940 pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2944 GNUNET_SCHEDULER_add_continuation (sched,
2946 &forward_get_request,
2948 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2954 * Function called to notify us that we can now transmit a reply to a
2955 * client or peer. "buf" will be NULL and "size" zero if the socket was
2956 * closed for writing in the meantime.
2958 * @param cls closure, points to a "struct PendingRequest*" with
2959 * one or more pending replies
2960 * @param size number of bytes available in buf
2961 * @param buf where the callee should write the message
2962 * @return number of bytes written to buf
/* Transmit-ready callback: walks the reply list of the PendingRequest passed
   as closure and copies each queued message into cbuf, advancing "ret" by
   the number of bytes copied. */
2965 transmit_result (void *cls,
2969 struct PendingRequest *pr = cls;
2971 struct PendingReply *reply;
/* Consume pr->replies_pending from the head of the list. */
2975 while (NULL != (reply = pr->replies_pending))
/* Two-part guard: the first test catches wrap-around of msize + ret, the
   second a reply that would exceed "size".  The action taken when the guard
   fires is not visible here (presumably the loop is left) -- TODO confirm. */
2977 if ( (reply->msize + ret < ret) ||
2978 (reply->msize + ret > size) )
/* Unlink the entry, copy the message stored directly behind the
   PendingReply header, account for it and release the entry. */
2980 pr->replies_pending = reply->next;
2981 memcpy (&cbuf[ret], &reply[1], reply->msize);
2982 ret += reply->msize;
2983 GNUNET_free (reply);
2990 * Iterator over pending requests.
2992 * @param cls response (struct ProcessReplyClosure)
2993 * @param key our query
2994 * @param value value in the hash map (meta-info about the query)
2995 * @return GNUNET_YES (we should continue to iterate)
/* Iterator callback: offers the reply described by the ProcessReplyClosure
   (cls) to one PendingRequest (value).  If the reply is not filtered out as
   a duplicate, a copy is queued on pr->replies_pending and a transmission
   is requested. */
2998 process_reply (void *cls,
2999 const GNUNET_HashCode * key,
3002 struct ProcessReplyClosure *prq = cls;
3003 struct PendingRequest *pr = value;
3004 struct PendingRequest *eer;
3005 struct PendingReply *reply;
3006 struct PutMessage *pm;
3007 struct ContentMessage *cm;
3008 GNUNET_HashCode chash;
3009 GNUNET_HashCode mhash;
3010 struct GNUNET_PeerIdentity target;
3013 struct GNUNET_TIME_Relative max_delay;
/* Hash the reply payload (presumably into chash, which is consumed below
   -- the output argument is not visible here). */
3015 GNUNET_CRYPTO_hash (prq->data,
/* Per-block-type handling of the reply. */
3020 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3021 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3023 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3024 /* FIXME: does prq->namespace match our expectations? */
3025 /* then: fall-through??? */
3026 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
/* Duplicate suppression: chash is mingled with pr->mingle into mhash and
   the request's bloomfilter pr->bf is tested; a hit returns GNUNET_YES
   without queueing anything, otherwise an entry is added to pr->bf. */
3029 mingle_hash (&chash, pr->mingle, &mhash);
3030 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3032 return GNUNET_YES; /* duplicate */
3033 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3037 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3038 // FIXME: any checks against duplicates for SKBlocks?
/* Remember the request priority and move whatever priority the request
   still holds over to the reply closure; zeroing remaining_priority makes
   sure it is handed over only once. */
3041 prio = pr->priority;
3042 prq->priority += pr->remaining_priority;
3043 pr->remaining_priority = 0;
/* Requests with a client attached: append the content hash to the
   replies_seen array, growing it to 2 * size + 4 entries when full. */
3044 if (pr->client != NULL)
3046 if (pr->replies_seen_size == pr->replies_seen_off)
3047 GNUNET_array_grow (pr->replies_seen,
3048 pr->replies_seen_size,
3049 pr->replies_seen_size * 2 + 4);
3050 pr->replies_seen[pr->replies_seen_off++] = chash;
3051 // FIXME: possibly recalculate BF!
/* No client attached: build a ContentMessage (absolute expiration in
   network byte order), push it onto pr->replies_pending and ask the core
   for a transmission slot. */
3053 if (pr->client == NULL)
3055 msize = sizeof (struct ContentMessage) + prq->size;
3056 reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3057 reply->msize = msize;
3058 cm = (struct ContentMessage*) &reply[1];
3059 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3060 cm->header.size = htons (msize);
3061 cm->type = htonl (prq->type);
3062 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3063 reply->next = pr->replies_pending;
3064 pr->replies_pending = reply;
/* NOTE(review): cm points at &reply[1] and its header, type and expiration
   were filled in just above, yet the memcpy below also targets &reply[1].
   The payload therefore overwrites those fields, and the final
   sizeof (struct ContentMessage) bytes of the message are not written by
   this copy.  The destination looks like it should be &cm[1] -- confirm. */
3065 memcpy (&reply[1], prq->data, prq->size);
/* What happens when a core transmission is already pending (pr->cth set)
   is not visible here -- presumably nothing further is scheduled. */
3066 if (pr->cth != NULL)
/* Default to an unbounded delay; the bound is tightened below once the
   expiration heap holds max_pending_requests entries or more. */
3068 max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3069 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3071 /* estimate expiration time from time difference between
3072 first request that will be discarded and this request */
3073 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3074 max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
/* Resolve the interned source peer id (presumably into "target") and
   request the transmission from the core. */
3077 GNUNET_PEER_resolve (pr->source_pid,
3079 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3086 if (NULL == pr->cth)
3088 // FIXME: now what? discard?
/* Client path (the branch keyword is not visible here): build a PutMessage
   whose expiration is the remaining relative time, queue it and request a
   transmission to pr->client. */
3093 msize = sizeof (struct PutMessage) + prq->size;
3094 reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3095 reply->msize = msize;
3096 reply->next = pr->replies_pending;
3097 pm = (struct PutMessage*) &reply[1];
3098 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3099 pm->header.size = htons (msize);
3100 pm->type = htonl (prq->type);
3101 pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3102 pr->replies_pending = reply;
/* NOTE(review): same pattern as in the ContentMessage path -- pm aliases
   &reply[1], so the memcpy below overwrites the PutMessage fields set
   above.  The destination looks like it should be &pm[1] -- confirm. */
3103 memcpy (&reply[1], prq->data, prq->size);
3106 pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3108 GNUNET_TIME_UNIT_FOREVER_REL,
3113 // FIXME: need to try again later (not much
3114 // to do here specifically, but we need to
3115 // check somewhere else to handle this case!)
3118 // FIXME: implement hot-path routing statistics keeping!
3124 * Check if the given KBlock is well-formed.
3126 * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3127 * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3128 * @param query where to store the query that this block answers
3129 * @return GNUNET_OK if this is actually a well-formed KBlock
/* Validates a received KBlock in three steps (minimum size, consistency of
   the signed-purpose size, RSA signature) and then hashes the keyspace
   public key; each failed step reports GNUNET_break_op and returns
   GNUNET_SYSERR. */
3132 check_kblock (const struct KBlock *kb,
3134 GNUNET_HashCode *query)
/* Step 1: the buffer must at least hold the fixed-size KBlock header. */
3136 if (dsize < sizeof (struct KBlock))
3138 GNUNET_break_op (0);
3139 return GNUNET_SYSERR;
/* Step 2: the payload length behind the header must equal the signed size
   minus the purpose header and the public key.  The right-hand side is
   evaluated in an unsigned type, so a purpose.size smaller than the two
   sizeof terms wraps around instead of becoming negative. */
3141 if (dsize - sizeof (struct KBlock) !=
3142 ntohs (kb->purpose.size)
3143 - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
3144 - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
3146 GNUNET_break_op (0);
3147 return GNUNET_SYSERR;
/* Step 3: verify the signature for the FS_KBLOCK purpose (the remaining
   arguments of the call are not visible here). */
3150 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3155 GNUNET_break_op (0);
3156 return GNUNET_SYSERR;
/* Hash the keyspace public key; presumably the result is stored in *query
   (the output argument is not visible here) -- TODO confirm. */
3159 GNUNET_CRYPTO_hash (&kb->keyspace,
3160 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3167 * Check if the given SBlock is well-formed.
3169 * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3170 * @param dsize size of "sb" in bytes; check for < sizeof(struct SBlock)!
3171 * @param query where to store the query that this block answers
3172 * @param namespace where to store the namespace that this block belongs to
3173 * @return GNUNET_OK if this is actually a well-formed SBlock
/* Validates a received SBlock (minimum size, size consistency, RSA
   signature) and extracts the query identifier and, on request, a hash of
   the subspace public key; each failed check reports GNUNET_break_op and
   returns GNUNET_SYSERR. */
3176 check_sblock (const struct SBlock *sb,
3178 GNUNET_HashCode *query,
3179 GNUNET_HashCode *namespace)
/* The buffer must at least hold the fixed-size SBlock header. */
3181 if (dsize < sizeof (struct SBlock))
3183 GNUNET_break_op (0);
3184 return GNUNET_SYSERR;
/* Size consistency: a length is compared against the signed size plus the
   size of the signature (the left-hand side of the comparison is not
   visible here, presumably dsize). */
3187 ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3189 GNUNET_break_op (0);
3190 return GNUNET_SYSERR;
/* Verify the signature for the FS_SBLOCK purpose (remaining arguments are
   not visible here). */
3193 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3198 GNUNET_break_op (0);
3199 return GNUNET_SYSERR;
/* The query is the identifier stored in the block itself. */
3202 *query = sb->identifier;
/* "namespace" is optional: the subspace public key is hashed only when the
   caller passed a non-NULL pointer. */
3203 if (namespace != NULL)
3204 GNUNET_CRYPTO_hash (&sb->subspace,
3205 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3212 * Handle P2P "PUT" request.
3214 * @param cls closure, always NULL
3215 * @param other the other peer involved (sender or receiver, NULL
3216 * for loopback messages where we are both sender and receiver)
3217 * @param message the actual message
3218 * @return GNUNET_OK to keep the connection open,
3219 * GNUNET_SYSERR to close it (signal serious error)
/* P2P PUT handler: validates the message according to its block type,
   derives the query hash the content answers and looks that query up in
   requests_by_query. */
3222 handle_p2p_put (void *cls,
3223 const struct GNUNET_PeerIdentity *other,
3224 const struct GNUNET_MessageHeader *message)
3226 const struct PutMessage *put;
3230 struct GNUNET_TIME_Absolute expiration;
3231 GNUNET_HashCode query;
3232 struct ProcessReplyClosure prq;
/* Reject messages too short to contain the PutMessage header. */
3234 msize = ntohs (message->size);
3235 if (msize < sizeof (struct PutMessage))
3238 return GNUNET_SYSERR;
/* dsize is the payload length behind the PutMessage header; the relative
   expiration from the wire is converted to an absolute local time. */
3240 put = (const struct PutMessage*) message;
3241 dsize = msize - sizeof (struct PutMessage);
3242 type = ntohl (put->type);
3243 expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3245 /* first, validate! */
/* DBLOCK/IBLOCK: the query is simply the hash of the payload. */
3248 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3249 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3250 GNUNET_CRYPTO_hash (&put[1], dsize, &query);
/* KBLOCK/SBLOCK: delegated to the respective checker; a failed check ends
   the handler with GNUNET_SYSERR. */
3252 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3254 check_kblock ((const struct KBlock*) &put[1],
3257 return GNUNET_SYSERR;
3259 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3261 check_sblock ((const struct SBlock*) &put[1],
3265 return GNUNET_SYSERR;
/* SKBLOCK: no validation is performed yet (see FIXME). */
3267 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3268 // FIXME -- validate SKBLOCK!
3272 /* unknown block type */
3273 GNUNET_break_op (0);
3274 return GNUNET_SYSERR;
3277 /* now, lookup 'query' */
/* Fill the closure handed to the per-request iterator; only the data and
   expiration assignments are visible here. */
3278 prq.data = (const void*) &put[1];
3281 prq.expiration = expiration;
/* Visit the pending requests stored in requests_by_query (key and iterator
   arguments are not visible here). */
3283 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3287 // FIXME: if migration is on and load is low,
3288 // queue to store data in datastore;
3289 // use "prq.priority" for that!
3295 * List of handlers for P2P messages
3296 * that we care about.
/* Handler table for P2P messages, with one entry each for FS_GET and FS_PUT.
   NOTE(review): the trailing 0 in each entry is presumably the
   expected-size field (0 = variable size) -- confirm against
   struct GNUNET_CORE_MessageHandler. */
3298 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3301 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3303 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3309 * Task that will try to initiate a connection with the core service.
/* Forward declaration; the definition follows further down in this file. */
3316 core_connect_task (void *cls,
3317 const struct GNUNET_SCHEDULER_TaskContext *tc);
3321 * Function called by the core after we've
3324 * @param cls closure, unused
3325 * @param server handle to the core service
3326 * @param my_identity our peer identity (unused)
3327 * @param publicKey our public key (unused)
/* Callback receiving the core handle, our peer identity and our public key.
   The visible part schedules a high-priority task with a one-second delay;
   the condition under which this happens and the task being scheduled are
   not visible here (presumably a retry of core_connect_task -- TODO
   confirm). */
3330 core_start_cb (void *cls,
3331 struct GNUNET_CORE_Handle * server,
3332 const struct GNUNET_PeerIdentity *
3335 GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
3340 GNUNET_SCHEDULER_add_delayed (sched,
3342 GNUNET_SCHEDULER_PRIORITY_HIGH,
3343 GNUNET_SCHEDULER_NO_TASK,
3344 GNUNET_TIME_UNIT_SECONDS,
3354 * Task that will try to initiate a connection with the core service.
/* Scheduler task that calls GNUNET_CORE_connect with an unbounded timeout
   and registers peer_connect_handler / peer_disconnect_handler as the
   connect and disconnect notifications.  Neither cls nor tc is used in the
   visible part of the body. */
3361 core_connect_task (void *cls,
3362 const struct GNUNET_SCHEDULER_TaskContext *tc)
3364 GNUNET_CORE_connect (sched,
3366 GNUNET_TIME_UNIT_FOREVER_REL,
3369 &peer_connect_handler,
3370 &peer_disconnect_handler,
3379 * Process fs requests.
3381 * @param cls closure
3382 * @param s scheduler to use
3383 * @param server the initialized server
3384 * @param c configuration to use
/* Service initialisation: creates the global lookup structures, connects to
   the datastore, registers the client handlers and starts the connection
   attempt to the core. */
3388 struct GNUNET_SCHEDULER_Handle *s,
3389 struct GNUNET_SERVER_Handle *server,
3390 const struct GNUNET_CONFIGURATION_Handle *c)
/* Global maps (indexed files, requests by query, requests by peer,
   connected peers) and a min-ordered heap of requests by expiration. */
3395 ifm = GNUNET_CONTAINER_multihashmap_create (128);
3396 requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3397 requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3398 connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3399 requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
/* Connect to the datastore; an error is logged on failure (the failure test
   and any early exit are not visible here). */
3401 dsh = GNUNET_DATASTORE_connect (cfg,
3405 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3406 _("Failed to connect to datastore service.\n"));
/* Register the client-disconnect notification and the client message
   handlers with the server. */
3409 GNUNET_SERVER_disconnect_notify (server,
3410 &handle_client_disconnect,
3412 GNUNET_SERVER_add_handlers (server, handlers);
/* The core connection attempt is invoked directly, not through the
   scheduler, hence the NULL closure and NULL task context. */
3413 core_connect_task (NULL, NULL);
/* Idle-priority task with an unbounded delay; the task function is not
   visible here (presumably the shutdown handler -- TODO confirm). */
3414 GNUNET_SCHEDULER_add_delayed (sched,
3416 GNUNET_SCHEDULER_PRIORITY_IDLE,
3417 GNUNET_SCHEDULER_NO_TASK,
3418 GNUNET_TIME_UNIT_FOREVER_REL,
3425 * The main function for the fs service.
3427 * @param argc number of arguments from the command line
3428 * @param argv command line arguments
3429 * @return 0 ok, 1 on error
/* Program entry point: runs the "fs" service with "run" as its
   initialisation callback and maps GNUNET_OK to exit status 0, anything
   else to 1. */
3432 main (int argc, char *const *argv)
3434 return (GNUNET_OK ==
3435 GNUNET_SERVICE_run (argc,
3437 "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
3440 /* end of gnunet-service-fs.c */