dht api, shell dht service, base of future test case.... not yet working
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_bandwidth_lib.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_container_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "dht.h"
39
40 #define DEBUG_DHT_API GNUNET_YES
41
42 struct PendingMessages
43 {
44   /**
45    * Linked list of pending messages
46    */
47   struct PendingMessages *next;
48
49   /**
50    * Message that is pending
51    */
52   struct GNUNET_MessageHeader *msg;
53
54   /**
55    * Timeout for this message
56    */
57   struct GNUNET_TIME_Relative timeout;
58
59 };
60
61 /**
62  * Connection to the DHT service.
63  */
64 struct GNUNET_DHT_Handle
65 {
66   /**
67    * Our scheduler.
68    */
69   struct GNUNET_SCHEDULER_Handle *sched;
70
71   /**
72    * Configuration to use.
73    */
74   const struct GNUNET_CONFIGURATION_Handle *cfg;
75
76   /**
77    * Socket (if available).
78    */
79   struct GNUNET_CLIENT_Connection *client;
80
81   /**
82    * Currently pending transmission request.
83    */
84   struct GNUNET_CLIENT_TransmitHandle *th;
85
86   /**
87    * List of the currently pending messages for the DHT service.
88    */
89   struct PendingMessages *pending_list;
90
91   /**
92    * Message we are currently sending.
93    */
94   struct PendingMessages *current;
95
96   /**
97    * Hash map containing the current outstanding get requests
98    */
99   struct GNUNET_CONTAINER_MultiHashMap *outstanding_get_requests;
100
101   /**
102    * Hash map containing the current outstanding put requests, awaiting
103    * a response
104    */
105   struct GNUNET_CONTAINER_MultiHashMap *outstanding_put_requests;
106
107   /**
108    * Kill off the connection and any pending messages.
109    */
110   int do_destroy;
111
112 };
113
114 static struct GNUNET_TIME_Relative default_request_timeout;
115
116 /* Forward declaration */
117 static void process_pending_message(struct GNUNET_DHT_Handle *handle);
118
119 /**
120  * Handler for messages received from the DHT service
121  * a demultiplexer which handles numerous message types
122  *
123  */
124 void service_message_handler (void *cls,
125                               const struct GNUNET_MessageHeader *msg)
126 {
127
128   /* TODO: find out message type, handle callbacks for different types of messages.
129    * Should be a put acknowledgment, get data or find node result. */
130 }
131
132
133 /**
134  * Initialize the connection with the DHT service.
135  *
136  * @param cfg configuration to use
137  * @param sched scheduler to use
138  * @return NULL on error
139  */
140 struct GNUNET_DHT_Handle *
141 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
142                     const struct GNUNET_CONFIGURATION_Handle *cfg)
143 {
144   struct GNUNET_DHT_Handle *handle;
145
146   handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
147
148   default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
149   handle->cfg = cfg;
150   handle->sched = sched;
151   handle->pending_list = NULL;
152   handle->current = NULL;
153   handle->do_destroy = GNUNET_NO;
154   handle->th = NULL;
155
156   handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
157   handle->outstanding_get_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
158   handle->outstanding_put_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
159   if (handle->client == NULL)
160     return NULL;
161 #if DEBUG_DHT_API
162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
163               "`%s': Connection to service in progress\n", "DHT API");
164 #endif
165   GNUNET_CLIENT_receive (handle->client,
166                          &service_message_handler,
167                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
168
169   return handle;
170 }
171
172
173 /**
174  * Shutdown connection with the DHT service.
175  *
176  * @param h connection to shut down
177  */
178 void
179 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
180 {
181   struct PendingMessages *pos;
182 #if DEBUG_DHT_API
183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
184               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
185 #endif
186   GNUNET_assert(handle != NULL);
187
188   if (handle->th != NULL) /* We have a live transmit request in the Aether */
189     {
190       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
191       handle->th = NULL;
192     }
193   if (handle->current != NULL) /* We are trying to send something now, clean it up */
194     GNUNET_free(handle->current);
195
196   while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
197     {
198       handle->pending_list = pos->next;
199       GNUNET_free(pos);
200     }
201   if (handle->client != NULL) /* Finally, disconnect from the service */
202     {
203       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
204       handle->client = NULL;
205     }
206
207   GNUNET_free (handle);
208 }
209
210
211 /**
212  * Handle to control a GET operation.
213  */
214 struct GNUNET_DHT_GetHandle
215 {
216
217   /**
218    * Key that this get request is for
219    */
220   GNUNET_HashCode key;
221
222   /**
223    * Type of data get request was for
224    */
225   uint32_t type;
226
227   /**
228    * Iterator to call on data receipt
229    */
230   GNUNET_DHT_Iterator iter;
231
232   /**
233    * Closure for the iterator callback
234    */
235   void *iter_cls;
236 };
237
238 /**
239  * Handle for a PUT request, holds callback
240  */
241 struct GNUNET_DHT_PutHandle
242 {
243   /**
244    * Key that this get request is for
245    */
246   GNUNET_HashCode key;
247
248   /**
249    * Type of data get request was for
250    */
251   uint32_t type;
252
253   /**
254    * Continuation to call on put send
255    */
256   GNUNET_SCHEDULER_Task cont;
257
258   /**
259    * Send continuation cls
260    */
261   void *cont_cls;
262 };
263
264 /**
265  * Send complete (or failed), schedule next (or don't)
266  */
267 static void
268 finish (struct GNUNET_DHT_Handle *handle, int code)
269 {
270   /* TODO: if code is not GNUNET_OK, do something! */
271   struct PendingMessages *pos = handle->current;
272   struct GNUNET_DHT_GetMessage *get;
273   struct GNUNET_DHT_PutMessage *put;
274
275   GNUNET_assert(pos != NULL);
276
277   switch (ntohs(pos->msg->type))
278   {
279     case GNUNET_MESSAGE_TYPE_DHT_GET:
280       get = (struct GNUNET_DHT_GetMessage *)pos->msg;
281       GNUNET_free(get);
282       break;
283     case GNUNET_MESSAGE_TYPE_DHT_PUT:
284       put = (struct GNUNET_DHT_PutMessage *)pos->msg;
285       GNUNET_free(put);
286       break;
287     default:
288       GNUNET_break(0);
289   }
290
291   handle->current = NULL;
292
293   if (code != GNUNET_SYSERR)
294     process_pending_message (handle);
295
296   GNUNET_free(pos);
297 }
298
299 /**
300  * Transmit the next pending message, called by notify_transmit_ready
301  */
302 static size_t
303 transmit_pending (void *cls, size_t size, void *buf)
304 {
305   struct GNUNET_DHT_Handle *handle = cls;
306   size_t ret;
307   size_t tsize;
308
309   if (buf == NULL)
310     {
311 #if DEBUG_DHT_API
312       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
314 #endif
315       /* FIXME: free associated resources or summat */
316       finish(handle, GNUNET_SYSERR);
317       return 0;
318     }
319
320   handle->th = NULL;
321   ret = 0;
322
323   if (handle->current != NULL)
324   {
325     tsize = ntohs(handle->current->msg->size);
326     if (size >= tsize)
327     {
328 #if DEBUG_DHT_API
329       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330                   "`%s': Sending message size %d\n", "DHT API", tsize);
331 #endif
332       memcpy(buf, handle->current->msg, tsize);
333     }
334     else
335     {
336       return ret;
337     }
338   }
339
340   return ret;
341 }
342
343
344 /**
345  * Try to (re)connect to the dht service.
346  *
347  * @return GNUNET_YES on success, GNUNET_NO on failure.
348  */
349 static int
350 try_connect (struct GNUNET_DHT_Handle *ret)
351 {
352   if (ret->client != NULL)
353     return GNUNET_OK;
354   ret->client = GNUNET_CLIENT_connect (ret->sched, "dht", ret->cfg);
355   if (ret->client != NULL)
356     return GNUNET_YES;
357 #if DEBUG_STATISTICS
358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359               _("Failed to connect to the dht service!\n"));
360 #endif
361   return GNUNET_NO;
362 }
363
364
365 /**
366  * Try to send messages from list of messages to send
367  */
368 static void process_pending_message(struct GNUNET_DHT_Handle *handle)
369 {
370
371   if (handle->current != NULL)
372     return;                     /* action already pending */
373   if (GNUNET_YES != try_connect (handle))
374     {
375       finish (handle, GNUNET_SYSERR);
376       return;
377     }
378
379   /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
380   if (handle->do_destroy)
381     {
382       //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
383     }
384
385   /* schedule next action */
386   handle->current = handle->pending_list;
387   if (NULL == handle->current)
388     {
389       return;
390     }
391   handle->pending_list = handle->pending_list->next;
392   handle->current->next = NULL;
393
394   if (NULL ==
395       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
396                                                     ntohs(handle->current->msg->size),
397                                                     handle->current->timeout,
398                                                     GNUNET_YES,
399                                                     &transmit_pending, handle)))
400     {
401 #if DEBUG_DHT_API
402       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403                   "Failed to transmit request to dht service.\n");
404 #endif
405       finish (handle, GNUNET_SYSERR);
406     }
407 #if DEBUG_DHT_API
408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
409               "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
410 #endif
411 }
412
413 /**
414  * Add a pending message to the linked list of messages which need to be sent
415  *
416  * @param handle handle to the specified DHT api
417  * @param msg the message to add to the list
418  */
419 static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageHeader *msg)
420 {
421   struct PendingMessages *new_message;
422   struct PendingMessages *pos;
423   struct PendingMessages *last;
424
425   new_message = GNUNET_malloc(sizeof(struct PendingMessages));
426   new_message->msg = msg;
427   new_message->timeout = default_request_timeout;
428
429   if (handle->pending_list != NULL)
430     {
431       pos = handle->pending_list;
432       while(pos != NULL)
433         {
434           last = pos;
435           pos = pos->next;
436         }
437       new_message->next = last->next; /* Should always be null */
438       last->next = new_message;
439     }
440   else
441     {
442       new_message->next = handle->pending_list; /* Will always be null */
443       handle->pending_list = new_message;
444     }
445
446   process_pending_message(handle);
447 }
448
449 /**
450  * Perform an asynchronous GET operation on the DHT identified.
451  *
452  * @param h handle to the DHT service
453  * @param type expected type of the response object
454  * @param key the key to look up
455  * @param iter function to call on each result
456  * @param iter_cls closure for iter
457  * @return handle to stop the async get
458  */
459 struct GNUNET_DHT_GetHandle *
460 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
461                       uint32_t type,
462                       const GNUNET_HashCode * key,
463                       GNUNET_DHT_Iterator iter,
464                       void *iter_cls)
465 {
466   struct GNUNET_DHT_GetMessage *get_msg;
467   struct GNUNET_DHT_GetHandle *get_handle;
468
469   get_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_get_requests, key);
470
471   if (get_handle != NULL)
472     {
473       /*
474        * A get has been previously sent, return existing handle.
475        * FIXME: should we re-transmit the request to the DHT service?
476        */
477       return get_handle;
478     }
479
480   get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle));
481   get_handle->type = type;
482   memcpy(&get_handle->key, key, sizeof(GNUNET_HashCode));
483   get_handle->iter = iter;
484   get_handle->iter_cls = iter_cls;
485
486   GNUNET_CONTAINER_multihashmap_put(handle->outstanding_get_requests, key, get_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
487
488   get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
489   get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
490   get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
491   get_msg->type = htonl(type);
492   memcpy(&get_msg->key, key, sizeof(GNUNET_HashCode));
493
494   add_pending(handle, &get_msg->header);
495
496   return get_handle;
497 }
498
499
500 /**
501  * Stop async DHT-get.  Frees associated resources.
502  *
503  * @param record GET operation to stop.
504  */
505 void
506 GNUNET_DHT_get_stop (struct GNUNET_DHT_Handle *handle, struct GNUNET_DHT_GetHandle *get_handle)
507 {
508   struct GNUNET_DHT_GetMessage *get_msg;
509
510   if (handle->do_destroy == GNUNET_NO)
511     {
512       get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
513       get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
514       get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
515       get_msg->type = htonl(get_handle->type);
516       memcpy(&get_msg->key, &get_handle->key, sizeof(GNUNET_HashCode));
517
518       add_pending(handle, &get_msg->header);
519     }
520
521   GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_get_requests, &get_handle->key, get_handle) == GNUNET_YES);
522   GNUNET_free(get_handle);
523 }
524
525
526 /**
527  * Perform a PUT operation storing data in the DHT.
528  *
529  * @param h handle to DHT service
530  * @param key the key to store under
531  * @param type type of the value
532  * @param size number of bytes in data; must be less than 64k
533  * @param data the data to store
534  * @param exp desired expiration time for the value
535  * @param cont continuation to call when done;
536  *             reason will be TIMEOUT on error,
537  *             reason will be PREREQ_DONE on success
538  * @param cont_cls closure for cont
539  *
540  * @return GNUNET_YES if put message is queued for transmission
541  */
542 int GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
543                     const GNUNET_HashCode * key,
544                     uint32_t type,
545                     uint32_t size,
546                     const char *data,
547                     struct GNUNET_TIME_Relative exp,
548                     GNUNET_SCHEDULER_Task cont,
549                     void *cont_cls)
550 {
551   struct GNUNET_DHT_PutMessage *put_msg;
552   struct GNUNET_DHT_PutHandle *put_handle;
553   size_t msize;
554
555   put_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_put_requests, key);
556
557   if (put_handle != NULL)
558     {
559       /*
560        * A put has been previously queued, but not yet sent.
561        * FIXME: change the continuation function and callback or something?
562        */
563       return GNUNET_NO;
564     }
565
566   put_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_PutHandle));
567   put_handle->type = type;
568   memcpy(&put_handle->key, key, sizeof(GNUNET_HashCode));
569
570   GNUNET_CONTAINER_multihashmap_put(handle->outstanding_put_requests, key, put_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
571
572   msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
573   put_msg = GNUNET_malloc(msize);
574   put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
575   put_msg->header.size = htons(msize);
576   put_msg->type = htonl(type);
577   memcpy(&put_msg->key, key, sizeof(GNUNET_HashCode));
578   memcpy(&put_msg[1], data, size);
579
580   add_pending(handle, &put_msg->header);
581
582   return GNUNET_YES;
583 }