2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file dht/gnunet-service-dht.c
23 * @brief main DHT service shell, building block for DHT implementations
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_client_lib.h"
30 #include "gnunet_getopt_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_service_lib.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_signal_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_datacache_lib.h"
41 * Handle to the datacache service (for inserting/retrieving data)
43 struct GNUNET_DATACACHE_Handle *datacache;
46 * The main scheduler to use for the DHT service
48 static struct GNUNET_SCHEDULER_Handle *sched;
51 * The configuration the DHT service is running with
53 static const struct GNUNET_CONFIGURATION_Handle *cfg;
56 * Timeout for transmissions to clients
58 static struct GNUNET_TIME_Relative client_transmit_timeout;
61 * Handle to the core service
63 static struct GNUNET_CORE_Handle *coreAPI;
66 * The identity of our peer.
68 static struct GNUNET_PeerIdentity my_identity;
71 * Task to run when we shut down, cleaning up all our trash
73 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
/**
 * One entry in a client's singly linked queue of messages that
 * still have to be transmitted.
 */
struct PendingMessage
{
  /**
   * Following entry in the queue, NULL for the last one
   */
  struct PendingMessage *next;

  /**
   * The message waiting to be transmitted
   */
  struct GNUNET_MessageHeader *msg;

};
/**
 * Per-client state: the server handle used to talk to the client,
 * the transmission currently requested for it (if any) and the
 * queue of messages still waiting to be sent.
 */
struct ClientList
{
  /**
   * Next client in the list of active clients
   */
  struct ClientList *next;

  /**
   * Server handle identifying this client
   */
  struct GNUNET_SERVER_Client *client_handle;

  /**
   * Outstanding transmission request for this client,
   * NULL while no request is active
   */
  struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;

  /**
   * First message queued for this client, NULL if the queue is empty
   */
  struct PendingMessage *pending_head;

};
/**
 * Closure handed to the datacache iterator while answering
 * a local get request.
 */
struct DatacacheGetContext
{
  /**
   * Client that receives each result.
   */
  struct ClientList *client;

