2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief program that provides the file-sharing service
24 * @author Christian Grothoff
27 * - fix the gazillion minor FIXMEs
28 * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29 * core to transmit to another peer; need to make sure this is bounded overall...
30 * - randomly delay processing for improved anonymity (can wait)
31 * - content migration (put in local DS) (can wait)
32 * - handle some special cases when forwarding replies based on tracked requests (can wait)
33 * - tracking of success correlations for hot-path routing (can wait)
34 * - various load-based actions (can wait)
35 * - validation of KSBLOCKS (can wait)
36 * - remove on-demand blocks if they keep failing (can wait)
37 * - check that we decrement PIDs always where necessary (can wait)
38 * - find out how to use core-pulling instead of pushing (at least for some cases)
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "gnunet-service-fs_indexing.h"
51 #define DEBUG_FS GNUNET_NO
56 * Signature of a function that is called whenever a datastore
57 * request can be processed (or an entry put on the queue times out).
60 * @param ok GNUNET_OK or GNUNET_YES if DS is ready, GNUNET_NO on timeout
62 typedef void (*RequestFunction)(void *cls,
67 * Doubly-linked list of our requests for the datastore.
69 struct DatastoreRequestQueue
73 * This is a doubly-linked list.
75 struct DatastoreRequestQueue *next;
78 * This is a doubly-linked list.
80 struct DatastoreRequestQueue *prev;
83 * Function to call (will issue the request).
93 * When should this request time-out because we don't care anymore?
95 struct GNUNET_TIME_Absolute timeout;
98 * ID of task used for signaling timeout.
100 GNUNET_SCHEDULER_TaskIdentifier task;
106 * Closure for processing START_SEARCH messages from a client.
108 struct LocalGetContext
112 * This is a doubly-linked list.
114 struct LocalGetContext *next;
117 * This is a doubly-linked list.
119 struct LocalGetContext *prev;
122 * Client that initiated the search.
124 struct GNUNET_SERVER_Client *client;
127 * Array of results that we've already received
130 GNUNET_HashCode *results;
133 * Bloomfilter over all results (for fast query construction);
134 * NULL if we don't have any results.
136 * FIXME: this member is not used, is that OK? If so, it should
139 struct GNUNET_CONTAINER_BloomFilter *results_bf;
142 * DS request associated with this operation.
144 struct DatastoreRequestQueue *req;
147 * Current result message to transmit to client (or NULL).
149 struct ContentMessage *result;
152 * Type of the content that we're looking for.
158 * Desired anonymity level.
160 uint32_t anonymity_level;
163 * Number of results actually stored in the results array.
165 unsigned int results_used;
168 * Size of the results array in memory.
170 unsigned int results_size;
173 * Size (in bytes) of the 'results_bf' bloomfilter.
175 * FIXME: this member is not used, is that OK? If so, it should
178 size_t results_bf_size;
181 * If the request is for a DBLOCK or IBLOCK, this is the identity of
182 * the peer that is known to have a response. Set to all-zeros if
183 * such a target is not known (note that even if OUR anonymity
184 * level is >0 we may happen to know the responder's identity;
185 * nevertheless, we should probably not use it for a DHT-lookup
186 * or similar blunt actions in order to avoid exposing ourselves).
188 struct GNUNET_PeerIdentity target;
191 * If the request is for an SBLOCK, this is the identity of the
192 * pseudonym to which the SBLOCK belongs.
194 GNUNET_HashCode namespace;
197 * Hash of the keyword (aka query) for KBLOCKs; Hash of
198 * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
199 * and hash of the identifier XORed with the target for
200 * SBLOCKS (aka query).
202 GNUNET_HashCode query;
208 * Possible routing policies for an FS-GET request.
213 * Simply drop the request.
215 ROUTING_POLICY_NONE = 0,
218 * Answer it if we can from local datastore.
220 ROUTING_POLICY_ANSWER = 1,
223 * Forward the request to other peers (if possible).
225 ROUTING_POLICY_FORWARD = 2,
228 * Forward to other peers, and ask them to route
229 * the response via ourselves.
231 ROUTING_POLICY_INDIRECT = 6,
234 * Do everything we could possibly do (that would
237 ROUTING_POLICY_ALL = 7
242 * Internal context we use for our initial processing
245 struct ProcessGetContext
248 * The search query (used for datastore lookup).
250 GNUNET_HashCode query;
253 * Which peer we should forward the response to.
255 struct GNUNET_PeerIdentity reply_to;
258 * Namespace for the result (only set for SKS requests)
260 GNUNET_HashCode namespace;
263 * Peer that we should forward the query to if possible
264 * (since that peer likely has the content).
266 struct GNUNET_PeerIdentity prime_target;
269 * When did we receive this request?
271 struct GNUNET_TIME_Absolute start_time;
274 * Our entry in the DRQ (non-NULL while we wait for our
275 * turn to interact with the local database).
277 struct DatastoreRequestQueue *drq;
280 * Filter used to eliminate duplicate results. Can be NULL if we
281 * are not yet filtering any results.
283 struct GNUNET_CONTAINER_BloomFilter *bf;
286 * Bitmap describing which of the optional
287 * hash codes / peer identities were given to us.
292 * Desired block type.
297 * Priority of the request.
302 * Size of the 'bf' (in bytes).
307 * In what ways are we going to process
310 enum RoutingPolicy policy;
313 * Time-to-live for the request (value
319 * Number to mingle hashes for bloom-filter
325 * Number of results that were found so far.
327 unsigned int results_found;
332 * Information we keep for each pending reply. The
333 * actual message follows at the end of this struct.
335 struct PendingMessage
338 * This is a linked list.
340 struct PendingMessage *next;
343 * Size of the reply; actual reply message follows
344 * at the end of this struct.
349 * How important is this message for us?
357 * All requests from a client are kept in a doubly-linked list.
359 struct ClientRequestList;
363 * Information we keep for each pending request. We should try to
364 * keep this struct as small as possible since its memory consumption
365 * is key to how many requests we can have pending at once.
367 struct PendingRequest
371 * ID of a client making a request, NULL if this entry is for a
374 struct GNUNET_SERVER_Client *client;
377 * If this request was made by a client,
378 * this is our entry in the client request
379 * list; otherwise NULL.
381 struct ClientRequestList *crl_entry;
384 * If this is a namespace query, pointer to the hash of the public
385 * key of the namespace; otherwise NULL.
387 GNUNET_HashCode *namespace;
390 * Bloomfilter we use to filter out replies that we don't care about
391 * (anymore). NULL as long as we are interested in all replies.
393 struct GNUNET_CONTAINER_BloomFilter *bf;
396 * Context of our GNUNET_CORE_peer_change_preference call.
398 struct GNUNET_CORE_InformationRequestContext *irc;
401 * Replies that we have received but were unable to forward yet
402 * (typically non-null only if we have a pending transmission
403 * request with the client or the respective peer).
405 struct PendingMessage *replies_pending;
408 * Pending transmission request with the core service for the target
409 * peer (for processing of 'replies_pending') or Handle for a
410 * pending query-request for P2P-transmission with the core service.
411 * If non-NULL, this request must be cancelled should this struct be
414 struct GNUNET_CORE_TransmitHandle *cth;
417 * Pending transmission request for the target client (for processing of
418 * 'replies_pending').
420 struct GNUNET_CONNECTION_TransmitHandle *th;
423 * Hash code of all replies that we have seen so far (only valid
424 * if client is not NULL since we only track replies like this for
427 GNUNET_HashCode *replies_seen;
430 * Node in the heap representing this entry.
432 struct GNUNET_CONTAINER_HeapNode *hnode;
435 * When did we first see this request (from this peer), or, if our
436 * client is initiating, when did we last initiate a search?
438 struct GNUNET_TIME_Absolute start_time;
441 * The query that this request is for.
443 GNUNET_HashCode query;
446 * The task responsible for transmitting queries
449 GNUNET_SCHEDULER_TaskIdentifier task;
452 * (Interned) Peer identifier (only valid if "client" is NULL)
453 * that identifies a peer that gave us this request.
455 GNUNET_PEER_Id source_pid;
458 * (Interned) Peer identifier that identifies a preferred target
461 GNUNET_PEER_Id target_pid;
464 * (Interned) Peer identifiers of peers that have already
465 * received our query for this content.
467 GNUNET_PEER_Id *used_pids;
470 * Size of the 'bf' (in bytes).
475 * Desired anonymity level; only valid for requests from a local client.
477 uint32_t anonymity_level;
480 * How many entries in "used_pids" are actually valid?
482 unsigned int used_pids_off;
485 * How long is the "used_pids" array?
487 unsigned int used_pids_size;
490 * How many entries in "replies_seen" are actually valid?
492 unsigned int replies_seen_off;
495 * How long is the "replies_seen" array?
497 unsigned int replies_seen_size;
500 * Priority with which this request was made. If one of our clients
501 * made the request, then this is the current priority that we are
502 * using when initiating the request. This value is used when
503 * we decide to reward other peers with trust for providing a reply.
508 * Priority points left for us to spend when forwarding this request
511 uint32_t remaining_priority;
514 * Number to mingle hashes for bloom-filter
520 * TTL with which we saw this request (or, if we initiated, TTL that
521 * we used for the request).
526 * Type of the content that this request is for.
534 * All requests from a client are kept in a doubly-linked list.
536 struct ClientRequestList
539 * This is a doubly-linked list.
541 struct ClientRequestList *next;
544 * This is a doubly-linked list.
546 struct ClientRequestList *prev;
549 * A request from this client.
551 struct PendingRequest *req;
554 * Client list with the head and tail of this DLL.
556 struct ClientList *cl;
561 * Linked list of all clients that we are
562 * currently processing requests for.
568 * This is a linked list.
570 struct ClientList *next;
573 * What client is this entry for?
575 struct GNUNET_SERVER_Client* client;
578 * Head of the DLL of requests from this client.
580 struct ClientRequestList *head;
583 * Tail of the DLL of requests from this client.
585 struct ClientRequestList *tail;
591 * Closure for "process_reply" function.
593 struct ProcessReplyClosure
596 * The data for the reply.
601 * When the reply expires.
603 struct GNUNET_TIME_Absolute expiration;
611 * Namespace that this reply belongs to
612 * (if it is of type SBLOCK).
614 GNUNET_HashCode namespace;
622 * How much was this reply worth to us?
629 * Information about a peer that we are connected to.
630 * We track data that is useful for determining which
631 * peers should receive our requests.
637 * List of the last clients for which this peer
638 * successfully answered a query.
640 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
643 * List of the last PIDs for which
644 * this peer successfully answered a query;
645 * We use 0 to indicate no successful reply.
647 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
650 * Average delay between sending the peer a request and
651 * getting a reply (only calculated over the requests for
652 * which we actually got a reply). Calculated
653 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
655 struct GNUNET_TIME_Relative avg_delay;
658 * Handle for an active request for transmission to this
661 struct GNUNET_CORE_PeerRequestHandle *prh;
664 * Messages (replies, queries, content migration) we would like to
665 * send to this peer in the near future. Sorted by priority.
667 struct PendingMessage *pending_messages;
670 * Average priority of successful replies. Calculated
671 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
676 * The peer's identity.
681 * Number of requests we have currently pending with this peer (that
682 * is, requests that were transmitted so recently that we would not
683 * retransmit them right now).
685 unsigned int pending_requests;
688 * Which offset in "last_p2p_replies" will be updated next?
689 * (we go round-robin).
691 unsigned int last_p2p_replies_woff;
694 * Which offset in "last_client_replies" will be updated next?
695 * (we go round-robin).
697 unsigned int last_client_replies_woff;
703 * Our connection to the datastore.
705 static struct GNUNET_DATASTORE_Handle *dsh;
710 static struct GNUNET_SCHEDULER_Handle *sched;
715 const struct GNUNET_CONFIGURATION_Handle *cfg;
718 * Handle to the core service (NULL until we've connected to it).
720 struct GNUNET_CORE_Handle *core;
723 * Head of doubly-linked LGC list.
725 static struct LocalGetContext *lgc_head;
728 * Tail of doubly-linked LGC list.
730 static struct LocalGetContext *lgc_tail;
733 * Head of request queue for the datastore, sorted by timeout.
735 static struct DatastoreRequestQueue *drq_head;
738 * Tail of request queue for the datastore.
740 static struct DatastoreRequestQueue *drq_tail;
743 * Map of query hash codes to requests.
745 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
748 * Map of peer IDs to requests (for those requests coming
751 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
754 * Linked list of all of our clients and their requests.
756 static struct ClientList *clients;
759 * Heap with the request that will expire next at the top. Contains
760 * pointers of type "struct PendingRequest*"; these will *also* be
761 * aliased from the "requests_by_peer" data structures and the
762 * "requests_by_query" table. Note that requests from our clients
763 * don't expire and are thus NOT in the "requests_by_expiration"
764 * (or the "requests_by_peer" tables).
766 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
769 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
771 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
774 * Maximum number of requests (from other peers) that we're
775 * willing to have pending at any given point in time.
776 * FIXME: set from configuration (and 32 is a tiny value for testing only).
778 static uint64_t max_pending_requests = 32;
786 * Run the next DS request in our
787 * queue, we're done with the current one.
792 struct DatastoreRequestQueue *e;
794 while (NULL != (e = drq_head))
796 if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
798 if (e->task != GNUNET_SCHEDULER_NO_TASK)
799 GNUNET_SCHEDULER_cancel (sched, e->task);
800 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
801 e->req (e->req_cls, GNUNET_NO);
806 if (e->task != GNUNET_SCHEDULER_NO_TASK)
807 GNUNET_SCHEDULER_cancel (sched, e->task);
808 e->task = GNUNET_SCHEDULER_NO_TASK;
809 e->req (e->req_cls, GNUNET_YES);
810 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
816 * A datastore request had to be timed out.
818 * @param cls closure (of type "struct DatastoreRequestQueue*")
819 * @param tc task context, unused
/* Handles a queued datastore request whose deadline has passed: resets
   the entry's task identifier, unlinks the entry from the
   drq_head/drq_tail list and then invokes the entry's callback with
   GNUNET_NO.
   cls is the "struct DatastoreRequestQueue*" entry; tc is not used by
   the visible code. */
822 timeout_ds_request (void *cls,
823 const struct GNUNET_SCHEDULER_TaskContext *tc)
825 struct DatastoreRequestQueue *e = cls;
/* other code in this file cancels e->task whenever it is not
   GNUNET_SCHEDULER_NO_TASK, so reset it before anything else runs */
827 e->task = GNUNET_SCHEDULER_NO_TASK;
/* unlink first: the callback then sees the queue without this entry */
828 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
829 e->req (e->req_cls, GNUNET_NO);
/* NOTE(review): no release of 'e' is visible in this excerpt -- confirm
   that the entry is freed once the callback has returned. */
835 * Queue a request for the datastore.
837 * @param deadline by when the request should run
838 * @param fun function to call once the request can be run
839 * @param fun_cls closure for fun
841 static struct DatastoreRequestQueue *
842 queue_ds_request (struct GNUNET_TIME_Relative deadline,
846 struct DatastoreRequestQueue *e;
847 struct DatastoreRequestQueue *bef;
849 if (drq_head == NULL)
851 /* no other requests pending, run immediately */
852 fun (fun_cls, GNUNET_OK);
855 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
856 e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
858 e->req_cls = fun_cls;
859 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
861 /* local request, highest prio, put at head of queue
862 regardless of deadline */
868 while ( (NULL != bef) &&
869 (e->timeout.value < bef->timeout.value) )
872 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
873 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
875 e->task = GNUNET_SCHEDULER_add_delayed (sched,
884 * Free the state associated with a local get context.
886 * @param lgc the lgc to free
/* Releases the state of a local get context: unlinks it from the
   lgc_head/lgc_tail list, calls GNUNET_SERVER_client_drop on its
   client, frees the results array and (if present) the results
   bloomfilter, and discards the datastore queue entry stored in
   lgc->req, if any.
   @param lgc the context to release */
889 local_get_context_free (struct LocalGetContext *lgc)
891 GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
892 GNUNET_SERVER_client_drop (lgc->client);
893 GNUNET_free_non_null (lgc->results);
894 if (lgc->results_bf != NULL)
895 GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
896 if (lgc->req != NULL)
/* make sure the timeout task cannot fire for an entry we are freeing */
898 if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
899 GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
/* Unlink the queue entry from the datastore request queue.  'lgc'
   itself was already unlinked from the LGC list at the top; removing
   it a second time would corrupt that list and leave the entry that is
   freed on the next line linked into drq_head/drq_tail.
   NOTE(review): assumes a non-NULL lgc->req is still linked into the
   queue -- confirm that lgc->req is cleared once the request has run. */
900 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, lgc->req);
901 GNUNET_free (lgc->req);
908 * We're able to transmit the next (local) result to the client.
909 * Do it and ask the datastore for more. Or, on error, tell
910 * the datastore to stop giving us more.
912 * @param cls our closure (struct LocalGetContext)
913 * @param max maximum number of bytes we can transmit
914 * @param buf where to copy our message
915 * @return number of bytes copied to buf
918 transmit_local_result (void *cls,
922 struct LocalGetContext *lgc = cls;
928 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929 "Failed to transmit result to local client, aborting datastore iteration.\n");
932 GNUNET_free (lgc->result);
934 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
937 msize = ntohs (lgc->result->header.size);
939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940 "Transmitting %u bytes of result to local client.\n",
943 GNUNET_assert (max >= msize);
944 memcpy (buf, lgc->result, msize);
945 GNUNET_free (lgc->result);
947 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
953 * Mingle hash with the mingle_number to
954 * produce different bits.
957 mingle_hash (const GNUNET_HashCode * in,
958 int32_t mingle_number,
959 GNUNET_HashCode * hc)
963 GNUNET_CRYPTO_hash (&mingle_number,
966 GNUNET_CRYPTO_hash_xor (&m, in, hc);
971 * How many bytes should a bloomfilter be if we have already seen
972 * entry_count responses? Note that BLOOMFILTER_K gives us the number
973 * of bits set per entry. Furthermore, we should not re-size the
974 * filter too often (to keep it cheap).
976 * Since other peers will also add entries but not resize the filter,
977 * we should generally pick a slightly larger size than what the
978 * strict math would suggest.
980 * @return size in bytes; must be a power of two and less than or equal to 2^15.
983 compute_bloomfilter_size (unsigned int entry_count)
986 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
987 uint16_t max = 1 << 15;
989 if (entry_count > max)
992 while ((size < max) && (size < ideal))
1001 * Recalculate our bloom filter for filtering replies.
1003 * @param count number of entries we are filtering right now
1004 * @param mingle set to our new mingling value
1005 * @param bf_size set to the size of the bloomfilter
1006 * @param entries the entries to filter
1007 * @return updated bloomfilter, NULL for none
1009 static struct GNUNET_CONTAINER_BloomFilter *
1010 refresh_bloomfilter (unsigned int count,
1013 const GNUNET_HashCode *entries)
1015 struct GNUNET_CONTAINER_BloomFilter *bf;
1018 GNUNET_HashCode mhash;
1022 nsize = compute_bloomfilter_size (count);
1023 *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1025 bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
1028 for (i=0;i<count;i++)
1030 mingle_hash (&entries[i], *mingle, &mhash);
1031 GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1038 * Closure used for "target_peer_select_cb".
1040 struct PeerSelectionContext
1043 * The request for which we are selecting
1046 struct PendingRequest *pr;
1049 * Current "prime" target.
1051 struct GNUNET_PeerIdentity target;
1054 * How much do we like this target?
1056 double target_score;
1062 * Function called for each connected peer to determine
1063 * which one(s) would make good targets for forwarding.
1065 * @param cls closure (struct PeerSelectionContext)
1066 * @param key current key code (peer identity)
1067 * @param value value in the hash map (struct ConnectedPeer)
1068 * @return GNUNET_YES if we should continue to
/* Hash map iterator over connected peers: skips peers whose PID is
   already listed in the request's used_pids array, rates the remaining
   ones and records the best-rated peer (score and identity) in the
   PeerSelectionContext passed as cls. */
1073 target_peer_select_cb (void *cls,
1074 const GNUNET_HashCode * key,
1077 struct PeerSelectionContext *psc = cls;
1078 struct ConnectedPeer *cp = value;
1079 struct PendingRequest *pr = psc->pr;
1083 /* 1) check if we have already (recently) forwarded to this peer */
1084 for (i=0;i<pr->used_pids_off;i++)
1085 if (pr->used_pids[i] == cp->pid)
1086 return GNUNET_YES; /* skip */
1087 // 2) calculate how much we'd like to forward to this peer
/* NOTE(review): the score is a constant placeholder, so every peer that
   passes the check above is rated 0.  Because the comparison below is a
   strict '>', a peer is only recorded if the caller seeded
   psc->target_score with a value smaller than 0. */
1088 score = 0; // FIXME!
1090 /* store best-fit in closure */
1091 if (score > psc->target_score)
1093 psc->target_score = score;
/* the map key is used as the peer's identity hash */
1094 psc->target.hashPubKey = *key;
1101 * We use a random delay to make the timing of requests
1102 * less predictable. This function returns such a random
1105 * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1107 static struct GNUNET_TIME_Relative
1108 get_processing_delay ()
1110 return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1111 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1117 * Task that is run for each request with the
1118 * goal of forwarding the associated query to
1119 * other peers. The task should re-schedule
1120 * itself to be re-run once the TTL has expired.
1121 * (or at a later time if more peers should
1122 * be queried earlier).
1124 * @param cls the request's "struct PendingRequest*"
1125 * @param tc task context (unused)
1128 forward_request_task (void *cls,
1129 const struct GNUNET_SCHEDULER_TaskContext *tc);
1133 * We've selected a peer for forwarding of a query.
1134 * Construct the message and then re-schedule the
1135 * task to forward again to (other) peers.
1137 * @param cls closure
1138 * @param size number of bytes available in buf
1139 * @param buf where the callee should write the message
1140 * @return number of bytes written to buf
/* Transmission callback for a pending request: on timeout it schedules
   forward_request_task again at idle priority; otherwise it writes a
   GNUNET_MESSAGE_TYPE_FS_GET message built from the request's fields
   into buf and schedules forward_request_task again after a processing
   delay. */
1143 transmit_request_cb (void *cls,
1147 struct PendingRequest *pr = cls;
1148 struct GetMessage *gm;
1149 GNUNET_HashCode *ext;
1155 /* (1) check for timeout */
1158 /* timeout, try another peer immediately again */
1159 pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
1160 GNUNET_SCHEDULER_PRIORITY_IDLE,
1161 &forward_request_task,
1165 /* (2) build query message */
1166 k = 0; // FIXME: count hash codes!
/* message layout: GetMessage header, k hash codes, then pr->bf_size
   bytes of bloomfilter data */
/* NOTE(review): msize is only compared against
   GNUNET_SERVER_MAX_MESSAGE_SIZE here; no comparison with the number
   of bytes available in buf is visible before the message is written
   -- confirm that the buffer is guaranteed to be large enough. */
1167 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1168 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1169 gm = (struct GetMessage*) buf;
1170 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1171 gm->header.size = htons (msize);
1172 gm->type = htonl (pr->type);
/* every transmission halves the remaining priority budget; the halved
   value is what gets sent */
1173 pr->remaining_priority /= 2;
1174 gm->priority = htonl (pr->remaining_priority);
1175 gm->ttl = htonl (pr->ttl);
1176 gm->filter_mutator = htonl(pr->mingle);
/* NOTE(review): hash_bitmap is a hard-coded placeholder while k (the
   number of hash codes appended after the header) is 0 -- confirm how
   a receiver interprets the bits that are set in 42. */
1177 gm->hash_bitmap = htonl (42);
1178 gm->query = pr->query;
1179 ext = (GNUNET_HashCode*) &gm[1];
1180 // FIXME: setup "ext[0]..[k-1]"
/* bloomfilter bytes start right after the k hash codes */
1181 bfdata = (char *) &ext[k];
1183 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1187 /* (3) schedule job to do it again (or another peer, etc.) */
1188 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1189 get_processing_delay (), // FIXME!
1190 &forward_request_task,
1198 * Function called after we've tried to reserve
1199 * a certain amount of bandwidth for a reply.
1200 * Check if we succeeded and if so send our query.
1202 * @param cls the request's "struct PendingRequest*"
1203 * @param peer identifies the peer
1204 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1205 * @param bpm_out set to the current bandwidth limit (sending) for this peer
1206 * @param amount set to the amount that was actually reserved or unreserved
1207 * @param preference current traffic preference for the given peer
1210 target_reservation_cb (void *cls,
1212 GNUNET_PeerIdentity * peer,
1213 unsigned int bpm_in,
1214 unsigned int bpm_out,
1216 uint64_t preference)
1218 struct PendingRequest *pr = cls;
1221 struct GNUNET_TIME_Relative maxdelay;
1224 GNUNET_assert (peer != NULL);
1225 if ( (amount != DBLOCK_SIZE) ||
1228 /* try again later; FIXME: we may need to un-reserve "amount"? */
1229 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1230 get_processing_delay (), // FIXME: longer?
1231 &forward_request_task,
1235 // (2) transmit, update ttl/priority
1236 // FIXME: calculate priority, maxdelay, size properly!
1239 maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1240 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1245 &transmit_request_cb,
1247 if (pr->cth == NULL)
1249 /* try again later */
1250 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1251 get_processing_delay (), // FIXME: longer?
1252 &forward_request_task,
1259 * Task that is run for each request with the
1260 * goal of forwarding the associated query to
1261 * other peers. The task should re-schedule
1262 * itself to be re-run once the TTL has expired.
1263 * (or at a later time if more peers should
1264 * be queried earlier).
1266 * @param cls the request's "struct PendingRequest*"
1267 * @param tc task context (unused)
/* Scheduler task that tries to forward a pending request to one more
   peer.  It waits (re-scheduling itself after a processing delay) while
   a core transmission for this request is still pending, selects the
   best-rated connected peer via target_peer_select_cb, re-schedules
   itself if no peer qualifies, and otherwise asks the core to change
   the preference for the chosen peer, with target_reservation_cb as
   the continuation.
   cls is the request's "struct PendingRequest*"; tc is not used by the
   visible code. */
1270 forward_request_task (void *cls,
1271 const struct GNUNET_SCHEDULER_TaskContext *tc)
1273 struct PendingRequest *pr = cls;
1274 struct PeerSelectionContext psc;
/* this task is running now, so no task is pending for the request */
1276 pr->task = GNUNET_SCHEDULER_NO_TASK;
1277 if (pr->cth != NULL)
1279 /* we're busy transmitting a result, wait a bit */
1280 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1281 get_processing_delay (),
1282 &forward_request_task,
1286 /* (1) select target */
/* Seed with the most negative finite double so that any real score
   (including 0) beats it.  DBL_MIN is the smallest POSITIVE double and
   must not be used here: the selection callback only records a peer
   when 'score > target_score', so a score of 0 could never win. */
1288 psc.target_score = -DBL_MAX;
1289 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1290 &target_peer_select_cb,
/* sentinel still in place: the iterator did not record any peer */
1292 if (psc.target_score == -DBL_MAX)
1294 /* no possible target found, wait some time */
1295 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1296 get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1297 &forward_request_task,
1301 /* (2) reserve reply bandwidth */
/* only one preference-change request may be pending per request */
1302 GNUNET_assert (NULL == pr->irc);
1303 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1305 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1307 DBLOCK_SIZE, // FIXME: make dependent on type?
1309 &target_reservation_cb,
1315 * We're processing (local) results for a search request
1316 * from a (local) client. Pass applicable results to the
1317 * client and if we are done either clean up (operation
1318 * complete) or switch to P2P search (more results possible).
1320 * @param cls our closure (struct LocalGetContext)
1321 * @param key key for the content
1322 * @param size number of bytes in data
1323 * @param data content stored
1324 * @param type type of the content
1325 * @param priority priority of the content
1326 * @param anonymity anonymity-level for the content
1327 * @param expiration expiration time for the content
1328 * @param uid unique identifier for the datum;
1329 * maybe 0 if no unique identifier is available
1332 process_local_get_result (void *cls,
1333 const GNUNET_HashCode * key,
1339 struct GNUNET_TIME_Absolute
1343 struct LocalGetContext *lgc = cls;
1344 struct PendingRequest *pr;
1345 struct ClientRequestList *crl;
1346 struct ClientList *cl;
1353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354 "Received last result for `%s' from local datastore, deciding what to do next.\n",
1355 GNUNET_h2s (&lgc->query));
1357 /* no further results from datastore; continue
1358 processing further requests from the client and
1359 allow the next task to use the datastore; also,
1360 switch to P2P requests or clean up our state. */
1362 GNUNET_SERVER_receive_done (lgc->client,
1364 if ( (lgc->results_used == 0) ||
1365 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1366 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1367 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371 "Forwarding query for `%s' to network.\n",
1372 GNUNET_h2s (&lgc->query));
1375 while ( (NULL != cl) &&
1376 (cl->client != lgc->client) )
1380 cl = GNUNET_malloc (sizeof (struct ClientList));
1381 cl->client = lgc->client;
1385 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1387 GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1388 pr = GNUNET_malloc (sizeof (struct PendingRequest));
1389 pr->client = lgc->client;
1390 GNUNET_SERVER_client_keep (pr->client);
1391 pr->crl_entry = crl;
1393 if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1395 pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1396 *pr->namespace = lgc->namespace;
1398 pr->replies_seen = lgc->results;
1399 lgc->results = NULL;
1400 pr->start_time = GNUNET_TIME_absolute_get ();
1401 pr->query = lgc->query;
1402 pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1403 pr->replies_seen_off = lgc->results_used;
1404 pr->replies_seen_size = lgc->results_size;
1405 lgc->results_size = 0;
1406 pr->type = lgc->type;
1407 pr->anonymity_level = lgc->anonymity_level;
1408 pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1412 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1415 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1416 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1417 get_processing_delay (),
1418 &forward_request_task,
1420 local_get_context_free (lgc);
1423 /* got all possible results, clean up! */
1425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426 "Found all possible results for query for `%s', done!\n",
1427 GNUNET_h2s (&lgc->query));
1429 local_get_context_free (lgc);
1432 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436 "Received on-demand block for `%s' from local datastore, fetching data.\n",
1437 GNUNET_h2s (&lgc->query));
1439 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1440 anonymity, expiration, uid,
1442 &process_local_get_result,
1446 if ( (type != lgc->type) &&
1447 (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
1449 /* this should be virtually impossible to reach (DBLOCK
1450 query hash being identical to KBLOCK/SBLOCK query hash);
1451 nevertheless, if it happens, the correct thing is to
1452 simply skip the result. */
1454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455 "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
1458 GNUNET_h2s (&lgc->query));
1460 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1463 /* check if this is a result we've already
1465 for (i=0;i<lgc->results_used;i++)
1466 if (0 == memcmp (key,
1468 sizeof (GNUNET_HashCode)))
1471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472 "Received duplicate result for `%s' from local datastore, ignoring.\n",
1473 GNUNET_h2s (&lgc->query));
1475 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1478 if (lgc->results_used == lgc->results_size)
1479 GNUNET_array_grow (lgc->results,
1481 lgc->results_size * 2 + 2);
1482 GNUNET_CRYPTO_hash (data,
1484 &lgc->results[lgc->results_used++]);
1485 msize = size + sizeof (struct ContentMessage);
1486 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1487 lgc->result = GNUNET_malloc (msize);
1488 lgc->result->header.size = htons (msize);
1489 lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1490 lgc->result->type = htonl (type);
1491 lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1492 memcpy (&lgc->result[1],
1496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1497 "Received new result for `%s' from local datastore, passing to client.\n",
1498 GNUNET_h2s (&lgc->query));
1500 GNUNET_SERVER_notify_transmit_ready (lgc->client,
1502 GNUNET_TIME_UNIT_FOREVER_REL,
1503 &transmit_local_result,
1509 * We're processing a search request from a local
1510 * client. Now it is our turn to query the datastore.
1512 * @param cls our closure (struct LocalGetContext)
/* Scheduler task that starts the datastore lookup for a local search.
   The visible code widens a DBLOCK lookup to BLOCKTYPE_ANY and then calls
   GNUNET_DATASTORE_get with process_local_get_result as the result
   callback and GNUNET_TIME_UNIT_FOREVER_REL as the timeout. */
1516 transmit_local_get (void *cls,
1517 const struct GNUNET_SCHEDULER_TaskContext *tc)
1519 struct LocalGetContext *lgc = cls;
/* NOTE(review): the declaration/initialisation of 'type' is not visible in
   this listing; presumably it is taken from lgc->type -- confirm. */
1523 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1524 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1525 GNUNET_DATASTORE_get (dsh,
1528 &process_local_get_result,
1530 GNUNET_TIME_UNIT_FOREVER_REL);
1535 * We're processing a search request from a local
1536 * client. Now it is our turn to query the datastore.
1538 * @param cls our closure (struct LocalGetContext)
1539 * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
/* Callback given to queue_ds_request by handle_start_search.  It asserts
   that the datastore slot was granted (ok == GNUNET_OK) and then schedules
   transmit_local_get as a continuation with reason PREREQ_DONE. */
1542 transmit_local_get_ready (void *cls,
1545 struct LocalGetContext *lgc = cls;
/* a failed queue attempt is treated as a programming error, not handled */
1547 GNUNET_assert (GNUNET_OK == ok);
1548 GNUNET_SCHEDULER_add_continuation (sched,
1549 &transmit_local_get,
1551 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1556 * Handle START_SEARCH-message (search request from client).
1558 * @param cls closure
1559 * @param client identification of the client
1560 * @param message the actual message
/* Handler for START_SEARCH messages from a local client.  The visible code
   validates the message size, allocates a LocalGetContext, copies 'sc' hash
   codes into lgc->results, records type / anonymity level / target or
   namespace / query, links the context into the lgc_head/lgc_tail list and
   queues a datastore request whose callback is transmit_local_get_ready. */
1563 handle_start_search (void *cls,
1564 struct GNUNET_SERVER_Client *client,
1565 const struct GNUNET_MessageHeader *message)
1567 const struct SearchMessage *sm;
1568 struct LocalGetContext *lgc;
1572 msize = ntohs (message->size);
/* the message must hold a full SearchMessage and the remainder must be a
   whole number of GNUNET_HashCode entries */
1573 if ( (msize < sizeof (struct SearchMessage)) ||
1574 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1577 GNUNET_SERVER_receive_done (client,
/* sc = number of hash codes that follow the fixed-size SearchMessage */
1581 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1582 sm = (const struct SearchMessage*) message;
/* NOTE(review): GNUNET_SERVER_client_keep is called before the client is
   stored in lgc->client; the matching release is not visible here --
   presumably done in local_get_context_free, confirm. */
1583 GNUNET_SERVER_client_keep (client);
1584 lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
/* the result list starts out already holding 'sc' entries */
1587 lgc->results_used = sc;
1588 GNUNET_array_grow (lgc->results,
/* NOTE(review): the copy source is not visible in this listing; presumably
   the hash codes trailing the SearchMessage -- confirm. */
1591 memcpy (lgc->results,
1593 sc * sizeof (GNUNET_HashCode));
1595 lgc->client = client;
1596 lgc->type = ntohl (sm->type);
1597 lgc->anonymity_level = ntohl (sm->anonymity_level);
/* sm->target is interpreted by block type: as the hashPubKey of a target
   peer for DBLOCK/IBLOCK, as the namespace for SBLOCK */
1600 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1601 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1602 lgc->target.hashPubKey = sm->target;
1604 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1605 lgc->namespace = sm->target;
1610 lgc->query = sm->query;
1611 GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
/* queue for datastore access without a timeout; the handle is kept in
   lgc->req */
1612 lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1613 &transmit_local_get_ready,
1619 * List of handlers for the messages understood by this
/* Table of client-message handlers; it is the array passed to
   GNUNET_SERVER_add_handlers in this file.  Each visible entry lists a
   callback, a NULL second field, a message type and a size.
   NOTE(review): a size of 0 presumably marks a variable-length message whose
   size the handler checks itself (handle_start_search does) -- confirm
   against the GNUNET_SERVER_MessageHandler definition. */
1622 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1623 {&GNUNET_FS_handle_index_start, NULL,
1624 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1625 {&GNUNET_FS_handle_index_list_get, NULL,
1626 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1627 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
1628 sizeof (struct UnindexMessage) },
1629 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
1636 * Clean up the memory used by the PendingRequest structure (except
1637 * for the client or peer list that the request may be part of).
1639 * @param pr request to clean up
/* Tears down one PendingRequest.  The visible code removes it from
   requests_by_query, cancels outstanding core/scheduler/connection
   operations, frees the bloom filter and queued replies, drops the peer
   reference counts and releases the auxiliary arrays. */
1642 destroy_pending_request (struct PendingRequest *pr)
1644 struct PendingMessage *reply;
1645 struct ClientList *cl;
1647 GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1650 // FIXME: not sure how this can work (efficiently)
1651 // also, what does the return value mean?
/* cancel a still-pending preference change, if any */
1652 if (pr->irc != NULL)
1654 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
/* requests without a client are tracked in the expiration heap;
   NOTE(review): the DLL removal below presumably sits in the else branch
   (requests that do have a client) -- the branch structure is not visible
   in this listing. */
1657 if (pr->client == NULL)
1659 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
1664 cl = pr->crl_entry->cl;
1665 GNUNET_CONTAINER_DLL_remove (cl->head,
1669 if (GNUNET_SCHEDULER_NO_TASK != pr->task)
1670 GNUNET_SCHEDULER_cancel (sched, pr->task);
1671 if (NULL != pr->cth)
1672 GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
/* NOTE(review): any NULL guards around the next two calls are not visible
   in this listing -- confirm pr->bf and pr->th are checked. */
1674 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1676 GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
/* drop every reply that was queued but never transmitted */
1677 while (NULL != (reply = pr->replies_pending))
1679 pr->replies_pending = reply->next;
1680 GNUNET_free (reply);
/* release the interned peer ids held by this request */
1682 GNUNET_PEER_change_rc (pr->source_pid, -1);
1683 GNUNET_PEER_change_rc (pr->target_pid, -1);
1684 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1685 GNUNET_free_non_null (pr->used_pids);
1686 GNUNET_free_non_null (pr->replies_seen);
1687 GNUNET_free_non_null (pr->namespace);
1693 * A client disconnected. Remove all of its pending queries.
1695 * @param cls closure, NULL
1696 * @param client identification of the client
/* Called when a client disconnects.  The visible code looks for the
   client's LocalGetContext and frees it, then looks for the client's entry
   in the 'clients' list, unlinks it and destroys every pending request
   attached to it. */
1699 handle_client_disconnect (void *cls,
1700 struct GNUNET_SERVER_Client
1703 struct LocalGetContext *lgc;
1704 struct ClientList *cpos;
1705 struct ClientList *cprev;
1706 struct ClientRequestList *rl;
/* find the local-get context that belongs to this client */
1711 while ( (NULL != lgc) &&
1712 (lgc->client != client) )
1715 local_get_context_free (lgc);
/* Find the ClientList entry for this client.  The test must look at the
   entry currently being visited (cpos); the previous code compared
   clients->client, i.e. always the list head, so any client that was not
   first in the list was never found and its requests were never freed. */
1718 while ( (NULL != cpos) &&
1719 (cpos->client != client) )
/* unlink the entry: either it was the head, or it follows cprev */
1727 clients = cpos->next;
1729 cprev->next = cpos->next;
/* destroy all requests that were issued on behalf of this client */
1730 while (NULL != (rl = cpos->head))
1732 cpos->head = rl->next;
1733 destroy_pending_request (rl->req);
1742 * Iterator over entries in the "requests_by_query" map
1743 * that frees all the entries.
1745 * @param cls closure, NULL
1746 * @param key current key code (the query, unused)
1747 * @param value value in the hash map, of type "struct PendingRequest*"
1748 * @return GNUNET_YES (we should continue to iterate)
/* Multihashmap iterator adapter: treats 'value' as a PendingRequest and
   hands it to destroy_pending_request.  shutdown_task passes it to
   GNUNET_CONTAINER_multihashmap_iterate over requests_by_query. */
1751 destroy_pending_request_cb (void *cls,
1752 const GNUNET_HashCode * key,
1755 struct PendingRequest *pr = value;
1757 destroy_pending_request (pr);
1763 * Task run during shutdown.
/* Shutdown task.  The visible code disconnects from core and the
   datastore, destroys every pending request, drops all remaining clients
   and then destroys the global containers, resetting each global to NULL. */
1769 shutdown_task (void *cls,
1770 const struct GNUNET_SCHEDULER_TaskContext *tc)
1774 GNUNET_CORE_disconnect (core);
1779 GNUNET_DATASTORE_disconnect (dsh,
/* NOTE(review): destroy_pending_request removes entries from
   requests_by_query while this iteration over the same map is running --
   confirm the multihashmap supports removal during iteration. */
1783 GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
1784 &destroy_pending_request_cb,
/* each call is expected to unlink the current head of 'clients' */
1786 while (clients != NULL)
1787 handle_client_disconnect (NULL,
1789 GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
1790 requests_by_query = NULL;
1791 GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
1792 requests_by_peer = NULL;
1793 GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
1794 requests_by_expiration = NULL;
1795 // FIXME: iterate over entries and free individually?
1796 // (or do we get disconnect notifications?)
1797 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1798 connected_peers = NULL;
1803 * Free (each) request made by the peer.
1805 * @param cls closure, points to peer that the request belongs to
1806 * @param key current key code
1807 * @param value value in the hash map
1808 * @return GNUNET_YES (we should continue to iterate)
/* Multihashmap iterator that removes one request from requests_by_peer and
   then destroys it via destroy_pending_request.  'cls' is read as the
   identity of the peer that owns the request. */
1811 destroy_request (void *cls,
1812 const GNUNET_HashCode * key,
1815 const struct GNUNET_PeerIdentity * peer = cls;
1816 struct PendingRequest *pr = value;
1818 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1821 destroy_pending_request (pr);
1828 * Method called whenever a given peer connects.
1830 * @param cls closure, not used
1831 * @param peer peer identity this notification is about
1832 * @param latency reported latency of the connection with 'other'
1833 * @param distance reported distance (DV) to 'other'
/* Core callback for a newly connected peer: allocates a ConnectedPeer,
   interns the peer identity into cp->pid and stores the entry in
   connected_peers with the UNIQUE_ONLY option. */
1836 peer_connect_handler (void *cls,
1838 GNUNET_PeerIdentity * peer,
1839 struct GNUNET_TIME_Relative latency,
1842 struct ConnectedPeer *cp;
1844 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1845 cp->pid = GNUNET_PEER_intern (peer);
/* NOTE(review): the return value of the put is not checked in the visible
   code; with UNIQUE_ONLY a duplicate key would presumably be rejected and
   'cp' leaked -- confirm. */
1846 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1849 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1854 * Method called whenever a peer disconnects.
1856 * @param cls closure, not used
1857 * @param peer peer identity this notification is about
/* Core callback for a disconnected peer: looks the peer up in
   connected_peers, releases the peer ids it holds and then walks the
   matching entries of requests_by_peer. */
1860 peer_disconnect_handler (void *cls,
1862 GNUNET_PeerIdentity * peer)
1864 struct ConnectedPeer *cp;
1866 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
/* NOTE(review): no NULL check on 'cp' is visible before it is dereferenced;
   a disconnect for a peer that is not in the map would crash -- confirm. */
1868 GNUNET_PEER_change_rc (cp->pid, -1);
1869 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
/* NOTE(review): the iterator passed here is not visible; presumably
   destroy_request with 'peer' as closure -- confirm. */
1871 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1879 * We're processing a GET request from
1880 * another peer and have decided to forward
1881 * it to other peers.
1883 * @param cls our "struct ProcessGetContext *"
/* Scheduler task that turns a ProcessGetContext into a PendingRequest.
   The visible code copies the query parameters from 'pgc', registers the
   request in requests_by_query, requests_by_peer and the expiration heap,
   evicts the heap's root entry when the heap exceeds max_pending_requests,
   and schedules forward_request_task after get_processing_delay(). */
1887 forward_get_request (void *cls,
1888 const struct GNUNET_SCHEDULER_TaskContext *tc)
1890 struct ProcessGetContext *pgc = cls;
1891 struct PendingRequest *pr;
1892 struct PendingRequest *eer;
1893 struct GNUNET_PeerIdentity target;
1895 pr = GNUNET_malloc (sizeof (struct PendingRequest));
/* the namespace is only copied when the SKS_NAMESPACE bit is set */
1896 if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
1898 pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
1899 *pr->namespace = pgc->namespace;
1902 pr->bf_size = pgc->bf_size;
1904 pr->start_time = pgc->start_time;
1905 pr->query = pgc->query;
1906 pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
1907 if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
1908 pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
1909 pr->anonymity_level = 1; /* default */
1910 pr->priority = pgc->priority;
1911 pr->remaining_priority = pr->priority;
1912 pr->mingle = pgc->mingle;
1914 pr->type = pgc->type;
1915 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1918 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1919 GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
1920 &pgc->reply_to.hashPubKey,
1922 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* heap cost is start time plus ttl */
1923 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
1925 pr->start_time.value + pr->ttl);
1926 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
1928 /* expire oldest request! */
/* NOTE(review): 'pr' was inserted into this heap just above, so the root
   returned by the peek could be 'pr' itself; in that case 'pr' is destroyed
   here and then written to below (pr->task) -- confirm this cannot
   happen. */
1929 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
1930 GNUNET_PEER_resolve (eer->source_pid,
1932 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1935 destroy_pending_request (eer);
1937 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1938 get_processing_delay (),
1939 &forward_request_task,
1944 * Transmit the given message by copying it to
1945 * the target buffer "buf". "buf" will be
1946 * NULL and "size" zero if the socket was closed for
1947 * writing in the meantime. In that case, only
1951 * @param cls closure, pointer to the message
1952 * @param size number of bytes available in buf
1953 * @param buf where the callee should write the message
1954 * @return number of bytes written to buf
/* Transmit-ready callback whose closure is a complete message: copies
   ntohs(msg->size) bytes of the message into 'buf'.  The visible code also
   has a path that logs "Dropping reply, core too busy."; its condition is
   not visible in this listing. */
1957 transmit_message (void *cls,
1958 size_t size, void *buf)
1960 struct GNUNET_MessageHeader *msg = cls;
1966 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967 "Dropping reply, core too busy.\n");
1972 msize = ntohs (msg->size);
/* the offered buffer must be able to hold the whole message */
1973 GNUNET_assert (size >= msize);
1974 memcpy (buf, msg, msize);
1981 * Test if the load on this peer is too high
1982 * to even consider processing the query at
1985 * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
/* Placeholder load check: currently always reports GNUNET_NO, so callers
   in this file never take their "too busy" paths. */
1988 test_load_too_high ()
1990 return GNUNET_NO; // FIXME
1995 * We're processing (local) results for a search request
1996 * from another peer. Pass applicable results to the
1997 * peer and if we are done either clean up (operation
1998 * complete) or forward to other peers (more results possible).
2000 * @param cls our closure (struct LocalGetContext)
2001 * @param key key for the content
2002 * @param size number of bytes in data
2003 * @param data content stored
2004 * @param type type of the content
2005 * @param priority priority of the content
2006 * @param anonymity anonymity-level for the content
2007 * @param expiration expiration time for the content
2008 * @param uid unique identifier for the datum;
2009 * maybe 0 if no unique identifier is available
/* Datastore result callback for a GET received from another peer.  The
   visible code handles the end of the result set (optionally scheduling
   forward_get_request), resolves on-demand blocks, filters duplicates
   through the bloom filter, sends each new result to the peer as a
   PutMessage, and decides whether to ask the datastore for more. */
2012 process_p2p_get_result (void *cls,
2013 const GNUNET_HashCode * key,
2019 struct GNUNET_TIME_Absolute
2023 struct ProcessGetContext *pgc = cls;
2024 GNUNET_HashCode dhash;
2025 GNUNET_HashCode mhash;
2026 struct PutMessage *reply;
2030 /* no more results */
/* forward only if the policy allows it and either nothing was found locally
   or the block type is KBLOCK/SBLOCK/SKBLOCK */
2031 if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) &&
2032 ( (0 == pgc->results_found) ||
2033 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2034 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2035 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2037 GNUNET_SCHEDULER_add_continuation (sched,
2038 &forward_get_request,
2040 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2044 if (pgc->bf != NULL)
2045 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
/* on-demand blocks are handed to the indexing code, which is given this
   same function as its callback */
2051 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2053 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
2054 anonymity, expiration, uid, dsh,
2055 &process_p2p_get_result,
2059 /* check for duplicates */
2060 GNUNET_CRYPTO_hash (data, size, &dhash);
2061 mingle_hash (&dhash,
2064 if ( (pgc->bf != NULL) &&
2066 GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071 "Result from datastore filtered by bloomfilter.\n");
2073 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2076 pgc->results_found++;
/* for these block types the result is also added to the bloom filter,
   which is created here if the request came without one */
2077 if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2078 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2079 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2081 if (pgc->bf == NULL)
2084 pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2088 GNUNET_CONTAINER_bloomfilter_add (pgc->bf,
/* NOTE(review): unlike process_local_get_result, no check that
   sizeof (struct PutMessage) + size fits the 16-bit header size field is
   visible here -- confirm. */
2092 reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2093 reply->header.size = htons (sizeof (struct PutMessage) + size);
2094 reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2095 reply->type = htonl (type);
/* the expiration goes on the wire as remaining (relative) time */
2096 reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2097 memcpy (&reply[1], data, size);
2098 GNUNET_CORE_notify_transmit_ready (core,
2100 ACCEPTABLE_REPLY_DELAY,
2102 sizeof (struct PutMessage) + size,
/* stop iterating (and stop forwarding) when the load is too high or more
   than 5 + 2 * priority results have been found */
2105 if ( (GNUNET_YES == test_load_too_high()) ||
2106 (pgc->results_found > 5 + 2 * pgc->priority) )
2108 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2109 pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2112 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2117 * We're processing a GET request from another peer. Give it to our
2120 * @param cls our "struct ProcessGetContext"
2121 * @param ok did we get a datastore slice or not?
/* Datastore-queue callback for a GET from another peer.  When the slot was
   granted it widens a DBLOCK lookup to BLOCKTYPE_ANY and calls
   GNUNET_DATASTORE_get with process_p2p_get_result as the result callback;
   the timeout is BASIC_DATASTORE_REQUEST_DELAY scaled by (priority + 1). */
2124 ds_get_request (void *cls,
2127 struct ProcessGetContext *pgc = cls;
2129 struct GNUNET_TIME_Relative timeout;
2131 if (GNUNET_OK != ok)
2133 /* no point in doing P2P stuff if we can't even do local */
/* NOTE(review): the initialisation of 'type' is not visible in this
   listing; presumably it is taken from pgc->type -- confirm. */
2138 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2139 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2140 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2141 (pgc->priority + 1));
2142 GNUNET_DATASTORE_get (dsh,
2145 &process_p2p_get_result,
2152 * The priority level imposes a bound on the maximum
2153 * value for the ttl that can be requested.
2155 * @param ttl_in requested ttl
2156 * @param prio given priority
2157 * @return ttl_in if ttl_in is below the limit,
2158 * otherwise the ttl-limit for the given priority
/* Caps a requested TTL by a limit derived from the priority:
   allowed = prio * TTL_DECREMENT / 1000, computed in unsigned long long. */
2161 bound_ttl (int32_t ttl_in, uint32_t prio)
2163 unsigned long long allowed;
2167 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
/* NOTE(review): ttl_in is signed and 'allowed' is unsigned long long, so a
   negative ttl_in would compare as a very large value here; presumably
   non-positive TTLs are returned before this point (not visible in this
   listing) -- confirm. */
2168 if (ttl_in > allowed)
/* an additional threshold of 1 << 30 is applied to the computed limit */
2170 if (allowed >= (1 << 30))
2179 * We've received a request with the specified
2180 * priority. Bound it according to how much
2181 * we trust the given peer.
2183 * @param prio_in requested priority
2184 * @param peer the peer making the request
2185 * @return effective priority
/* Only the signature is visible in this listing.  handle_p2p_get calls this
   with the priority from the GET message and the sending peer, and stores
   the result in pgc->priority. */
2188 bound_priority (uint32_t prio_in,
2189 const struct GNUNET_PeerIdentity *peer)
2196 * Handle P2P "GET" request.
2198 * @param cls closure, always NULL
2199 * @param other the other peer involved (sender or receiver, NULL
2200 * for loopback messages where we are both sender and receiver)
2201 * @param message the actual message
2202 * @param latency reported latency of the connection with 'other'
2203 * @param distance reported distance (DV) to 'other'
2204 * @return GNUNET_OK to keep the connection open,
2205 * GNUNET_SYSERR to close it (signal serious error)
/* Handler for P2P GET messages.  The visible code validates the message
   size, extracts the optional hash codes and the trailing bloom filter,
   picks a routing policy from the (currently hard-coded) network load,
   bounds priority and TTL, raises the sender's preference in core and then
   either queues a local datastore lookup (policy includes ANSWER) or
   schedules forward_get_request. */
2208 handle_p2p_get (void *cls,
2209 const struct GNUNET_PeerIdentity *other,
2210 const struct GNUNET_MessageHeader *message,
2211 struct GNUNET_TIME_Relative latency,
2215 const struct GetMessage *gm;
2217 const GNUNET_HashCode *opt;
2218 struct ProcessGetContext *pgc;
2221 uint32_t ttl_decrement;
2226 msize = ntohs(message->size);
2227 if (msize < sizeof (struct GetMessage))
2229 GNUNET_break_op (0);
2230 return GNUNET_SYSERR;
2232 gm = (const struct GetMessage*) message;
2233 bm = ntohl (gm->hash_bitmap);
/* 'bits' optional hash codes must fit behind the fixed-size GetMessage */
2241 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2243 GNUNET_break_op (0);
2244 return GNUNET_SYSERR;
2246 opt = (const GNUNET_HashCode*) &gm[1];
/* The bloom filter is whatever remains of the message after the header and
   the 'bits' optional hash codes, so their size has to be subtracted.  The
   previous code added it, which made bfsize larger than the message.  The
   size check above guarantees this subtraction cannot underflow. */
2247 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
2248 pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
/* The filter bytes live in the received message right behind the optional
   hash codes.  The previous code read them from &pgc[1], i.e. from memory
   past the context that was just allocated with exactly
   sizeof (struct ProcessGetContext) bytes.
   NOTE(review): this relies on 'bits' still holding the hash count used in
   the size check above at this point -- confirm. */
2251 pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2254 pgc->bf_size = bfsize;
2256 pgc->type = ntohl (gm->type);
2257 pgc->bm = ntohl (gm->hash_bitmap);
2258 pgc->mingle = gm->filter_mutator;
/* the optional hash codes are consumed in bitmap order:
   RETURN_TO, SKS_NAMESPACE, TRANSMIT_TO */
2260 if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2261 pgc->reply_to.hashPubKey = opt[bits++];
/* NOTE(review): 'other' is documented as NULL for loopback messages; no
   NULL check is visible before this dereference -- confirm. */
2263 pgc->reply_to = *other;
2264 if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2265 pgc->namespace = opt[bits++];
/* an SBLOCK query without a namespace is a protocol violation */
2266 else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2268 GNUNET_break_op (0);
2270 return GNUNET_SYSERR;
2272 if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2273 pgc->prime_target.hashPubKey = opt[bits++];
2274 /* note that we can really only check load here since otherwise
2275 peers could find out that we are overloaded by being disconnected
2276 after sending us a malformed query... */
2277 if (GNUNET_YES == test_load_too_high ())
2279 if (NULL != pgc->bf)
2280 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2284 "Dropping query from `%s', this peer is too busy.\n",
2285 GNUNET_i2s (other));
2289 net_load_up = 50; // FIXME
2290 net_load_down = 50; // FIXME
2291 pgc->policy = ROUTING_POLICY_NONE;
/* below the idle threshold everything is allowed and the sender is not
   charged */
2292 if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2293 (net_load_down < IDLE_LOAD_THRESHOLD) )
2295 pgc->policy |= ROUTING_POLICY_ALL;
2296 pgc->priority = 0; /* no charge */
2300 pgc->priority = bound_priority (ntohl (gm->priority), other);
2302 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2304 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2306 pgc->policy |= ROUTING_POLICY_ALL;
2310 // FIXME: is this sound?
2311 if (net_load_up < 90 + 10 * pgc->priority)
2312 pgc->policy |= ROUTING_POLICY_FORWARD;
2313 if (net_load_down < 90 + 10 * pgc->priority)
2314 pgc->policy |= ROUTING_POLICY_ANSWER;
2317 if (pgc->policy == ROUTING_POLICY_NONE)
2320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321 "Dropping query from `%s', network saturated.\n",
2322 GNUNET_i2s (other));
2324 if (NULL != pgc->bf)
2325 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2327 return GNUNET_OK; /* drop */
2329 if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2330 pgc->priority = 0; /* kill the priority (we cannot benefit) */
2331 pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2332 /* decrement ttl (always) */
2333 ttl_decrement = 2 * TTL_DECREMENT +
2334 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
/* NOTE(review): ttl_decrement is uint32_t, so if pgc->ttl is a 32-bit
   signed value the subtraction below is done in unsigned arithmetic and
   "> 0" holds for almost every input; the underflow test would then fire
   for nearly all negative TTLs.  The type of pgc->ttl is not visible here
   -- confirm and add a cast to the signed type if so. */
2336 if ( (pgc->ttl < 0) &&
2337 (pgc->ttl - ttl_decrement > 0) )
2340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2341 "Dropping query from `%s' due to TTL underflow.\n",
2342 GNUNET_i2s (other));
2344 /* integer underflow => drop (should be very rare)! */
2345 if (NULL != pgc->bf)
2346 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2350 pgc->ttl -= ttl_decrement;
2351 pgc->start_time = GNUNET_TIME_absolute_get ();
/* the preference given to the sender is at least QUERY_BANDWIDTH_VALUE */
2352 preference = (double) pgc->priority;
2353 if (preference < QUERY_BANDWIDTH_VALUE)
2354 preference = QUERY_BANDWIDTH_VALUE;
2355 // FIXME: also reserve bandwidth for reply?
2356 (void) GNUNET_CORE_peer_change_preference (sched, cfg,
2358 GNUNET_TIME_UNIT_FOREVER_REL,
2359 0, 0, preference, NULL, NULL);
/* answer locally first if allowed; otherwise go straight to forwarding */
2360 if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2361 pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2365 GNUNET_SCHEDULER_add_continuation (sched,
2366 &forward_get_request,
2368 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2374 * Function called to notify us that we can now transmit a reply to a
2375 * client or peer. "buf" will be NULL and "size" zero if the socket was
2376 * closed for writing in the meantime.
2378 * @param cls closure, points to a "struct PendingRequest*" with
2379 * one or more pending replies
2380 * @param size number of bytes available in buf
2381 * @param buf where the callee should write the message
2382 * @return number of bytes written to buf
/* Transmit-ready callback for a PendingRequest: copies queued replies from
   pr->replies_pending into the output buffer, one after the other, freeing
   each reply once it has been copied. */
2385 transmit_result (void *cls,
2389 struct PendingRequest *pr = cls;
2391 struct PendingMessage *reply;
2395 while (NULL != (reply = pr->replies_pending))
/* the reply is not copied if adding it would overflow the running total or
   exceed the space offered in 'size' */
2397 if ( (reply->msize + ret < ret) ||
2398 (reply->msize + ret > size) )
2400 pr->replies_pending = reply->next;
/* the message bytes are stored directly behind the PendingMessage header */
2401 memcpy (&cbuf[ret], &reply[1], reply->msize);
2402 ret += reply->msize;
2403 GNUNET_free (reply);
2410 * Iterator over pending requests.
2412 * @param cls response (struct ProcessReplyClosure)
2413 * @param key our query
2414 * @param value value in the hash map (meta-info about the query)
2415 * @return GNUNET_YES (we should continue to iterate)
/* Multihashmap iterator run for every pending request that matches an
   incoming reply (closure: struct ProcessReplyClosure).  The visible code
   filters duplicates through the request's bloom filter, transfers the
   request's remaining priority to the reply, records the reply hash for
   client requests, and queues the reply as a PendingMessage for
   transmission via core or via the server connection. */
2418 process_reply (void *cls,
2419 const GNUNET_HashCode * key,
2422 struct ProcessReplyClosure *prq = cls;
2423 struct PendingRequest *pr = value;
2424 struct PendingRequest *eer;
2425 struct PendingMessage *reply;
2426 struct PutMessage *pm;
2427 struct ContentMessage *cm;
2428 GNUNET_HashCode chash;
2429 GNUNET_HashCode mhash;
2430 struct GNUNET_PeerIdentity target;
2433 struct GNUNET_TIME_Relative max_delay;
2435 GNUNET_CRYPTO_hash (prq->data,
2440 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2441 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2443 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2444 /* FIXME: does prq->namespace match our expectations? */
2445 /* then: fall-through??? */
2446 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
/* duplicate detection: the content hash is mingled with the request's
   mutator before it is tested against / added to the bloom filter */
2449 mingle_hash (&chash, pr->mingle, &mhash);
2450 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2452 return GNUNET_YES; /* duplicate */
2453 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2457 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2458 // FIXME: any checks against duplicates for SKBlocks?
2461 prio = pr->priority;
/* the reply collects whatever priority the request still carried */
2462 prq->priority += pr->remaining_priority;
2463 pr->remaining_priority = 0;
2464 if (pr->client != NULL)
2466 if (pr->replies_seen_size == pr->replies_seen_off)
2467 GNUNET_array_grow (pr->replies_seen,
2468 pr->replies_seen_size,
2469 pr->replies_seen_size * 2 + 4);
2470 pr->replies_seen[pr->replies_seen_off++] = chash;
2471 // FIXME: possibly recalculate BF!
/* NOTE(review): this branch (no client) builds a ContentMessage of type
   FS_CONTENT and sends it through core, while the other branch builds a
   PutMessage of type FS_PUT for the client; elsewhere in this file
   FS_CONTENT goes to clients and FS_PUT to peers -- confirm the branches
   are not swapped. */
2473 if (pr->client == NULL)
2475 msize = sizeof (struct ContentMessage) + prq->size;
2476 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2477 reply->msize = msize;
2478 cm = (struct ContentMessage*) &reply[1];
2479 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2480 cm->header.size = htons (msize);
2481 cm->type = htonl (prq->type);
2482 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2483 reply->next = pr->replies_pending;
2484 pr->replies_pending = reply;
/* The payload belongs behind the ContentMessage header.  The previous code
   copied it to &reply[1], which is where 'cm' itself starts, so the header
   fields written above were overwritten by payload bytes. */
2485 memcpy (&cm[1], prq->data, prq->size);
2486 if (pr->cth != NULL)
2488 max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2489 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2491 /* estimate expiration time from time difference between
2492 first request that will be discarded and this request */
2493 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2494 max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2497 GNUNET_PEER_resolve (pr->source_pid,
2499 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2506 if (NULL == pr->cth)
2508 // FIXME: now what? discard?
2513 msize = sizeof (struct PutMessage) + prq->size;
2514 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2515 reply->msize = msize;
2516 reply->next = pr->replies_pending;
2517 pm = (struct PutMessage*) &reply[1];
2518 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2519 pm->header.size = htons (msize);
2520 pm->type = htonl (prq->type);
2521 pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2522 pr->replies_pending = reply;
/* Same fix as above: the payload goes behind the PutMessage header instead
   of on top of it. */
2523 memcpy (&pm[1], prq->data, prq->size);
2526 pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2528 GNUNET_TIME_UNIT_FOREVER_REL,
2533 // FIXME: need to try again later (not much
2534 // to do here specifically, but we need to
2535 // check somewhere else to handle this case!)
2538 // FIXME: implement hot-path routing statistics keeping!
2544 * Check if the given KBlock is well-formed.
2546 * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
2547 * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
2548 * @param query where to store the query that this block answers
2549 * @return GNUNET_OK if this is actually a well-formed KBlock
/* Validates a KBlock received from the network: checks the minimum size,
   checks that the payload length agrees with the signed purpose size,
   verifies the RSA signature, and derives the query by hashing the
   block's keyspace public key. */
2552 check_kblock (const struct KBlock *kb,
2554 GNUNET_HashCode *query)
2556 if (dsize < sizeof (struct KBlock))
2558 GNUNET_break_op (0);
2559 return GNUNET_SYSERR;
/* bytes beyond the fixed KBlock must equal the signed size minus the
   purpose header and the public key */
2561 if (dsize - sizeof (struct KBlock) !=
2562 ntohs (kb->purpose.size)
2563 - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
2564 - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
2566 GNUNET_break_op (0);
2567 return GNUNET_SYSERR;
2570 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2575 GNUNET_break_op (0);
2576 return GNUNET_SYSERR;
/* NOTE(review): the guard around this hash is not visible in this listing;
   check_sblock guards the equivalent call with a NULL test on its output
   pointer -- confirm 'query' is handled the same way here. */
2579 GNUNET_CRYPTO_hash (&kb->keyspace,
2580 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2587 * Check if the given SBlock is well-formed.
2589 * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
2590 * @param dsize size of "sb" in bytes; check for < sizeof(struct SBlock)!
2591 * @param query where to store the query that this block answers
2592 * @param namespace where to store the namespace that this block belongs to
2593 * @return GNUNET_OK if this is actually a well-formed SBlock
/* Validates an SBlock received from the network: checks the minimum size,
   checks the size against the signed purpose size plus the signature,
   verifies the RSA signature, copies the block identifier into *query and,
   if 'namespace' is non-NULL, hashes the subspace public key. */
2596 check_sblock (const struct SBlock *sb,
2598 GNUNET_HashCode *query,
2599 GNUNET_HashCode *namespace)
2601 if (dsize < sizeof (struct SBlock))
2603 GNUNET_break_op (0);
2604 return GNUNET_SYSERR;
2607 ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
2609 GNUNET_break_op (0);
2610 return GNUNET_SYSERR;
2613 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
2618 GNUNET_break_op (0);
2619 return GNUNET_SYSERR;
/* NOTE(review): unlike 'namespace', 'query' is written without a visible
   NULL check -- confirm every caller passes a valid pointer. */
2622 *query = sb->identifier;
2623 if (namespace != NULL)
2624 GNUNET_CRYPTO_hash (&sb->subspace,
2625 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2632 * Handle P2P "PUT" request.
2634 * @param cls closure, always NULL
2635 * @param other the other peer involved (sender or receiver, NULL
2636 * for loopback messages where we are both sender and receiver)
2637 * @param message the actual message
2638 * @param latency reported latency of the connection with 'other'
2639 * @param distance reported distance (DV) to 'other'
2640 * @return GNUNET_OK to keep the connection open,
2641 * GNUNET_SYSERR to close it (signal serious error)
/* Handler for P2P PUT messages (replies from other peers).  The visible
   code checks the message size, converts the relative expiration to an
   absolute time, validates the block according to its type to obtain the
   query hash, and then runs a lookup over requests_by_query. */
2644 handle_p2p_put (void *cls,
2645 const struct GNUNET_PeerIdentity *other,
2646 const struct GNUNET_MessageHeader *message,
2647 struct GNUNET_TIME_Relative latency,
2650 const struct PutMessage *put;
2654 struct GNUNET_TIME_Absolute expiration;
2655 GNUNET_HashCode query;
2656 struct ProcessReplyClosure prq;
2658 msize = ntohs (message->size);
2659 if (msize < sizeof (struct PutMessage))
2662 return GNUNET_SYSERR;
2664 put = (const struct PutMessage*) message;
/* dsize = number of payload bytes behind the PutMessage header */
2665 dsize = msize - sizeof (struct PutMessage);
2666 type = ntohl (put->type);
2667 expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2669 /* first, validate! */
/* DBLOCK/IBLOCK: the query is the hash of the payload itself */
2672 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2673 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2674 GNUNET_CRYPTO_hash (&put[1], dsize, &query);
2676 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2678 check_kblock ((const struct KBlock*) &put[1],
2681 return GNUNET_SYSERR;
2683 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2685 check_sblock ((const struct SBlock*) &put[1],
2689 return GNUNET_SYSERR;
2691 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2692 // FIXME -- validate SKBLOCK!
2696 /* unknown block type */
2697 GNUNET_break_op (0);
2698 return GNUNET_SYSERR;
2701 /* now, lookup 'query' */
2702 prq.data = (const void*) &put[1];
2705 prq.expiration = expiration;
/* NOTE(review): the iterator passed here is not visible; presumably
   process_reply with &prq as closure -- confirm. */
2707 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
2711 // FIXME: if migration is on and load is low,
2712 // queue to store data in datastore;
2713 // use "prq.priority" for that!
2719 * List of handlers for P2P messages
2720 * that we care about.
/* Table of P2P message handlers with one entry each for FS_GET and FS_PUT,
   both with a size field of 0.  NOTE(review): the callback fields are not
   visible in this listing; presumably handle_p2p_get and handle_p2p_put,
   which check the message size themselves -- confirm. */
2722 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2725 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2727 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2733 * Process fs requests.
2735 * @param cls closure
2736 * @param s scheduler to use
2737 * @param server the initialized server
2738 * @param c configuration to use
/* Service initialisation (the function main passes to GNUNET_SERVICE_run
   as &run).  The visible code creates the global containers, initialises
   indexing, connects to the datastore and to core, registers the client
   disconnect callback and the client message handlers, schedules a task
   with a FOREVER_REL delay, and shuts the scheduler down after logging an
   error when a service connection fails. */
2742 struct GNUNET_SCHEDULER_Handle *s,
2743 struct GNUNET_SERVER_Handle *server,
2744 const struct GNUNET_CONFIGURATION_Handle *c)
2750 requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2751 requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2752 connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
/* min-heap: the root is the entry with the smallest cost */
2753 requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2754 GNUNET_FS_init_indexing (sched, cfg);
2755 dsh = GNUNET_DATASTORE_connect (cfg,
2757 core = GNUNET_CORE_connect (sched,
2759 GNUNET_TIME_UNIT_FOREVER_REL,
2763 &peer_connect_handler,
2764 &peer_disconnect_handler,
2769 GNUNET_SERVER_disconnect_notify (server,
2770 &handle_client_disconnect,
2772 GNUNET_SERVER_add_handlers (server, handlers);
/* NOTE(review): the task scheduled here is not visible; presumably
   shutdown_task, so that it runs when the scheduler shuts down --
   confirm. */
2773 GNUNET_SCHEDULER_add_delayed (sched,
2774 GNUNET_TIME_UNIT_FOREVER_REL,
/* NOTE(review): the two error paths below log the same format string with
   a service name that is not visible here; their position after the
   registrations above suggests the handles are checked late -- confirm the
   ordering in the full source. */
2779 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2780 _("Failed to connect to `%s' service.\n"),
2782 GNUNET_SCHEDULER_shutdown (sched);
2787 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2788 _("Failed to connect to `%s' service.\n"),
2790 GNUNET_SCHEDULER_shutdown (sched);
2797 * The main function for the fs service.
2799 * @param argc number of arguments from the command line
2800 * @param argv command line arguments
2801 * @return 0 ok, 1 on error
/* Program entry point: hands control to GNUNET_SERVICE_run with 'run' as
   the initialisation callback and maps its result to the process exit
   code (0 when it returns GNUNET_OK, 1 otherwise). */
2804 main (int argc, char *const *argv)
2806 return (GNUNET_OK ==
2807 GNUNET_SERVICE_run (argc,
2810 GNUNET_SERVICE_OPTION_NONE,
2811 &run, NULL)) ? 0 : 1;
2814 /* end of gnunet-service-fs.c */