2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
39 * Should routing details be logged to stderr (for debugging)?
41 #define LOG_TRAFFIC(kind, ...) GNUNET_log_from(kind, "dht-traffic", __VA_ARGS__)
43 #define LOG(kind, ...) GNUNET_log_from(kind, "dht-clients", __VA_ARGS__)
47 * Struct containing information about a client,
48 * handle to connect to it, and any pending messages
49 * that need to be sent to it.
55 * Entry in the local forwarding map for a client's GET request.
57 struct ClientQueryRecord {
59 * The key this request was about
61 struct GNUNET_HashCode key;
64 * Kept in a DLL with @e client.
66 struct ClientQueryRecord *next;
69 * Kept in a DLL with @e client.
71 struct ClientQueryRecord *prev;
74 * Client responsible for the request.
76 struct ClientHandle *ch;
79 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
84 * Replies we have already seen for this request.
86 struct GNUNET_HashCode *seen_replies;
89 * Pointer to this nodes heap location in the retry-heap (for fast removal)
91 struct GNUNET_CONTAINER_HeapNode *hnode;
94 * What's the delay between re-try operations that we currently use for this
97 struct GNUNET_TIME_Relative retry_frequency;
100 * What's the next time we should re-try this request?
102 struct GNUNET_TIME_Absolute retry_time;
105 * The unique identifier of this request
110 * Number of bytes in xquery.
115 * Number of entries in 'seen_replies'.
117 unsigned int seen_replies_count;
120 * Desired replication level
122 uint32_t replication;
125 * Any message options for this request
127 uint32_t msg_options;
130 * The type for the data for the GET request.
132 enum GNUNET_BLOCK_Type type;
137 * Struct containing parameters of monitoring requests.
139 struct ClientMonitorRecord {
141 * Next element in DLL.
143 struct ClientMonitorRecord *next;
146 * Previous element in DLL.
148 struct ClientMonitorRecord *prev;
151 * Type of blocks that are of interest
153 enum GNUNET_BLOCK_Type type;
156 * Key of data of interest, NULL for all.
158 struct GNUNET_HashCode *key;
161 * Flag whether to notify about GET messages.
166 * Flag whether to notify about GET_RESPONSE messages.
171 * Flag whether to notify about PUT messages.
176 * Client to notify of these requests.
178 struct ClientHandle *ch;
183 * Struct containing information about a client,
184 * handle to connect to it, and any pending messages
185 * that need to be sent to it.
187 struct ClientHandle {
189 * Linked list of active queries of this client.
191 struct ClientQueryRecord *cqr_head;
194 * Linked list of active queries of this client.
196 struct ClientQueryRecord *cqr_tail;
199 * The handle to this client
201 struct GNUNET_SERVICE_Client *client;
204 * The message queue to this client
206 struct GNUNET_MQ_Handle *mq;
210 * Our handle to the BLOCK library.
212 struct GNUNET_BLOCK_Context *GDS_block_context;
215 * Handle for the statistics service.
217 struct GNUNET_STATISTICS_Handle *GDS_stats;
220 * Handle for the service.
222 struct GNUNET_SERVICE_Handle *GDS_service;
225 * The configuration the DHT service is running with
227 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
230 * List of active monitoring requests.
232 static struct ClientMonitorRecord *monitor_head;
235 * List of active monitoring requests.
237 static struct ClientMonitorRecord *monitor_tail;
240 * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
242 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
245 * Heap with all of our clients' requests, sorted by retry time (earliest on top).
247 static struct GNUNET_CONTAINER_Heap *retry_heap;
250 * Task that re-transmits requests (using retry_heap).
252 static struct GNUNET_SCHEDULER_Task *retry_task;
256 * Free data structures associated with the given query.
258 * @param record record to remove
261 remove_client_record(struct ClientQueryRecord *record)
263 struct ClientHandle *ch = record->ch;
265 GNUNET_CONTAINER_DLL_remove(ch->cqr_head,
268 GNUNET_assert(GNUNET_YES ==
269 GNUNET_CONTAINER_multihashmap_remove(forward_map,
272 if (NULL != record->hnode)
273 GNUNET_CONTAINER_heap_remove_node(record->hnode);
274 GNUNET_array_grow(record->seen_replies,
275 record->seen_replies_count,
282 * Functions with this signature are called whenever a local client is
285 * @param cls closure (NULL for dht)
286 * @param client identification of the client
287 * @param mq message queue for talking to @a client
288 * @return our `struct ClientHandle` for @a client
291 client_connect_cb(void *cls,
292 struct GNUNET_SERVICE_Client *client,
293 struct GNUNET_MQ_Handle *mq)
295 struct ClientHandle *ch;
297 ch = GNUNET_new(struct ClientHandle);
305 * Functions with this signature are called whenever a client
306 * is disconnected on the network level.
308 * @param cls closure (NULL for dht)
309 * @param client identification of the client
310 * @param app_ctx our `struct ClientHandle` for @a client
313 client_disconnect_cb(void *cls,
314 struct GNUNET_SERVICE_Client *client,
317 struct ClientHandle *ch = app_ctx;
318 struct ClientQueryRecord *cqr;
319 struct ClientMonitorRecord *monitor;
321 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
322 "Local client %p disconnects\n",
324 monitor = monitor_head;
325 while (NULL != monitor)
327 if (monitor->ch == ch)
329 struct ClientMonitorRecord *next;
331 next = monitor->next;
332 GNUNET_free_non_null(monitor->key);
333 GNUNET_CONTAINER_DLL_remove(monitor_head,
336 GNUNET_free(monitor);
341 monitor = monitor->next;
344 while (NULL != (cqr = ch->cqr_head))
345 remove_client_record(cqr);
351 * Route the given request via the DHT. This includes updating
352 * the bloom filter and retransmission times, building the P2P
353 * message and initiating the routing operation.
356 transmit_request(struct ClientQueryRecord *cqr)
358 struct GNUNET_BLOCK_Group *bg;
359 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
361 GNUNET_STATISTICS_update(GDS_stats,
362 gettext_noop("# GET requests from clients injected"),
365 bg = GNUNET_BLOCK_group_create(GDS_block_context,
367 GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
372 cqr->seen_replies_count,
374 GNUNET_BLOCK_group_set_seen(bg,
376 cqr->seen_replies_count);
378 = GNUNET_CONTAINER_bloomfilter_init(NULL,
380 GNUNET_CONSTANTS_BLOOMFILTER_K);
381 LOG(GNUNET_ERROR_TYPE_DEBUG,
382 "Initiating GET for %s, replication %u, already have %u replies\n",
383 GNUNET_h2s(&cqr->key),
385 cqr->seen_replies_count);
386 GDS_NEIGHBOURS_handle_get(cqr->type,
395 GNUNET_BLOCK_group_destroy(bg);
396 GNUNET_CONTAINER_bloomfilter_free(peer_bf);
398 /* exponential back-off for retries.
399 * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
400 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF(cqr->retry_frequency);
401 cqr->retry_time = GNUNET_TIME_relative_to_absolute(cqr->retry_frequency);
406 * Task that looks at the #retry_heap and transmits all of the requests
407 * on the heap that are ready for transmission. Then re-schedules
408 * itself (unless the heap is empty).
413 transmit_next_request_task(void *cls)
415 struct ClientQueryRecord *cqr;
416 struct GNUNET_TIME_Relative delay;
419 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root(retry_heap)))
422 delay = GNUNET_TIME_absolute_get_remaining(cqr->retry_time);
423 if (delay.rel_value_us > 0)
426 = GNUNET_CONTAINER_heap_insert(retry_heap,
428 cqr->retry_time.abs_value_us);
430 = GNUNET_SCHEDULER_add_at(cqr->retry_time,
431 &transmit_next_request_task,
435 transmit_request(cqr);
437 = GNUNET_CONTAINER_heap_insert(retry_heap,
439 cqr->retry_time.abs_value_us);
445 * Check DHT PUT messages from the client.
447 * @param cls the client we received this message from
448 * @param dht_msg the actual message received
449 * @return #GNUNET_OK (always)
452 check_dht_local_put(void *cls,
453 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
455 /* always well-formed */
461 * Handler for PUT messages.
463 * @param cls the client we received this message from
464 * @param dht_msg the actual message received
467 handle_dht_local_put(void *cls,
468 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
470 struct ClientHandle *ch = cls;
471 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
474 size = ntohs(dht_msg->header.size);
475 GNUNET_STATISTICS_update(GDS_stats,
476 gettext_noop("# PUT requests received from clients"),
479 LOG_TRAFFIC(GNUNET_ERROR_TYPE_DEBUG,
481 GNUNET_h2s_full(&dht_msg->key));
482 /* give to local clients */
483 LOG(GNUNET_ERROR_TYPE_DEBUG,
484 "Handling local PUT of %u-bytes for query %s\n",
485 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
486 GNUNET_h2s(&dht_msg->key));
487 GDS_CLIENTS_handle_reply(GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
493 ntohl(dht_msg->type),
494 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
497 GDS_DATACACHE_handle_put(GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
501 ntohl(dht_msg->type),
502 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
504 /* route to other peers */
506 = GNUNET_CONTAINER_bloomfilter_init(NULL,
508 GNUNET_CONSTANTS_BLOOMFILTER_K);
509 GDS_NEIGHBOURS_handle_put(ntohl(dht_msg->type),
510 ntohl(dht_msg->options),
511 ntohl(dht_msg->desired_replication_level),
512 GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
519 size - sizeof(struct GNUNET_DHT_ClientPutMessage));
520 GDS_CLIENTS_process_put(ntohl(dht_msg->options),
521 ntohl(dht_msg->type),
523 ntohl(dht_msg->desired_replication_level),
525 GDS_NEIGHBOURS_get_id(),
526 GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
529 size - sizeof(struct GNUNET_DHT_ClientPutMessage));
530 GNUNET_CONTAINER_bloomfilter_free(peer_bf);
531 GNUNET_SERVICE_client_continue(ch->client);
536 * Check DHT GET messages from the client.
538 * @param cls the client we received this message from
539 * @param get the actual message received
540 * @return #GNUNET_OK (always)
543 check_dht_local_get(void *cls,
544 const struct GNUNET_DHT_ClientGetMessage *get)
546 /* always well-formed */
552 * Handle a result from local datacache for a GET operation.
554 * @param cls the `struct ClientHandle` of the client doing the query
555 * @param type type of the block
556 * @param expiration_time when does the content expire
557 * @param key key for the content
558 * @param put_path_length number of entries in @a put_path
559 * @param put_path peers the original PUT traversed (if tracked)
560 * @param get_path_length number of entries in @a get_path
561 * @param get_path peers this reply has traversed so far (if tracked)
562 * @param data payload of the reply
563 * @param data_size number of bytes in @a data
566 handle_local_result(void *cls,
567 enum GNUNET_BLOCK_Type type,
568 struct GNUNET_TIME_Absolute expiration_time,
569 const struct GNUNET_HashCode *key,
570 unsigned int put_path_length,
571 const struct GNUNET_PeerIdentity *put_path,
572 unsigned int get_path_length,
573 const struct GNUNET_PeerIdentity *get_path,
577 // FIXME: this needs some clean up: inline the function,
578 // possibly avoid even looking up the client!
579 GDS_CLIENTS_handle_reply(expiration_time,
582 put_path_length, put_path,
589 * Handler for DHT GET messages from the client.
591 * @param cls the client we received this message from
592 * @param get the actual message received
595 handle_dht_local_get(void *cls,
596 const struct GNUNET_DHT_ClientGetMessage *get)
598 struct ClientHandle *ch = cls;
599 struct ClientQueryRecord *cqr;
604 size = ntohs(get->header.size);
605 xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
606 xquery = (const char *)&get[1];
607 GNUNET_STATISTICS_update(GDS_stats,
609 ("# GET requests received from clients"), 1,
611 LOG(GNUNET_ERROR_TYPE_DEBUG,
612 "Received GET request for %s from local client %p, xq: %.*s\n",
613 GNUNET_h2s(&get->key),
617 LOG_TRAFFIC(GNUNET_ERROR_TYPE_DEBUG,
619 GNUNET_h2s_full(&get->key));
621 cqr = GNUNET_malloc(sizeof(struct ClientQueryRecord) + xquery_size);
624 cqr->xquery = (void *)&cqr[1];
625 GNUNET_memcpy(&cqr[1], xquery, xquery_size);
626 cqr->hnode = GNUNET_CONTAINER_heap_insert(retry_heap, cqr, 0);
627 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
628 cqr->retry_time = GNUNET_TIME_absolute_get();
629 cqr->unique_id = get->unique_id;
630 cqr->xquery_size = xquery_size;
631 cqr->replication = ntohl(get->desired_replication_level);
632 cqr->msg_options = ntohl(get->options);
633 cqr->type = ntohl(get->type);
634 GNUNET_CONTAINER_DLL_insert(ch->cqr_head,
637 GNUNET_CONTAINER_multihashmap_put(forward_map,
640 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
641 GDS_CLIENTS_process_get(ntohl(get->options),
644 ntohl(get->desired_replication_level),
646 GDS_NEIGHBOURS_get_id(),
648 /* start remote requests */
649 if (NULL != retry_task)
650 GNUNET_SCHEDULER_cancel(retry_task);
651 retry_task = GNUNET_SCHEDULER_add_now(&transmit_next_request_task,
653 /* perform local lookup */
654 GDS_DATACACHE_handle_get(&get->key,
659 &handle_local_result,
661 GNUNET_SERVICE_client_continue(ch->client);
666 * Closure for #find_by_unique_id().
668 struct FindByUniqueIdContext {
670 * Where to store the result, if found.
672 struct ClientQueryRecord *cqr;
679 * Function called for each existing DHT record for the given
680 * query. Checks if it matches the UID given in the closure
681 * and if so returns the entry as a result.
683 * @param cls the search context
684 * @param key query for the lookup (not used)
685 * @param value the `struct ClientQueryRecord`
686 * @return #GNUNET_YES to continue iteration (result not yet found)
689 find_by_unique_id(void *cls,
690 const struct GNUNET_HashCode *key,
693 struct FindByUniqueIdContext *fui_ctx = cls;
694 struct ClientQueryRecord *cqr = value;
696 if (cqr->unique_id != fui_ctx->unique_id)
704 * Check "GET result seen" messages from the client.
706 * @param cls the client we received this message from
707 * @param message the actual message received
708 * @return #GNUNET_OK if @a seen is well-formed
711 check_dht_local_get_result_seen(void *cls,
712 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
715 unsigned int hash_count;
717 size = ntohs(seen->header.size);
718 hash_count = (size - sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof(struct GNUNET_HashCode);
719 if (size != sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof(struct GNUNET_HashCode))
722 return GNUNET_SYSERR;
729 * Handler for "GET result seen" messages from the client.
731 * @param cls the client we received this message from
732 * @param message the actual message received
735 handle_dht_local_get_result_seen(void *cls,
736 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
738 struct ClientHandle *ch = cls;
740 unsigned int hash_count;
741 unsigned int old_count;
742 const struct GNUNET_HashCode *hc;
743 struct FindByUniqueIdContext fui_ctx;
744 struct ClientQueryRecord *cqr;
746 size = ntohs(seen->header.size);
747 hash_count = (size - sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof(struct GNUNET_HashCode);
748 hc = (const struct GNUNET_HashCode*)&seen[1];
749 fui_ctx.unique_id = seen->unique_id;
751 GNUNET_CONTAINER_multihashmap_get_multiple(forward_map,
755 if (NULL == (cqr = fui_ctx.cqr))
758 GNUNET_SERVICE_client_drop(ch->client);
761 /* finally, update 'seen' list */
762 old_count = cqr->seen_replies_count;
763 GNUNET_array_grow(cqr->seen_replies,
764 cqr->seen_replies_count,
765 cqr->seen_replies_count + hash_count);
766 GNUNET_memcpy(&cqr->seen_replies[old_count],
768 sizeof(struct GNUNET_HashCode) * hash_count);
773 * Closure for #remove_by_unique_id().
775 struct RemoveByUniqueIdContext {
777 * Client that issued the removal request.
779 struct ClientHandle *ch;
782 * Unique ID of the request.
789 * Iterator over hash map entries that frees all entries
790 * that match the given client and unique ID.
792 * @param cls unique ID and client to search for in source routes
793 * @param key current key code
794 * @param value value in the hash map, a ClientQueryRecord
795 * @return #GNUNET_YES (we should continue to iterate)
798 remove_by_unique_id(void *cls,
799 const struct GNUNET_HashCode *key,
802 const struct RemoveByUniqueIdContext *ctx = cls;
803 struct ClientQueryRecord *cqr = value;
805 if (cqr->unique_id != ctx->unique_id)
807 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
808 "Removing client %p's record for key %s (by unique id)\n",
811 remove_client_record(cqr);
817 * Handler for any generic DHT stop messages, calls the appropriate handler
818 * depending on message type (if processed locally)
820 * @param cls client we received this message from
821 * @param message the actual message received
825 handle_dht_local_get_stop(void *cls,
826 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
828 struct ClientHandle *ch = cls;
829 struct RemoveByUniqueIdContext ctx;
831 GNUNET_STATISTICS_update(GDS_stats,
833 ("# GET STOP requests received from clients"), 1,
835 LOG(GNUNET_ERROR_TYPE_DEBUG,
836 "Received GET STOP request for %s from local client %p\n",
837 GNUNET_h2s(&dht_stop_msg->key),
840 ctx.unique_id = dht_stop_msg->unique_id;
841 GNUNET_CONTAINER_multihashmap_get_multiple(forward_map,
843 &remove_by_unique_id,
845 GNUNET_SERVICE_client_continue(ch->client);
850 * Handler for monitor start messages
852 * @param cls the client we received this message from
853 * @param msg the actual message received
857 handle_dht_local_monitor(void *cls,
858 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
860 struct ClientHandle *ch = cls;
861 struct ClientMonitorRecord *r;
863 r = GNUNET_new(struct ClientMonitorRecord);
865 r->type = ntohl(msg->type);
866 r->get = ntohs(msg->get);
867 r->get_resp = ntohs(msg->get_resp);
868 r->put = ntohs(msg->put);
869 if (0 == ntohs(msg->filter_key))
875 r->key = GNUNET_new(struct GNUNET_HashCode);
876 GNUNET_memcpy(r->key,
878 sizeof(struct GNUNET_HashCode));
880 GNUNET_CONTAINER_DLL_insert(monitor_head,
883 GNUNET_SERVICE_client_continue(ch->client);
888 * Handler for monitor stop messages
890 * @param cls the client we received this message from
891 * @param msg the actual message received
894 handle_dht_local_monitor_stop(void *cls,
895 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
897 struct ClientHandle *ch = cls;
898 struct ClientMonitorRecord *r;
901 GNUNET_SERVICE_client_continue(ch->client);
902 for (r = monitor_head; NULL != r; r = r->next)
906 keys_match = (0 == ntohs(msg->filter_key));
910 keys_match = ((0 != ntohs(msg->filter_key)) &&
913 sizeof(struct GNUNET_HashCode))));
916 (ntohl(msg->type) == r->type) &&
917 (r->get == msg->get) &&
918 (r->get_resp == msg->get_resp) &&
919 (r->put == msg->put) &&
922 GNUNET_CONTAINER_DLL_remove(monitor_head,
925 GNUNET_free_non_null(r->key);
927 return; /* Delete only ONE entry */
934 * Closure for #forward_reply()
936 struct ForwardReplyContext {
938 * Expiration time of the reply.
940 struct GNUNET_TIME_Absolute expiration;
945 const struct GNUNET_PeerIdentity *get_path;
950 const struct GNUNET_PeerIdentity *put_path;
958 * Number of bytes in data.
963 * Number of entries in @e get_path.
965 unsigned int get_path_length;
968 * Number of entries in @e put_path.
970 unsigned int put_path_length;
975 enum GNUNET_BLOCK_Type type;
980 * Iterator over hash map entries that send a given reply to
981 * each of the matching clients. With some tricky recycling
984 * @param cls the 'struct ForwardReplyContext'
985 * @param key current key
986 * @param value value in the hash map, a ClientQueryRecord
987 * @return #GNUNET_YES (we should continue to iterate),
988 * if the result is mal-formed, #GNUNET_NO
991 forward_reply(void *cls,
992 const struct GNUNET_HashCode *key,
995 struct ForwardReplyContext *frc = cls;
996 struct ClientQueryRecord *record = value;
997 struct GNUNET_MQ_Envelope *env;
998 struct GNUNET_DHT_ClientResultMessage *reply;
999 enum GNUNET_BLOCK_EvaluationResult eval;
1001 struct GNUNET_HashCode ch;
1002 struct GNUNET_PeerIdentity *paths;
1004 LOG_TRAFFIC(GNUNET_ERROR_TYPE_DEBUG,
1005 "CLIENT-RESULT %s\n",
1006 GNUNET_h2s_full(key));
1007 if ((record->type != GNUNET_BLOCK_TYPE_ANY) &&
1008 (record->type != frc->type))
1010 LOG(GNUNET_ERROR_TYPE_DEBUG,
1011 "Record type mismatch, not passing request for key %s to local client\n",
1013 GNUNET_STATISTICS_update(GDS_stats,
1015 ("# Key match, type mismatches in REPLY to CLIENT"),
1017 return GNUNET_YES; /* type mismatch */
1019 GNUNET_CRYPTO_hash(frc->data, frc->data_size, &ch);
1020 for (unsigned int i = 0; i < record->seen_replies_count; i++)
1021 if (0 == memcmp(&record->seen_replies[i],
1023 sizeof(struct GNUNET_HashCode)))
1025 LOG(GNUNET_ERROR_TYPE_DEBUG,
1026 "Duplicate reply, not passing request for key %s to local client\n",
1028 GNUNET_STATISTICS_update(GDS_stats,
1030 ("# Duplicate REPLIES to CLIENT request dropped"),
1032 return GNUNET_YES; /* duplicate */
1035 = GNUNET_BLOCK_evaluate(GDS_block_context,
1038 GNUNET_BLOCK_EO_NONE,
1041 record->xquery_size,
1044 LOG(GNUNET_ERROR_TYPE_DEBUG,
1045 "Evaluation result is %d for key %s for local client's query\n",
1050 case GNUNET_BLOCK_EVALUATION_OK_LAST:
1051 do_free = GNUNET_YES;
1054 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1055 GNUNET_array_append(record->seen_replies,
1056 record->seen_replies_count,
1058 do_free = GNUNET_NO;
1061 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1062 /* should be impossible to encounter here */
1066 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1070 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1074 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1078 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1081 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1082 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1083 _("Unsupported block type (%u) in request!\n"), record->type);
1090 GNUNET_STATISTICS_update(GDS_stats,
1091 gettext_noop("# RESULTS queued for clients"),
1094 env = GNUNET_MQ_msg_extra(reply,
1096 (frc->get_path_length + frc->put_path_length) * sizeof(struct GNUNET_PeerIdentity),
1097 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1098 reply->type = htonl(frc->type);
1099 reply->get_path_length = htonl(frc->get_path_length);
1100 reply->put_path_length = htonl(frc->put_path_length);
1101 reply->unique_id = record->unique_id;
1102 reply->expiration = GNUNET_TIME_absolute_hton(frc->expiration);
1104 paths = (struct GNUNET_PeerIdentity *)&reply[1];
1105 GNUNET_memcpy(paths,
1107 sizeof(struct GNUNET_PeerIdentity) * frc->put_path_length);
1108 GNUNET_memcpy(&paths[frc->put_path_length],
1110 sizeof(struct GNUNET_PeerIdentity) * frc->get_path_length);
1111 GNUNET_memcpy(&paths[frc->get_path_length + frc->put_path_length],
1114 LOG(GNUNET_ERROR_TYPE_DEBUG,
1115 "Sending reply to query %s for client %p\n",
1117 record->ch->client);
1118 GNUNET_MQ_send(record->ch->mq,
1120 if (GNUNET_YES == do_free)
1121 remove_client_record(record);
1127 * Handle a reply we've received from another peer. If the reply
1128 * matches any of our pending queries, forward it to the respective
1131 * @param expiration when will the reply expire
1132 * @param key the query this reply is for
1133 * @param get_path_length number of peers in @a get_path
1134 * @param get_path path the reply took on get
1135 * @param put_path_length number of peers in @a put_path
1136 * @param put_path path the reply took on put
1137 * @param type type of the reply
1138 * @param data_size number of bytes in @a data
1139 * @param data application payload data
1142 GDS_CLIENTS_handle_reply(struct GNUNET_TIME_Absolute expiration,
1143 const struct GNUNET_HashCode *key,
1144 unsigned int get_path_length,
1145 const struct GNUNET_PeerIdentity *get_path,
1146 unsigned int put_path_length,
1147 const struct GNUNET_PeerIdentity *put_path,
1148 enum GNUNET_BLOCK_Type type,
1152 struct ForwardReplyContext frc;
1155 msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size +
1156 (get_path_length + put_path_length) * sizeof(struct GNUNET_PeerIdentity);
1157 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1162 if (NULL == GNUNET_CONTAINER_multihashmap_get(forward_map,
1165 LOG(GNUNET_ERROR_TYPE_DEBUG,
1166 "No matching client for reply for key %s\n",
1168 GNUNET_STATISTICS_update(GDS_stats,
1169 gettext_noop("# REPLIES ignored for CLIENTS (no match)"),
1172 return; /* no matching request, fast exit! */
1174 frc.expiration = expiration;
1175 frc.get_path = get_path;
1176 frc.put_path = put_path;
1178 frc.data_size = data_size;
1179 frc.get_path_length = get_path_length;
1180 frc.put_path_length = put_path_length;
1182 LOG(GNUNET_ERROR_TYPE_DEBUG,
1183 "Forwarding reply for key %s to client\n",
1185 GNUNET_CONTAINER_multihashmap_get_multiple(forward_map,
1193 * Check if some client is monitoring GET messages and notify
1194 * them in that case.
1196 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1197 * @param type The type of data in the request.
1198 * @param hop_count Hop count so far.
1199 * @param path_length number of entries in path (or 0 if not recorded).
1200 * @param path peers on the GET path (or NULL if not recorded).
1201 * @param desired_replication_level Desired replication level.
1202 * @param key Key of the requested data.
1205 GDS_CLIENTS_process_get(uint32_t options,
1206 enum GNUNET_BLOCK_Type type,
1208 uint32_t desired_replication_level,
1209 unsigned int path_length,
1210 const struct GNUNET_PeerIdentity *path,
1211 const struct GNUNET_HashCode * key)
1213 struct ClientMonitorRecord *m;
1214 struct ClientHandle **cl;
1215 unsigned int cl_size;
1219 for (m = monitor_head; NULL != m; m = m->next)
1221 if (((GNUNET_BLOCK_TYPE_ANY == m->type) ||
1222 (m->type == type)) &&
1223 ((NULL == m->key) ||
1226 sizeof(struct GNUNET_HashCode)))))
1228 struct GNUNET_MQ_Envelope *env;
1229 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1230 struct GNUNET_PeerIdentity *msg_path;
1234 /* Don't send duplicates */
1235 for (i = 0; i < cl_size; i++)
1240 GNUNET_array_append(cl,
1244 msize = path_length * sizeof(struct GNUNET_PeerIdentity);
1245 env = GNUNET_MQ_msg_extra(mmsg,
1247 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1248 mmsg->options = htonl(options);
1249 mmsg->type = htonl(type);
1250 mmsg->hop_count = htonl(hop_count);
1251 mmsg->desired_replication_level = htonl(desired_replication_level);
1252 mmsg->get_path_length = htonl(path_length);
1254 msg_path = (struct GNUNET_PeerIdentity *)&mmsg[1];
1255 GNUNET_memcpy(msg_path,
1257 path_length * sizeof(struct GNUNET_PeerIdentity));
1258 GNUNET_MQ_send(m->ch->mq,
1262 GNUNET_free_non_null(cl);
1267 * Check if some client is monitoring GET RESP messages and notify
1268 * them in that case.
1270 * @param type The type of data in the result.
1271 * @param get_path Peers on GET path (or NULL if not recorded).
1272 * @param get_path_length number of entries in get_path.
1273 * @param put_path peers on the PUT path (or NULL if not recorded).
1274 * @param put_path_length number of entries in get_path.
1275 * @param exp Expiration time of the data.
1276 * @param key Key of the data.
1277 * @param data Pointer to the result data.
1278 * @param size Number of bytes in @a data.
1281 GDS_CLIENTS_process_get_resp(enum GNUNET_BLOCK_Type type,
1282 const struct GNUNET_PeerIdentity *get_path,
1283 unsigned int get_path_length,
1284 const struct GNUNET_PeerIdentity *put_path,
1285 unsigned int put_path_length,
1286 struct GNUNET_TIME_Absolute exp,
1287 const struct GNUNET_HashCode * key,
1291 struct ClientMonitorRecord *m;
1292 struct ClientHandle **cl;
1293 unsigned int cl_size;
1297 for (m = monitor_head; NULL != m; m = m->next)
1299 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1301 memcmp(key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1303 struct GNUNET_MQ_Envelope *env;
1304 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1305 struct GNUNET_PeerIdentity *path;
1309 /* Don't send duplicates */
1310 for (i = 0; i < cl_size; i++)
1315 GNUNET_array_append(cl,
1320 msize += (get_path_length + put_path_length)
1321 * sizeof(struct GNUNET_PeerIdentity);
1322 env = GNUNET_MQ_msg_extra(mmsg,
1324 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1325 mmsg->type = htonl(type);
1326 mmsg->put_path_length = htonl(put_path_length);
1327 mmsg->get_path_length = htonl(get_path_length);
1328 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1330 path = (struct GNUNET_PeerIdentity *)&mmsg[1];
1333 put_path_length * sizeof(struct GNUNET_PeerIdentity));
1336 get_path_length * sizeof(struct GNUNET_PeerIdentity));
1337 GNUNET_memcpy(&path[get_path_length],
1340 GNUNET_MQ_send(m->ch->mq,
1344 GNUNET_free_non_null(cl);
1349 * Check if some client is monitoring PUT messages and notify
1350 * them in that case.
1352 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1353 * @param type The type of data in the request.
1354 * @param hop_count Hop count so far.
1355 * @param path_length number of entries in path (or 0 if not recorded).
1356 * @param path peers on the PUT path (or NULL if not recorded).
1357 * @param desired_replication_level Desired replication level.
1358 * @param exp Expiration time of the data.
1359 * @param key Key under which data is to be stored.
1360 * @param data Pointer to the data carried.
1361 * @param size Number of bytes in data.
1364 GDS_CLIENTS_process_put(uint32_t options,
1365 enum GNUNET_BLOCK_Type type,
1367 uint32_t desired_replication_level,
1368 unsigned int path_length,
1369 const struct GNUNET_PeerIdentity *path,
1370 struct GNUNET_TIME_Absolute exp,
1371 const struct GNUNET_HashCode *key,
1375 struct ClientMonitorRecord *m;
1376 struct ClientHandle **cl;
1377 unsigned int cl_size;
1381 for (m = monitor_head; NULL != m; m = m->next)
1383 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1385 memcmp(key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1387 struct GNUNET_MQ_Envelope *env;
1388 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1389 struct GNUNET_PeerIdentity *msg_path;
1393 /* Don't send duplicates */
1394 for (i = 0; i < cl_size; i++)
1399 GNUNET_array_append(cl,
1404 msize += path_length * sizeof(struct GNUNET_PeerIdentity);
1405 env = GNUNET_MQ_msg_extra(mmsg,
1407 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1408 mmsg->options = htonl(options);
1409 mmsg->type = htonl(type);
1410 mmsg->hop_count = htonl(hop_count);
1411 mmsg->desired_replication_level = htonl(desired_replication_level);
1412 mmsg->put_path_length = htonl(path_length);
1414 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1415 msg_path = (struct GNUNET_PeerIdentity *)&mmsg[1];
1416 GNUNET_memcpy(msg_path,
1418 path_length * sizeof(struct GNUNET_PeerIdentity));
1419 GNUNET_memcpy(&msg_path[path_length],
1422 GNUNET_MQ_send(m->ch->mq,
1426 GNUNET_free_non_null(cl);
1431 * Initialize client subsystem.
1433 * @param server the initialized server
1439 = GNUNET_CONTAINER_multihashmap_create(1024,
1442 = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
1447 * Shutdown client subsystem.
1452 if (NULL != retry_task)
1454 GNUNET_SCHEDULER_cancel(retry_task);
1461 * Define "main" method using service macro.
1463 * @param name name of the service, i.e. "dht" or "xdht"
1464 * @param run name of the initialization method for the service
1466 #define GDS_DHT_SERVICE_INIT(name, run) \
1467 GNUNET_SERVICE_MAIN \
1469 GNUNET_SERVICE_OPTION_NONE, \
1471 &client_connect_cb, \
1472 &client_disconnect_cb, \
1474 GNUNET_MQ_hd_var_size(dht_local_put, \
1475 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1476 struct GNUNET_DHT_ClientPutMessage, \
1478 GNUNET_MQ_hd_var_size(dht_local_get, \
1479 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1480 struct GNUNET_DHT_ClientGetMessage, \
1482 GNUNET_MQ_hd_fixed_size(dht_local_get_stop, \
1483 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1484 struct GNUNET_DHT_ClientGetStopMessage, \
1486 GNUNET_MQ_hd_fixed_size(dht_local_monitor, \
1487 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1488 struct GNUNET_DHT_MonitorStartStopMessage, \
1490 GNUNET_MQ_hd_fixed_size(dht_local_monitor_stop, \
1491 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1492 struct GNUNET_DHT_MonitorStartStopMessage, \
1494 GNUNET_MQ_hd_var_size(dht_local_get_result_seen, \
1495 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1496 struct GNUNET_DHT_ClientGetResultSeenMessage, \
1498 GNUNET_MQ_handler_end())
1502 * Destructor: destroy the retry heap and the forwarding map (both must already be empty).
1504 void __attribute__ ((destructor))
1507 if (NULL != retry_heap)
1509 GNUNET_assert(0 == GNUNET_CONTAINER_heap_get_size(retry_heap));
1510 GNUNET_CONTAINER_heap_destroy(retry_heap);
1513 if (NULL != forward_map)
1515 GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(forward_map));
1516 GNUNET_CONTAINER_multihashmap_destroy(forward_map);
1521 /* end of gnunet-service-dht_clients.c */