2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
28 #include "gnunet_bandwidth_lib.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_container_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_dht_service.h"
40 #define DEBUG_DHT_API GNUNET_YES
42 struct PendingMessages
45 * Linked list of pending messages
47 struct PendingMessages *next;
50 * Message that is pending
52 struct GNUNET_MessageHeader *msg;
55 * Timeout for this message
57 struct GNUNET_TIME_Relative timeout;
62 * Connection to the DHT service.
64 struct GNUNET_DHT_Handle
69 struct GNUNET_SCHEDULER_Handle *sched;
72 * Configuration to use.
74 const struct GNUNET_CONFIGURATION_Handle *cfg;
77 * Socket (if available).
79 struct GNUNET_CLIENT_Connection *client;
82 * Currently pending transmission request.
84 struct GNUNET_CLIENT_TransmitHandle *th;
87 * List of the currently pending messages for the DHT service.
89 struct PendingMessages *pending_list;
92 * Message we are currently sending.
94 struct PendingMessages *current;
97 * Hash map containing the current outstanding get requests
99 struct GNUNET_CONTAINER_MultiHashMap *outstanding_get_requests;
102 * Hash map containing the current outstanding put requests, awaiting
105 struct GNUNET_CONTAINER_MultiHashMap *outstanding_put_requests;
108 * Kill off the connection and any pending messages.
114 static struct GNUNET_TIME_Relative default_request_timeout;
116 /* Forward declaration */
117 static void process_pending_message(struct GNUNET_DHT_Handle *handle);
120 * Handler for messages received from the DHT service
121 * a demultiplexer which handles numerous message types
124 void service_message_handler (void *cls,
125 const struct GNUNET_MessageHeader *msg)
128 /* TODO: find out message type, handle callbacks for different types of messages.
129 * Should be a put acknowledgment, get data or find node result. */
134 * Initialize the connection with the DHT service.
136 * @param cfg configuration to use
137 * @param sched scheduler to use
138 * @return NULL on error
140 struct GNUNET_DHT_Handle *
141 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
142 const struct GNUNET_CONFIGURATION_Handle *cfg)
144 struct GNUNET_DHT_Handle *handle;
146 handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
148 default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
150 handle->sched = sched;
151 handle->pending_list = NULL;
152 handle->current = NULL;
153 handle->do_destroy = GNUNET_NO;
156 handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
157 handle->outstanding_get_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
158 handle->outstanding_put_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
159 if (handle->client == NULL)
162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
163 "`%s': Connection to service in progress\n", "DHT API");
165 GNUNET_CLIENT_receive (handle->client,
166 &service_message_handler,
167 handle, GNUNET_TIME_UNIT_FOREVER_REL);
174 * Shutdown connection with the DHT service.
176 * @param h connection to shut down
179 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
181 struct PendingMessages *pos;
183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
184 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
186 GNUNET_assert(handle != NULL);
188 if (handle->th != NULL) /* We have a live transmit request in the Aether */
190 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
193 if (handle->current != NULL) /* We are trying to send something now, clean it up */
194 GNUNET_free(handle->current);
196 while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
198 handle->pending_list = pos->next;
201 if (handle->client != NULL) /* Finally, disconnect from the service */
203 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
204 handle->client = NULL;
207 GNUNET_free (handle);
212 * Handle to control a GET operation.
214 struct GNUNET_DHT_GetHandle
218 * Key that this get request is for
223 * Type of data get request was for
228 * Iterator to call on data receipt
230 GNUNET_DHT_Iterator iter;
233 * Closure for the iterator callback
239 * Handle for a PUT request, holds callback
241 struct GNUNET_DHT_PutHandle
244 * Key that this get request is for
249 * Type of data get request was for
254 * Continuation to call on put send
256 GNUNET_SCHEDULER_Task cont;
259 * Send continuation cls
265 * Send complete (or failed), schedule next (or don't)
268 finish (struct GNUNET_DHT_Handle *handle, int code)
270 /* TODO: if code is not GNUNET_OK, do something! */
271 struct PendingMessages *pos = handle->current;
272 struct GNUNET_DHT_GetMessage *get;
273 struct GNUNET_DHT_PutMessage *put;
275 GNUNET_assert(pos != NULL);
277 switch (ntohs(pos->msg->type))
279 case GNUNET_MESSAGE_TYPE_DHT_GET:
280 get = (struct GNUNET_DHT_GetMessage *)pos->msg;
283 case GNUNET_MESSAGE_TYPE_DHT_PUT:
284 put = (struct GNUNET_DHT_PutMessage *)pos->msg;
291 handle->current = NULL;
293 if (code != GNUNET_SYSERR)
294 process_pending_message (handle);
300 * Transmit the next pending message, called by notify_transmit_ready
303 transmit_pending (void *cls, size_t size, void *buf)
305 struct GNUNET_DHT_Handle *handle = cls;
312 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313 "`%s': In transmit_pending buf is NULL\n", "DHT API");
315 /* FIXME: free associated resources or summat */
316 finish(handle, GNUNET_SYSERR);
323 if (handle->current != NULL)
325 tsize = ntohs(handle->current->msg->size);
329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330 "`%s': Sending message size %d\n", "DHT API", tsize);
332 memcpy(buf, handle->current->msg, tsize);
345 * Try to (re)connect to the dht service.
347 * @return GNUNET_YES on success, GNUNET_NO on failure.
350 try_connect (struct GNUNET_DHT_Handle *ret)
352 if (ret->client != NULL)
354 ret->client = GNUNET_CLIENT_connect (ret->sched, "dht", ret->cfg);
355 if (ret->client != NULL)
358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359 _("Failed to connect to the dht service!\n"));
366 * Try to send messages from list of messages to send
368 static void process_pending_message(struct GNUNET_DHT_Handle *handle)
371 if (handle->current != NULL)
372 return; /* action already pending */
373 if (GNUNET_YES != try_connect (handle))
375 finish (handle, GNUNET_SYSERR);
379 /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
380 if (handle->do_destroy)
382 //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
385 /* schedule next action */
386 handle->current = handle->pending_list;
387 if (NULL == handle->current)
391 handle->pending_list = handle->pending_list->next;
392 handle->current->next = NULL;
395 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
396 ntohs(handle->current->msg->size),
397 handle->current->timeout,
399 &transmit_pending, handle)))
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 "Failed to transmit request to dht service.\n");
405 finish (handle, GNUNET_SYSERR);
408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
409 "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
414 * Add a pending message to the linked list of messages which need to be sent
416 * @param handle handle to the specified DHT api
417 * @param msg the message to add to the list
419 static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageHeader *msg)
421 struct PendingMessages *new_message;
422 struct PendingMessages *pos;
423 struct PendingMessages *last;
425 new_message = GNUNET_malloc(sizeof(struct PendingMessages));
426 new_message->msg = msg;
427 new_message->timeout = default_request_timeout;
429 if (handle->pending_list != NULL)
431 pos = handle->pending_list;
437 new_message->next = last->next; /* Should always be null */
438 last->next = new_message;
442 new_message->next = handle->pending_list; /* Will always be null */
443 handle->pending_list = new_message;
446 process_pending_message(handle);
450 * Perform an asynchronous GET operation on the DHT identified.
452 * @param h handle to the DHT service
453 * @param type expected type of the response object
454 * @param key the key to look up
455 * @param iter function to call on each result
456 * @param iter_cls closure for iter
457 * @return handle to stop the async get
459 struct GNUNET_DHT_GetHandle *
460 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
462 const GNUNET_HashCode * key,
463 GNUNET_DHT_Iterator iter,
466 struct GNUNET_DHT_GetMessage *get_msg;
467 struct GNUNET_DHT_GetHandle *get_handle;
469 get_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_get_requests, key);
471 if (get_handle != NULL)
474 * A get has been previously sent, return existing handle.
475 * FIXME: should we re-transmit the request to the DHT service?
480 get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle));
481 get_handle->type = type;
482 memcpy(&get_handle->key, key, sizeof(GNUNET_HashCode));
483 get_handle->iter = iter;
484 get_handle->iter_cls = iter_cls;
486 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_get_requests, key, get_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
488 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
489 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
490 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
491 get_msg->type = htonl(type);
492 memcpy(&get_msg->key, key, sizeof(GNUNET_HashCode));
494 add_pending(handle, &get_msg->header);
501 * Stop async DHT-get. Frees associated resources.
503 * @param record GET operation to stop.
506 GNUNET_DHT_get_stop (struct GNUNET_DHT_Handle *handle, struct GNUNET_DHT_GetHandle *get_handle)
508 struct GNUNET_DHT_GetMessage *get_msg;
510 if (handle->do_destroy == GNUNET_NO)
512 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
513 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
514 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
515 get_msg->type = htonl(get_handle->type);
516 memcpy(&get_msg->key, &get_handle->key, sizeof(GNUNET_HashCode));
518 add_pending(handle, &get_msg->header);
521 GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_get_requests, &get_handle->key, get_handle) == GNUNET_YES);
522 GNUNET_free(get_handle);
527 * Perform a PUT operation storing data in the DHT.
529 * @param h handle to DHT service
530 * @param key the key to store under
531 * @param type type of the value
532 * @param size number of bytes in data; must be less than 64k
533 * @param data the data to store
534 * @param exp desired expiration time for the value
535 * @param cont continuation to call when done;
536 * reason will be TIMEOUT on error,
537 * reason will be PREREQ_DONE on success
538 * @param cont_cls closure for cont
540 * @return GNUNET_YES if put message is queued for transmission
542 int GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
543 const GNUNET_HashCode * key,
547 struct GNUNET_TIME_Relative exp,
548 GNUNET_SCHEDULER_Task cont,
551 struct GNUNET_DHT_PutMessage *put_msg;
552 struct GNUNET_DHT_PutHandle *put_handle;
555 put_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_put_requests, key);
557 if (put_handle != NULL)
560 * A put has been previously queued, but not yet sent.
561 * FIXME: change the continuation function and callback or something?
566 put_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_PutHandle));
567 put_handle->type = type;
568 memcpy(&put_handle->key, key, sizeof(GNUNET_HashCode));
570 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_put_requests, key, put_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
572 msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
573 put_msg = GNUNET_malloc(msize);
574 put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
575 put_msg->header.size = htons(msize);
576 put_msg->type = htonl(type);
577 memcpy(&put_msg->key, key, sizeof(GNUNET_HashCode));
578 memcpy(&put_msg[1], data, size);
580 add_pending(handle, &put_msg->header);