2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file dht/gnunet-service-dht.c
23 * @brief main DHT service shell, building block for DHT implementations
24 * @author Christian Grothoff
25 * @author Nathan Evans
29 #include "gnunet_client_lib.h"
30 #include "gnunet_getopt_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_service_lib.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_signal_lib.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_datacache_lib.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_hello_lib.h"
43 * Handle to the datacache service (for inserting/retrieving data)
45 struct GNUNET_DATACACHE_Handle *datacache;
48 * The main scheduler to use for the DHT service
50 static struct GNUNET_SCHEDULER_Handle *sched;
53 * The configuration the DHT service is running with
55 static const struct GNUNET_CONFIGURATION_Handle *cfg;
58 * Handle to the core service
60 static struct GNUNET_CORE_Handle *coreAPI;
63 * Handle to the transport service, for getting our hello
65 static struct GNUNET_TRANSPORT_Handle *transport_handle;
68 * The identity of our peer.
70 static struct GNUNET_PeerIdentity my_identity;
73 * Short id of the peer, for printing
75 static char *my_short_id;
80 static struct GNUNET_MessageHeader *my_hello;
83 * Task to run when we shut down, cleaning up all our trash
85 static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
89 * Linked list of messages to send to clients.
94 * Pointer to next item in the list
96 struct PendingMessage *next;
99 * Pointer to previous item in the list
101 struct PendingMessage *prev;
104 * Actual message to be sent; // avoid allocation
106 const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
111 * Struct containing information about a client,
112 * handle to connect to it, and any pending messages
113 * that need to be sent to it.
118 * Linked list of active clients
120 struct ClientList *next;
123 * The handle to this client
125 struct GNUNET_SERVER_Client *client_handle;
128 * Handle to the current transmission request, NULL
131 struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
134 * Linked list of pending messages for this client
136 struct PendingMessage *pending_head;
139 * Tail of linked list of pending messages for this client
141 struct PendingMessage *pending_tail;
146 * Context for handling results from a get request.
148 struct DatacacheGetContext
151 * The client to send the result to.
153 struct ClientList *client;
156 * The unique id of this request
158 unsigned long long unique_id;
162 * Context containing information about a DHT message received.
164 struct DHT_MessageContext
167 * The client this request was received from.
169 struct ClientList *client;
172 * The key this request was about
174 const GNUNET_HashCode *key;
177 * The unique identifier of this request
179 unsigned long long unique_id;
182 * Desired replication level
187 * Any message options for this request
193 * List of active clients.
195 static struct ClientList *client_list;
198 * Forward declaration.
200 static size_t send_generic_reply (void *cls, size_t size, void *buf);
204 * Task run to check for messages that need to be sent to a client.
206 * @param client a ClientList, containing the client and any messages to be sent to it
209 process_pending_messages (struct ClientList *client)
211 if (client->pending_head == NULL)
213 if (client->transmit_handle != NULL)
215 client->transmit_handle =
216 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
217 ntohs (client->pending_head->msg->
219 GNUNET_TIME_UNIT_FOREVER_REL,
220 &send_generic_reply, client);
224 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
225 * request. A ClientList is passed as closure, take the head of the list
226 * and copy it into buf, which has the result of sending the message to the
229 * @param cls closure to this call
230 * @param size maximum number of bytes available to send
231 * @param buf where to copy the actual message to
233 * @return the number of bytes actually copied, 0 indicates failure
236 send_generic_reply (void *cls, size_t size, void *buf)
238 struct ClientList *client = cls;
240 struct PendingMessage *reply;
244 client->transmit_handle = NULL;
247 /* client disconnected */
251 while ( (NULL != (reply = client->pending_head)) &&
252 (size >= off + (msize = ntohs (reply->msg->size))))
254 GNUNET_CONTAINER_DLL_remove (client->pending_head,
255 client->pending_tail,
257 memcpy (&cbuf[off], reply->msg, msize);
261 process_pending_messages (client);
267 * Add a PendingMessage to the clients list of messages to be sent
269 * @param client the active client to send the message to
270 * @param pending_message the actual message to send
273 add_pending_message (struct ClientList *client,
274 struct PendingMessage *pending_message)
276 GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
277 client->pending_tail,
278 client->pending_tail,
280 process_pending_messages (client);
285 * Called when a reply needs to be sent to a client, as
286 * a result it found to a GET or FIND PEER request.
288 * @param client the client to send the reply to
289 * @param message the encapsulated message to send
290 * @param uid the unique identifier of this request
293 send_reply_to_client (struct ClientList *client,
294 const struct GNUNET_MessageHeader *message,
295 unsigned long long uid)
297 struct GNUNET_DHT_RouteResultMessage *reply;
298 struct PendingMessage *pending_message;
302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
303 "`%s': Sending reply to client.\n", "DHT");
305 msize = ntohs (message->size);
306 tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
307 if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
313 pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
314 pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
315 reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
316 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT);
317 reply->header.size = htons (tsize);
318 reply->unique_id = GNUNET_htonll (uid);
319 memcpy (&reply[1], message, msize);
321 add_pending_message (client, pending_message);
326 * Iterator for local get request results,
328 * @param cls closure for iterator, a DatacacheGetContext
329 * @param exp when does this value expire?
330 * @param key the key this data is stored under
331 * @param size the size of the data identified by key
332 * @param data the actual data
333 * @param type the type of the data
335 * @return GNUNET_OK to continue iteration, anything else
339 datacache_get_iterator (void *cls,
340 struct GNUNET_TIME_Absolute exp,
341 const GNUNET_HashCode * key,
342 uint32_t size, const char *data, uint32_t type)
344 struct DatacacheGetContext *datacache_get_ctx = cls;
345 struct GNUNET_DHT_GetResultMessage *get_result;
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "`%s': Received `%s' response from datacache\n", "DHT", "GET");
351 GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
352 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
353 get_result->header.size =
354 htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
355 get_result->expiration = exp;
356 memcpy (&get_result->key, key, sizeof (GNUNET_HashCode));
357 get_result->type = htons (type);
358 memcpy (&get_result[1], data, size);
359 send_reply_to_client (datacache_get_ctx->client, &get_result->header,
360 datacache_get_ctx->unique_id);
361 GNUNET_free (get_result);
367 * Server handler for initiating local dht get requests
369 * @param cls closure for service
370 * @param msg the actual get message
371 * @param message_context struct containing pertinent information about the get request
374 handle_dht_get (void *cls,
375 const struct GNUNET_MessageHeader *msg,
376 struct DHT_MessageContext *message_context)
378 const struct GNUNET_DHT_GetMessage *get_msg;
380 unsigned int results;
381 struct DatacacheGetContext datacache_get_context;
383 get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
384 if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
390 get_type = ntohs (get_msg->type);
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "`%s': Received `%s' request from client, message type %u, key %s, uid %llu\n",
394 "DHT", "GET", get_type, GNUNET_h2s (message_context->key),
395 message_context->unique_id);
397 datacache_get_context.client = message_context->client;
398 datacache_get_context.unique_id = message_context->unique_id;
400 if (datacache != NULL)
402 GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
403 &datacache_get_iterator, &datacache_get_context);
404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
405 "`%s': Found %d results for local `%s' request\n", "DHT",
411 * Server handler for initiating local dht find peer requests
413 * @param cls closure for service
414 * @param find_msg the actual find peer message
415 * @param message_context struct containing pertinent information about the request
419 handle_dht_find_peer (void *cls,
420 const struct GNUNET_MessageHeader *find_msg,
421 struct DHT_MessageContext *message_context)
423 struct GNUNET_MessageHeader *find_peer_result;
428 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
429 "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
430 "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
431 ntohs (find_msg->size),
432 sizeof (struct GNUNET_MessageHeader));
434 if (my_hello == NULL)
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "`%s': Our HELLO is null, can't return.\n",
443 /* Simplistic find_peer functionality, always return our hello */
444 hello_size = ntohs(my_hello->size);
445 tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
447 find_peer_result = GNUNET_malloc (tsize);
448 find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
449 find_peer_result->size = htons (tsize);
450 memcpy (&find_peer_result[1], my_hello, hello_size);
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 "`%s': Sending hello size %d to client.\n",
456 send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
457 GNUNET_free(find_peer_result);
462 * Server handler for initiating local dht put requests
464 * @param cls closure for service
465 * @param msg the actual put message
466 * @param message_context struct containing pertinent information about the request
469 handle_dht_put (void *cls,
470 const struct GNUNET_MessageHeader *msg,
471 struct DHT_MessageContext *message_context)
473 struct GNUNET_DHT_PutMessage *put_msg;
477 GNUNET_assert (ntohs (msg->size) >=
478 sizeof (struct GNUNET_DHT_PutMessage));
479 put_msg = (struct GNUNET_DHT_PutMessage *)msg;
480 put_type = ntohs (put_msg->type);
481 data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
483 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
484 "`%s': Received `%s' request from client, message type %d, key %s\n",
485 "DHT", "PUT", put_type, GNUNET_h2s (message_context->key));
487 if (datacache != NULL)
488 GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
489 (char *) &put_msg[1], put_type,
490 GNUNET_TIME_absolute_ntoh(put_msg->expiration));
492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
493 "`%s': %s request received locally, but have no datacache!\n",
499 * Find a client if it exists, add it otherwise.
501 * @param client the server handle to the client
503 * @return the client if found, a new client otherwise
505 static struct ClientList *
506 find_active_client (struct GNUNET_SERVER_Client *client)
508 struct ClientList *pos = client_list;
509 struct ClientList *ret;
513 if (pos->client_handle == client)
518 ret = GNUNET_malloc (sizeof (struct ClientList));
519 ret->client_handle = client;
520 ret->next = client_list;
526 * Handler for any generic DHT messages, calls the appropriate handler
527 * depending on message type, sends confirmation if responses aren't otherwise
530 * @param cls closure for the service
531 * @param client the client we received this message from
532 * @param message the actual message received
535 handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
536 const struct GNUNET_MessageHeader *message)
538 const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
539 const struct GNUNET_MessageHeader *enc_msg;
540 struct DHT_MessageContext *message_context;
544 enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
545 enc_type = ntohs (enc_msg->type);
549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550 "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
551 "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
552 GNUNET_ntohll (dht_msg->unique_id));
555 message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext));
556 message_context->client = find_active_client (client);
557 message_context->key = &dht_msg->key;
558 message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id);
559 message_context->replication = ntohl (dht_msg->desired_replication_level);
560 message_context->msg_options = ntohl (dht_msg->options);
562 /* TODO: Steps to be added by students */
563 /* FIXME: Implement *remote* DHT operations here (forward request) */
564 /* Implement generic route function and call here. */
565 /* FIXME: *IF* handling should be local, then do this: */
566 /* 1. find if this peer is closest based on whatever metric the DHT uses
567 * 2. if this peer is closest _OR_ the message options indicate it should
568 * be processed everywhere _AND_ we want it processed everywhere, then
571 handle_locally = GNUNET_YES;
572 if (handle_locally == GNUNET_YES)
576 case GNUNET_MESSAGE_TYPE_DHT_GET:
577 handle_dht_get (cls, enc_msg,
580 case GNUNET_MESSAGE_TYPE_DHT_PUT:
581 handle_dht_put (cls, enc_msg,
584 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
585 handle_dht_find_peer (cls,
590 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
591 "`%s': Message type (%d) not handled\n", "DHT", enc_type);
594 GNUNET_free (message_context);
595 GNUNET_SERVER_receive_done (client, GNUNET_OK);
600 * Handler for any generic DHT stop messages, calls the appropriate handler
601 * depending on message type (if processed locally)
603 * @param cls closure for the service
604 * @param client the client we received this message from
605 * @param message the actual message received
607 * TODO: once message are remembered by unique id, add code to
611 handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
612 const struct GNUNET_MessageHeader *message)
614 const struct GNUNET_DHT_StopMessage *dht_stop_msg =
615 (const struct GNUNET_DHT_StopMessage *) message;
618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619 "`%s': Received `%s' request from client, uid %llu\n", "DHT",
620 "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
623 uid = GNUNET_ntohll(dht_stop_msg->unique_id);
624 /* TODO: actually stop... free associated resources for the request
625 * lookup request by uid and remove state. */
627 GNUNET_SERVER_receive_done (client, GNUNET_OK);
632 * Core handler for p2p route requests.
635 handle_dht_p2p_route_request (void *cls,
636 const struct GNUNET_PeerIdentity *peer,
637 const struct GNUNET_MessageHeader *message,
638 struct GNUNET_TIME_Relative latency, uint32_t distance)
641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642 "`%s': Received `%s' request from another peer\n", "DHT",
645 // FIXME: setup tracking for sending replies to peer (with timeout)
646 // FIXME: call code from handle_dht_start_message (refactor...)
652 * Core handler for p2p route results.
655 handle_dht_p2p_route_result (void *cls,
656 const struct GNUNET_PeerIdentity *peer,
657 const struct GNUNET_MessageHeader *message,
658 struct GNUNET_TIME_Relative latency, uint32_t distance)
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662 "`%s': Received `%s' request from another peer\n", "DHT",
665 // FIXME: setup tracking for sending replies to peer
666 // FIXME: possibly call code from handle_dht_stop_message? (unique result?) (refactor...)
672 * Receive the HELLO from transport service,
673 * free current and replace if necessary.
676 * @param message HELLO message of peer
679 process_hello (void *cls, const struct GNUNET_MessageHeader *message)
682 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
683 "Received our `%s' from transport service\n",
687 GNUNET_assert (message != NULL);
688 GNUNET_free_non_null(my_hello);
689 my_hello = GNUNET_malloc(ntohs(message->size));
690 memcpy(my_hello, message, ntohs(message->size));
694 * Task run during shutdown.
700 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
702 if (transport_handle != NULL)
704 GNUNET_free_non_null(my_hello);
705 GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
706 GNUNET_TRANSPORT_disconnect(transport_handle);
709 GNUNET_CORE_disconnect (coreAPI);
710 if (datacache != NULL)
711 GNUNET_DATACACHE_destroy (datacache);
712 if (my_short_id != NULL)
713 GNUNET_free(my_short_id);
718 * To be called on core init/fail.
720 * @param cls service closure
721 * @param server handle to the server for this service
722 * @param identity the public identity of this peer
723 * @param publicKey the public key of this peer
726 core_init (void *cls,
727 struct GNUNET_CORE_Handle *server,
728 const struct GNUNET_PeerIdentity *identity,
729 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736 "%s: Connection to core FAILED!\n", "dht",
737 GNUNET_i2s (identity));
739 GNUNET_SCHEDULER_cancel (sched, cleanup_task);
740 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745 "%s: Core connection initialized, I am peer: %s\n", "dht",
746 GNUNET_i2s (identity));
748 /* Copy our identity so we can use it */
749 memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
750 my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
751 /* Set the server to local variable */
756 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
757 {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0},
758 {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
763 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
764 {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0},
765 {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0},
770 * Method called whenever a peer connects.
773 * @param peer peer identity this notification is about
774 * @param latency reported latency of the connection with peer
775 * @param distance reported distance (DV) to peer
777 void handle_core_connect (void *cls,
778 const struct GNUNET_PeerIdentity * peer,
779 struct GNUNET_TIME_Relative latency,
784 size_t data_size = 42;
785 data = GNUNET_malloc (data_size);
786 memset (data, 43, data_size);
789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790 "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
792 if (datacache != NULL)
794 ret = GNUNET_DATACACHE_put (datacache, &peer->hashPubKey, data_size,
796 GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(),
797 GNUNET_TIME_UNIT_MINUTES));
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800 "%s Inserting data %s, type %d into datacache, return value was %d\n", my_short_id, GNUNET_h2s(&peer->hashPubKey), 130, ret);
806 * Process dht requests.
809 * @param scheduler scheduler to use
810 * @param server the initialized server
811 * @param c configuration to use
815 struct GNUNET_SCHEDULER_Handle *scheduler,
816 struct GNUNET_SERVER_Handle *server,
817 const struct GNUNET_CONFIGURATION_Handle *c)
821 datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
822 GNUNET_SERVER_add_handlers (server, plugin_handlers);
823 coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
824 cfg, /* Main configuration */
825 GNUNET_TIME_UNIT_FOREVER_REL,
826 NULL, /* FIXME: anything we want to pass around? */
827 &core_init, /* Call core_init once connected */
828 &handle_core_connect, /* Don't care about connects */
829 NULL, /* Don't care about disconnects */
830 NULL, /* Don't want notified about all incoming messages */
831 GNUNET_NO, /* For header only inbound notification */
832 NULL, /* Don't want notified about all outbound messages */
833 GNUNET_NO, /* For header only outbound notification */
834 core_handlers); /* Register these handlers */
837 transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, NULL, NULL, NULL, NULL);
838 if (transport_handle != NULL)
839 GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
841 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
842 /* Scheduled the task to clean up when shutdown is called */
843 cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
844 GNUNET_TIME_UNIT_FOREVER_REL,
845 &shutdown_task, NULL);
849 * The main function for the dht service.
851 * @param argc number of arguments from the command line
852 * @param argv command line arguments
853 * @return 0 ok, 1 on error
856 main (int argc, char *const *argv)
859 GNUNET_SERVICE_run (argc,
862 GNUNET_SERVICE_OPTION_NONE,
863 &run, NULL)) ? 0 : 1;