  /**
   * Unique id of the request the results belong to
   */
  unsigned long long unique_id;
};
140 * Context containing information about a DHT message received.
142 struct DHT_MessageContext
145 * The client this request was received from.
147 struct ClientList *client;
150 * The key this request was about
152 GNUNET_HashCode *key;
155 * The unique identifier of this request
157 unsigned long long unique_id;
160 * Desired replication level
165 * Any message options for this request
171 * List of active clients.
173 static struct ClientList *client_list;
177 * Server handlers for handling locally received dht requests
180 handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
181 const struct GNUNET_MessageHeader *message);
184 handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
185 const struct GNUNET_MessageHeader *message);
187 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
188 {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
189 {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
195 * Core handler for p2p dht get requests.
197 static int handle_dht_p2p_get (void *cls,
198 const struct GNUNET_PeerIdentity *peer,
199 const struct GNUNET_MessageHeader *message,
200 struct GNUNET_TIME_Relative latency,
204 * Core handler for p2p dht put requests.
206 static int handle_dht_p2p_put (void *cls,
207 const struct GNUNET_PeerIdentity *peer,
208 const struct GNUNET_MessageHeader *message,
209 struct GNUNET_TIME_Relative latency,
213 * Core handler for p2p dht find peer requests.
215 static int handle_dht_p2p_find_peer (void *cls,
216 const struct GNUNET_PeerIdentity *peer,
217 const struct GNUNET_MessageHeader
219 struct GNUNET_TIME_Relative latency,
222 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
223 {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_GET, 0},
224 {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_PUT, 0},
225 {&handle_dht_p2p_find_peer, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0},
230 * Forward declaration.
232 static size_t send_generic_reply (void *cls, size_t size, void *buf);
235 * Task run to check for messages that need to be sent to a client.
237 * @param cls a ClientList, containing the client and any messages to be sent to it
238 * @param tc reason this was called
241 process_pending_messages (void *cls,
242 const struct GNUNET_SCHEDULER_TaskContext *tc)
244 struct ClientList *client = cls;
246 if (client->pending_head == NULL) /* No messages queued */
249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
250 "`%s': Have no pending messages for client.\n", "DHT");
255 if (client->transmit_handle == NULL) /* No current pending messages, we can try to send! */
256 client->transmit_handle =
257 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
258 ntohs (client->pending_head->msg->
260 GNUNET_TIME_relative_multiply
261 (GNUNET_TIME_UNIT_SECONDS, 5),
262 &send_generic_reply, client);
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267 "`%s': Transmit handle is non-null.\n", "DHT");
273 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
274 * request. A ClientList is passed as closure, take the head of the list
275 * and copy it into buf, which has the result of sending the message to the
278 * @param cls closure to this call
279 * @param size maximum number of bytes available to send
280 * @param buf where to copy the actual message to
282 * @return the number of bytes actually copied, 0 indicates failure
285 send_generic_reply (void *cls, size_t size, void *buf)
287 struct ClientList *client = cls;
288 struct PendingMessage *reply = client->pending_head;
291 client->transmit_handle = NULL;
292 if (buf == NULL) /* Message timed out, that's crappy... */
295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
297 client->pending_head = reply->next;
298 GNUNET_free (reply->msg);
303 if (size >= ntohs (reply->msg->size))
306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307 "`%s': Copying reply to buffer, REALLY SENT\n", "DHT");
309 memcpy (buf, reply->msg, ntohs (reply->msg->size));
311 ret = ntohs (reply->msg->size);
316 client->pending_head = reply->next;
317 GNUNET_free (reply->msg);
320 GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client);
325 * Add a PendingMessage to the clients list of messages to be sent
327 * @param client the active client to send the message to
328 * @param pending_message the actual message to send
331 add_pending_message (struct ClientList *client,
332 struct PendingMessage *pending_message)
334 struct PendingMessage *pos;
335 struct PendingMessage *prev;
337 pos = client->pending_head;
340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341 "`%s': Adding pending message for client.\n", "DHT");
346 client->pending_head = pending_message;
348 else /* This means another request is already queued, rely on send_reply to process all pending messages */
350 while (pos != NULL) /* Find end of list */
356 GNUNET_assert (prev != NULL);
357 prev->next = pending_message;
360 GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client);
365 * Called when a reply needs to be sent to a client, either as
366 * a result it found to a GET or FIND PEER request.
368 * @param client the client to send the reply to
369 * @param message the encapsulated message to send
370 * @param uid the unique identifier of this request
373 send_reply_to_client (struct ClientList *client,
374 struct GNUNET_MessageHeader *message,
375 unsigned long long uid)
377 struct GNUNET_DHT_Message *reply;
378 struct PendingMessage *pending_message;
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "`%s': Sending reply to client.\n", "DHT");
386 msize = ntohs (message->size);
387 tsize = sizeof (struct GNUNET_DHT_Message) + msize;
388 reply = GNUNET_malloc (tsize);
389 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
390 reply->header.size = htons (tsize);
392 reply->unique = htons (GNUNET_YES);
393 reply->unique_id = GNUNET_htonll (uid);
394 memcpy (&reply[1], message, msize);
396 pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
397 pending_message->msg = &reply->header;
398 pending_message->next = NULL; /* We insert at the end of the list */
400 add_pending_message (client, pending_message);
405 * Iterator for local get request results,
407 * @param cls closure for iterator, a DatacacheGetContext
408 * @param exp when does this value expire?
409 * @param key the key this data is stored under
410 * @param size the size of the data identified by key
411 * @param data the actual data
412 * @param type the type of the data
414 * @return GNUNET_OK to continue iteration, anything else
418 datacache_get_iterator (void *cls,
419 struct GNUNET_TIME_Absolute exp,
420 const GNUNET_HashCode * key,
421 uint32_t size, const char *data, uint32_t type)
423 struct DatacacheGetContext *datacache_get_ctx = cls;
424 struct GNUNET_DHT_GetResultMessage *get_result;
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "`%s': Received `%s' response from datacache\n", "DHT", "GET");
430 GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
431 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
432 get_result->header.size =
433 htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
434 get_result->data_size = htons (size);
435 get_result->expiration = exp;
436 memcpy (&get_result->key, key, sizeof (GNUNET_HashCode));
437 get_result->type = htons (type);
438 memcpy (&get_result[1], data, size);
440 send_reply_to_client (datacache_get_ctx->client, &get_result->header,
441 datacache_get_ctx->unique_id);
443 GNUNET_free (get_result);
448 * Server handler for initiating local dht get requests
450 * @param cls closure for service
451 * @param get_msg the actual get message
452 * @param message_context struct containing pertinent information about the get request
456 handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg,
457 struct DHT_MessageContext *message_context)
460 GNUNET_HashCode get_key;
463 unsigned int results;
464 struct DatacacheGetContext *datacache_get_context;
466 GNUNET_assert (ntohs (get_msg->header.size) >=
467 sizeof (struct GNUNET_DHT_GetMessage));
468 get_type = ntohs (get_msg->type);
471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
472 "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
473 "DHT", "GET", get_type, GNUNET_h2s (&get_key),
474 message_context->unique_id);
477 datacache_get_context = GNUNET_malloc (sizeof (struct DatacacheGetContext));
478 datacache_get_context->client = message_context->client;
479 datacache_get_context->unique_id = message_context->unique_id;
482 if (datacache != NULL)
484 GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
485 &datacache_get_iterator, datacache_get_context);
488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 "`%s': Found %d results for local `%s' request\n", "DHT",
492 GNUNET_free (datacache_get_context);
493 /* FIXME: Implement get functionality here */
498 * Server handler for initiating local dht find peer requests
500 * @param cls closure for service
501 * @param find_msg the actual find peer message
502 * @param message_context struct containing pertinent information about the request
506 handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg,
507 struct DHT_MessageContext *message_context)
510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511 "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
512 "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
513 ntohs (find_msg->header.size),
514 sizeof (struct GNUNET_DHT_FindPeerMessage));
517 GNUNET_assert (ntohs (find_msg->header.size) >=
518 sizeof (struct GNUNET_DHT_FindPeerMessage));
520 /* FIXME: Implement find peer functionality here */
525 * Server handler for initiating local dht put requests
527 * @param cls closure for service
528 * @param put_msg the actual put message
529 * @param message_context struct containing pertinent information about the request
532 handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg,
533 struct DHT_MessageContext *message_context)
538 GNUNET_assert (ntohs (put_msg->header.size) >=
539 sizeof (struct GNUNET_DHT_PutMessage));
541 put_type = ntohs (put_msg->type);
542 data_size = ntohs (put_msg->data_size);
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545 "`%s': %s msg total size is %d, data size %d, struct size %d\n",
546 "DHT", "PUT", ntohs (put_msg->header.size), data_size,
547 sizeof (struct GNUNET_DHT_PutMessage));
549 GNUNET_assert (ntohs (put_msg->header.size) ==
550 sizeof (struct GNUNET_DHT_PutMessage) + data_size);
553 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554 "`%s': Received `%s' request from client, message type %d, key %s\n",
555 "DHT", "PUT", put_type, GNUNET_h2s (message_context->key));
559 * Simplest DHT functionality, store any message we receive a put request for.
561 if (datacache != NULL)
562 GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
563 (char *) &put_msg[1], put_type,
564 put_msg->expiration);
566 * FIXME: Implement dht put request functionality here!
573 * Find a client if it exists, add it otherwise.
575 * @param client the server handle to the client
577 * @return the client if found, a new client otherwise
579 static struct ClientList *
580 find_active_client (struct GNUNET_SERVER_Client *client)
582 struct ClientList *pos = client_list;
583 struct ClientList *ret;
587 if (pos->client_handle == client)
592 ret = GNUNET_malloc (sizeof (struct ClientList));
593 ret->client_handle = client;
594 ret->next = client_list;
596 ret->pending_head = NULL;
602 * Construct a message receipt confirmation for a particular uid.
603 * Receipt confirmations are used for any requests that don't expect
604 * a reply otherwise (i.e. put requests, stop requests).
606 * @param client the handle for the client
607 * @param uid the unique identifier of this message
610 send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client,
613 struct GNUNET_DHT_StopMessage *confirm_message;
614 struct ClientList *active_client;
615 struct PendingMessage *pending_message;
618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619 "`%s': Sending receipt confirmation for uid %llu\n", "DHT",
622 confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage));
623 confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
624 confirm_message->header.size =
625 htons (sizeof (struct GNUNET_DHT_StopMessage));
626 confirm_message->unique_id = GNUNET_htonll (uid);
628 active_client = find_active_client (client);
629 pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
630 pending_message->msg = &confirm_message->header;
632 add_pending_message (active_client, pending_message);
637 * Handler for any generic DHT messages, calls the appropriate handler
638 * depending on message type, sends confirmation if responses aren't otherwise
641 * @param cls closure for the service
642 * @param client the client we received this message from
643 * @param message the actual message received
646 handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
647 const struct GNUNET_MessageHeader *message)
649 struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message;
650 struct GNUNET_MessageHeader *enc_msg;
651 struct DHT_MessageContext *message_context;
655 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
656 enc_type = ntohs (enc_msg->type);
660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661 "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
662 "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
663 GNUNET_ntohll (dht_msg->unique_id));
666 message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext));
667 message_context->client = find_active_client (client);
668 message_context->key = &dht_msg->key;
669 message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id);
670 message_context->replication = ntohs (dht_msg->desired_replication_level);
671 message_context->msg_options = ntohs (dht_msg->options);
675 case GNUNET_MESSAGE_TYPE_DHT_GET:
676 handle_dht_get (cls, (struct GNUNET_DHT_GetMessage *) enc_msg,
679 case GNUNET_MESSAGE_TYPE_DHT_PUT:
680 handle_dht_put (cls, (struct GNUNET_DHT_PutMessage *) enc_msg,
682 send_client_receipt_confirmation (client,
683 GNUNET_ntohll (dht_msg->unique_id));
685 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
686 handle_dht_find_peer (cls,
687 (struct GNUNET_DHT_FindPeerMessage *) enc_msg,
691 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
692 "`%s': Message type (%d) not handled\n", "DHT", enc_type);
695 GNUNET_free (message_context);
696 GNUNET_SERVER_receive_done (client, GNUNET_OK);
701 * Handler for any generic DHT stop messages, calls the appropriate handler
702 * depending on message type, sends confirmation by default (stop messages
703 * do not otherwise expect replies)
705 * @param cls closure for the service
706 * @param client the client we received this message from
707 * @param message the actual message received
709 * TODO: add demultiplexing for stop message types.
712 handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
713 const struct GNUNET_MessageHeader *message)
715 struct GNUNET_DHT_StopMessage *dht_stop_msg =
716 (struct GNUNET_DHT_StopMessage *) message;
719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720 "`%s': Received `%s' request from client, uid %llu\n", "DHT",
721 "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
724 /* TODO: Put in demultiplexing here */
726 send_client_receipt_confirmation (client,
727 GNUNET_ntohll (dht_stop_msg->unique_id));
728 GNUNET_SERVER_receive_done (client, GNUNET_OK);
733 * Core handler for p2p dht get requests.
736 handle_dht_p2p_get (void *cls,
737 const struct GNUNET_PeerIdentity *peer,
738 const struct GNUNET_MessageHeader *message,
739 struct GNUNET_TIME_Relative latency, uint32_t distance)
742 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743 "`%s': Received `%s' request from another peer\n", "DHT",
751 * Core handler for p2p dht put requests.
754 handle_dht_p2p_put (void *cls,
755 const struct GNUNET_PeerIdentity *peer,
756 const struct GNUNET_MessageHeader *message,
757 struct GNUNET_TIME_Relative latency, uint32_t distance)
760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
761 "`%s': Received `%s' request from another peer\n", "DHT",
769 * Core handler for p2p dht find peer requests.
772 handle_dht_p2p_find_peer (void *cls,
773 const struct GNUNET_PeerIdentity *peer,
774 const struct GNUNET_MessageHeader *message,
775 struct GNUNET_TIME_Relative latency,
779 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
780 "`%s': Received `%s' request from another peer\n", "DHT",
788 * Task run during shutdown.
794 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
796 GNUNET_CORE_disconnect (coreAPI);
800 * To be called on core init/fail.
802 * @param cls service closure
803 * @param server handle to the server for this service
804 * @param identity the public identity of this peer
805 * @param publicKey the public key of this peer
808 core_init (void *cls,
809 struct GNUNET_CORE_Handle *server,
810 const struct GNUNET_PeerIdentity *identity,
811 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
816 GNUNET_SCHEDULER_cancel (sched, cleanup_task);
817 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822 "%s: Core connection initialized, I am peer: %s\n", "dht",
823 GNUNET_i2s (identity));
825 /* Copy our identity so we can use it */
826 memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
827 /* Set the server to local variable */
832 * Process dht requests.
835 * @param scheduler scheduler to use
836 * @param server the initialized server
837 * @param c configuration to use
841 struct GNUNET_SCHEDULER_Handle *scheduler,
842 struct GNUNET_SERVER_Handle *server,
843 const struct GNUNET_CONFIGURATION_Handle *c)
848 datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
850 client_transmit_timeout =
851 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
852 GNUNET_SERVER_add_handlers (server, plugin_handlers);
854 coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
855 cfg, /* Main configuration */
856 client_transmit_timeout, /* Delay for connecting */
857 NULL, /* FIXME: anything we want to pass around? */
858 &core_init, /* Call core_init once connected */
859 NULL, /* Don't care about pre-connects */
860 NULL, /* Don't care about connects */
861 NULL, /* Don't care about disconnects */
862 NULL, /* Don't want notified about all incoming messages */
863 GNUNET_NO, /* For header only inbound notification */
864 NULL, /* Don't want notified about all outbound messages */
865 GNUNET_NO, /* For header only outbound notification */
866 core_handlers); /* Register these handlers */
871 /* Scheduled the task to clean up when shutdown is called */
872 cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
873 GNUNET_TIME_UNIT_FOREVER_REL,
874 &shutdown_task, NULL);
879 * The main function for the dht service.
881 * @param argc number of arguments from the command line
882 * @param argv command line arguments
883 * @return 0 ok, 1 on error
886 main (int argc, char *const *argv)
889 GNUNET_SERVICE_run (argc,
892 GNUNET_SERVICE_OPTION_NONE,
893 &run, NULL)) ? 0 : 1;