basic find peer functionality, added to test_dht_api testcase
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  */
28
29 #include "platform.h"
30 #include "gnunet_bandwidth_lib.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_container_lib.h"
34 #include "gnunet_arm_service.h"
35 #include "gnunet_hello_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_server_lib.h"
38 #include "gnunet_time_lib.h"
39 #include "gnunet_dht_service.h"
40 #include "dht.h"
41
42 #define DEBUG_DHT_API GNUNET_NO
43
44 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
45
46 struct PendingMessage
47 {
48   /**
49    * Message that is pending
50    */
51   struct GNUNET_MessageHeader *msg;
52
53   /**
54    * Timeout for this message
55    */
56   struct GNUNET_TIME_Relative timeout;
57
58   /**
59    * Continuation to call on message send
60    * or message receipt confirmation
61    */
62   GNUNET_SCHEDULER_Task cont;
63
64   /**
65    * Continuation closure
66    */
67   void *cont_cls;
68
69   /**
70    * Whether or not to await verification the message
71    * was received by the service
72    */
73   size_t is_unique;
74
75   /**
76    * Unique ID for this request
77    */
78   uint64_t unique_id;
79
80 };
81
82 struct GNUNET_DHT_GetContext
83 {
84   /**
85    * Iterator to call on data receipt
86    */
87   GNUNET_DHT_GetIterator iter;
88
89   /**
90    * Closure for the iterator callback
91    */
92   void *iter_cls;
93
94 };
95
96 struct GNUNET_DHT_FindPeerContext
97 {
98   /**
99    * Iterator to call on data receipt
100    */
101   GNUNET_DHT_FindPeerProcessor proc;
102
103   /**
104    * Closure for the iterator callback
105    */
106   void *proc_cls;
107
108 };
109
110 /**
111  * Handle to control a unique operation (one that is
112  * expected to return results)
113  */
114 struct GNUNET_DHT_RouteHandle
115 {
116
117   /**
118    * Unique identifier for this request (for key collisions)
119    */
120   uint64_t uid;
121
122   /**
123    * Key that this get request is for
124    */
125   GNUNET_HashCode key;
126
127   /**
128    * Iterator to call on data receipt
129    */
130   GNUNET_DHT_ReplyProcessor iter;
131
132   /**
133    * Closure for the iterator callback
134    */
135   void *iter_cls;
136
137   /**
138    * Main handle to this DHT api
139    */
140   struct GNUNET_DHT_Handle *dht_handle;
141 };
142
143 /**
144  * Handle for a non unique request, holds callback
145  * which needs to be called before we allow other
146  * messages to be processed and sent to the DHT service
147  */
148 struct GNUNET_DHT_NonUniqueHandle
149 {
150   /**
151    * Key that this get request is for
152    */
153   GNUNET_HashCode key;
154
155   /**
156    * Type of data get request was for
157    */
158   uint32_t type;
159
160   /**
161    * Continuation to call on service
162    * confirmation of message receipt.
163    */
164   GNUNET_SCHEDULER_Task cont;
165
166   /**
167    * Send continuation cls
168    */
169   void *cont_cls;
170 };
171
172 /**
173  * Handle to control a get operation.
174  */
175 struct GNUNET_DHT_GetHandle
176 {
177   /**
178    * Handle to the actual route operation for the get
179    */
180   struct GNUNET_DHT_RouteHandle *route_handle;
181
182   /**
183    * The context of the get request
184    */
185   struct GNUNET_DHT_GetContext get_context;
186 };
187
188 /**
189  * Handle to control a find peer operation.
190  */
191 struct GNUNET_DHT_FindPeerHandle
192 {
193   /**
194      * Handle to the actual route operation for the request
195      */
196   struct GNUNET_DHT_RouteHandle *route_handle;
197
198     /**
199      * The context of the get request
200      */
201   struct GNUNET_DHT_FindPeerContext find_peer_context;
202 };
203
204
205 /**
206  * Connection to the DHT service.
207  */
208 struct GNUNET_DHT_Handle
209 {
210   /**
211    * Our scheduler.
212    */
213   struct GNUNET_SCHEDULER_Handle *sched;
214
215   /**
216    * Configuration to use.
217    */
218   const struct GNUNET_CONFIGURATION_Handle *cfg;
219
220   /**
221    * Socket (if available).
222    */
223   struct GNUNET_CLIENT_Connection *client;
224
225   /**
226    * Currently pending transmission request.
227    */
228   struct GNUNET_CLIENT_TransmitHandle *th;
229
230   /**
231    * Message we are currently sending, only allow
232    * a single message to be queued.  If not unique
233    * (typically a put request), await a confirmation
234    * from the service that the message was received.
235    * If unique, just fire and forget.
236    */
237   struct PendingMessage *current;
238
239   /**
240    * Hash map containing the current outstanding unique requests
241    */
242   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
243
244   /**
245    * Non unique handle.  If set don't schedule another non
246    * unique request.
247    */
248   struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
249
250   /**
251    * Kill off the connection and any pending messages.
252    */
253   int do_destroy;
254
255 };
256
257 static struct GNUNET_TIME_Relative default_request_timeout;
258
259 /* Forward declaration */
260 static void process_pending_message (struct GNUNET_DHT_Handle *handle);
261
262 static GNUNET_HashCode *
263 hash_from_uid (uint64_t uid)
264 {
265   int count;
266   int remaining;
267   GNUNET_HashCode *hash;
268   hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
269   count = 0;
270
271   while (count < sizeof (GNUNET_HashCode))
272     {
273       remaining = sizeof (GNUNET_HashCode) - count;
274       if (remaining > sizeof (uid))
275         remaining = sizeof (uid);
276
277       memcpy (hash, &uid, remaining);
278       count += remaining;
279     }
280
281   return hash;
282 }
283
284 /**
285  * Handler for messages received from the DHT service
286  * a demultiplexer which handles numerous message types
287  *
288  */
289 void
290 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
291 {
292   struct GNUNET_DHT_Handle *handle = cls;
293   struct GNUNET_DHT_Message *dht_msg;
294   struct GNUNET_DHT_StopMessage *stop_msg;
295   struct GNUNET_MessageHeader *enc_msg;
296   struct GNUNET_DHT_RouteHandle *route_handle;
297   uint64_t uid;
298   GNUNET_HashCode *uid_hash;
299   size_t enc_size;
300   /* TODO: find out message type, handle callbacks for different types of messages.
301    * Should be a non unique acknowledgment, or unique result. */
302
303   if (msg == NULL)
304     {
305 #if DEBUG_DHT_API
306       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307                   "`%s': Received NULL from server, connection down?\n",
308                   "DHT API");
309 #endif
310       return;
311     }
312
313   switch (ntohs (msg->type))
314     {
315     case GNUNET_MESSAGE_TYPE_DHT:
316       {
317         dht_msg = (struct GNUNET_DHT_Message *) msg;
318         uid = GNUNET_ntohll (dht_msg->unique_id);
319 #if DEBUG_DHT_API
320         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
321                     "`%s': Received response to message (uid %llu)\n",
322                     "DHT API", uid);
323 #endif
324         if (ntohs (dht_msg->unique))
325           {
326             uid_hash = hash_from_uid (uid);
327             route_handle =
328               GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
329                                                  uid_hash);
330             GNUNET_free (uid_hash);
331             if (route_handle == NULL)   /* We have no recollection of this request */
332               {
333 #if DEBUG_DHT_API
334                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335                             "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
336                             "DHT API", uid);
337 #endif
338               }
339             else
340               {
341                 enc_size =
342                   ntohs (dht_msg->header.size) -
343                   sizeof (struct GNUNET_DHT_Message);
344                 GNUNET_assert (enc_size > 0);
345                 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
346                 route_handle->iter (route_handle->iter_cls, enc_msg);
347
348               }
349           }
350         break;
351       }
352     case GNUNET_MESSAGE_TYPE_DHT_STOP:
353       {
354         stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
355         uid = GNUNET_ntohll (stop_msg->unique_id);
356 #if DEBUG_DHT_API
357         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
358                     "`%s': Received response to message (uid %llu), current uid %llu\n",
359                     "DHT API", uid, handle->current->unique_id);
360 #endif
361         if (handle->current->unique_id == uid)
362           {
363 #if DEBUG_DHT_API
364             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365                         "`%s': Have pending confirmation for this message!\n",
366                         "DHT API", uid);
367 #endif
368             if (handle->current->cont != NULL)
369               GNUNET_SCHEDULER_add_continuation (handle->sched,
370                                                  handle->current->cont,
371                                                  handle->current->cont_cls,
372                                                  GNUNET_SCHEDULER_REASON_PREREQ_DONE);
373
374             GNUNET_free (handle->current->msg);
375             GNUNET_free (handle->current);
376             handle->current = NULL;
377           }
378         break;
379       }
380     default:
381       {
382         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
383                     "`%s': Received unknown message type %d\n", "DHT API",
384                     ntohs (msg->type));
385       }
386     }
387   GNUNET_CLIENT_receive (handle->client,
388                          &service_message_handler,
389                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
390
391 }
392
393
394 /**
395  * Initialize the connection with the DHT service.
396  *
397  * @param sched scheduler to use
398  * @param cfg configuration to use
399  * @param ht_len size of the internal hash table to use for
400  *               processing multiple GET/FIND requests in parallel
401  *
402  * @return handle to the DHT service, or NULL on error
403  */
404 struct GNUNET_DHT_Handle *
405 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
406                     const struct GNUNET_CONFIGURATION_Handle *cfg,
407                     unsigned int ht_len)
408 {
409   struct GNUNET_DHT_Handle *handle;
410
411   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
412
413   default_request_timeout =
414     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
415   handle->cfg = cfg;
416   handle->sched = sched;
417
418   handle->current = NULL;
419   handle->do_destroy = GNUNET_NO;
420   handle->th = NULL;
421
422   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
423   handle->outstanding_requests =
424     GNUNET_CONTAINER_multihashmap_create (ht_len);
425
426   if (handle->client == NULL)
427     {
428       GNUNET_free (handle);
429       return NULL;
430     }
431 #if DEBUG_DHT_API
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "`%s': Connection to service in progress\n", "DHT API");
434 #endif
435   GNUNET_CLIENT_receive (handle->client,
436                          &service_message_handler,
437                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
438
439   return handle;
440 }
441
442
443 /**
444  * Shutdown connection with the DHT service.
445  *
446  * @param handle handle of the DHT connection to stop
447  */
448 void
449 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
450 {
451 #if DEBUG_DHT_API
452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
454 #endif
455   GNUNET_assert (handle != NULL);
456
457   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
458     {
459       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
460       handle->th = NULL;
461     }
462   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
463     GNUNET_free (handle->current);
464
465   if (handle->client != NULL)   /* Finally, disconnect from the service */
466     {
467       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
468       handle->client = NULL;
469     }
470
471   GNUNET_free (handle);
472 }
473
474
475 /**
476  * Send complete (or failed), schedule next (or don't)
477  */
478 static void
479 finish (struct GNUNET_DHT_Handle *handle, int code)
480 {
481   /* TODO: if code is not GNUNET_OK, do something! */
482   struct PendingMessage *pos = handle->current;
483 #if DEBUG_DHT_API
484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
485 #endif
486   GNUNET_assert (pos != NULL);
487
488   if (pos->is_unique)
489     {
490       if (pos->cont != NULL)
491         {
492           if (code == GNUNET_SYSERR)
493             GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
494                                                pos->cont_cls,
495                                                GNUNET_SCHEDULER_REASON_TIMEOUT);
496           else
497             GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
498                                                pos->cont_cls,
499                                                GNUNET_SCHEDULER_REASON_PREREQ_DONE);
500         }
501
502       GNUNET_free (pos->msg);
503       handle->current = NULL;
504       GNUNET_free (pos);
505     }
506   /* Otherwise we need to wait for a response to this message! */
507 }
508
509 /**
510  * Transmit the next pending message, called by notify_transmit_ready
511  */
512 static size_t
513 transmit_pending (void *cls, size_t size, void *buf)
514 {
515   struct GNUNET_DHT_Handle *handle = cls;
516   size_t tsize;
517
518 #if DEBUG_DHT_API
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520               "`%s': In transmit_pending\n", "DHT API");
521 #endif
522   if (buf == NULL)
523     {
524 #if DEBUG_DHT_API
525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
527 #endif
528       /* FIXME: free associated resources or summat */
529       finish (handle, GNUNET_SYSERR);
530       return 0;
531     }
532
533   handle->th = NULL;
534
535   if (handle->current != NULL)
536     {
537       tsize = ntohs (handle->current->msg->size);
538       if (size >= tsize)
539         {
540 #if DEBUG_DHT_API
541           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                       "`%s': Sending message size %d\n", "DHT API", tsize);
543 #endif
544           memcpy (buf, handle->current->msg, tsize);
545           finish (handle, GNUNET_OK);
546           return tsize;
547         }
548       else
549         {
550           return 0;
551         }
552     }
553   /* Have no pending request */
554   return 0;
555 }
556
557
558 /**
559  * Try to (re)connect to the dht service.
560  *
561  * @return GNUNET_YES on success, GNUNET_NO on failure.
562  */
563 static int
564 try_connect (struct GNUNET_DHT_Handle *handle)
565 {
566   if (handle->client != NULL)
567     return GNUNET_OK;
568   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
569   if (handle->client != NULL)
570     return GNUNET_YES;
571 #if DEBUG_STATISTICS
572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573               _("Failed to connect to the dht service!\n"));
574 #endif
575   return GNUNET_NO;
576 }
577
578
579 /**
580  * Try to send messages from list of messages to send
581  */
582 static void
583 process_pending_message (struct GNUNET_DHT_Handle *handle)
584 {
585
586   if (handle->current == NULL)
587     return;                     /* action already pending */
588   if (GNUNET_YES != try_connect (handle))
589     {
590       finish (handle, GNUNET_SYSERR);
591       return;
592     }
593
594   /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
595   if (handle->do_destroy)
596     {
597       //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
598     }
599
600
601   if (NULL ==
602       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
603                                                          ntohs (handle->
604                                                                 current->msg->
605                                                                 size),
606                                                          handle->current->
607                                                          timeout, GNUNET_YES,
608                                                          &transmit_pending,
609                                                          handle)))
610     {
611 #if DEBUG_DHT_API
612       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613                   "Failed to transmit request to dht service.\n");
614 #endif
615       finish (handle, GNUNET_SYSERR);
616     }
617 #if DEBUG_DHT_API
618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619               "`%s': Scheduled sending message of size %d to service\n",
620               "DHT API", ntohs (handle->current->msg->size));
621 #endif
622 }
623
624 /**
625  * Iterator called on each result obtained from a generic route
626  * operation
627  */
628 void
629 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
630 {
631   struct GNUNET_DHT_GetHandle *get_handle = cls;
632   struct GNUNET_DHT_GetResultMessage *result;
633   size_t data_size;
634   char *result_data;
635
636   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
637     return;
638
639   GNUNET_assert (ntohs (reply->size) >=
640                  sizeof (struct GNUNET_DHT_GetResultMessage));
641   result = (struct GNUNET_DHT_GetResultMessage *) reply;
642   data_size = ntohs (result->data_size);
643   GNUNET_assert (ntohs (reply->size) ==
644                  sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
645   result_data = (char *) &result[1];    /* Set data pointer to end of message */
646
647   get_handle->get_context.iter (get_handle->get_context.iter_cls,
648                                 result->expiration, &result->key,
649                                 ntohs (result->type), data_size, result_data);
650 }
651
652
653 /**
654  * Iterator called on each result obtained from a generic route
655  * operation
656  */
657 void
658 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
659 {
660   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
661   struct GNUNET_DHT_FindPeerResultMessage *result;
662   size_t data_size;
663   struct GNUNET_MessageHeader *result_data;
664
665 #if DEBUG_DHT_API
666       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
667                   "Find peer iterator called.\n");
668 #endif
669   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
670     return;
671
672   GNUNET_assert (ntohs (reply->size) >=
673                  sizeof (struct GNUNET_DHT_FindPeerResultMessage));
674   result = (struct GNUNET_DHT_FindPeerResultMessage *) reply;
675   data_size = ntohs (result->data_size);
676   GNUNET_assert (ntohs (reply->size) ==
677                  sizeof (struct GNUNET_DHT_FindPeerResultMessage) + data_size);
678
679   if (data_size > 0)
680     result_data = (struct GNUNET_MessageHeader *) &result[1];   /* Set data pointer to end of message */
681   else
682     result_data = NULL;
683
684   find_peer_handle->find_peer_context.proc (find_peer_handle->
685                                             find_peer_context.proc_cls,
686                                             &result->peer, result_data);
687 }
688
689 /**
690  * Perform an asynchronous FIND_PEER operation on the DHT.
691  *
692  * @param handle handle to the DHT service
693  * @param key the key to look up
694  * @param desired_replication_level how many peers should ultimately receive
695  *                this message (advisory only, target may be too high for the
696  *                given DHT or not hit exactly).
697  * @param options options for routing
698  * @param enc send the encapsulated message to a peer close to the key
699  * @param iter function to call on each result, NULL if no replies are expected
700  * @param iter_cls closure for iter
701  * @param timeout when to abort with an error if we fail to get
702  *                a confirmation for the request (when necessary) or how long
703  *                to wait for tramission to the service
704  * @param cont continuation to call when done;
705  *             reason will be TIMEOUT on error,
706  *             reason will be PREREQ_DONE on success
707  * @param cont_cls closure for cont
708  *
709  * @return handle to stop the request, NULL if the request is "fire and forget"
710  */
711 struct GNUNET_DHT_RouteHandle *
712 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
713                         const GNUNET_HashCode * key,
714                         unsigned int desired_replication_level,
715                         enum GNUNET_DHT_RouteOption options,
716                         const struct GNUNET_MessageHeader *enc,
717                         struct GNUNET_TIME_Relative timeout,
718                         GNUNET_DHT_ReplyProcessor iter,
719                         void *iter_cls,
720                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
721 {
722   struct GNUNET_DHT_RouteHandle *route_handle;
723   struct PendingMessage *pending;
724   struct GNUNET_DHT_Message *message;
725   size_t is_unique;
726   size_t msize;
727   GNUNET_HashCode *uid_key;
728   uint64_t uid;
729
730   is_unique = GNUNET_YES;
731   if (iter == NULL)
732     is_unique = GNUNET_NO;
733
734   route_handle = NULL;
735   uid_key = NULL;
736
737   do
738     {
739       GNUNET_free_non_null (uid_key);
740       uid = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
741       uid_key = hash_from_uid (uid);
742     }
743   while (GNUNET_CONTAINER_multihashmap_contains
744          (handle->outstanding_requests, uid_key) == GNUNET_YES);
745
746   if (is_unique)
747     {
748       route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
749       memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
750       route_handle->iter = iter;
751       route_handle->iter_cls = iter_cls;
752       route_handle->dht_handle = handle;
753       route_handle->uid = uid;
754 #if DEBUG_DHT_API
755       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
756                   "`%s': Unique ID is %llu\n", "DHT API", uid);
757 #endif
758       /**
759        * Store based on random identifier!
760        */
761       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
762                                          uid_key, route_handle,
763                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
764       msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
765
766     }
767   else
768     {
769       msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
770     }
771
772   GNUNET_free (uid_key);
773   message = GNUNET_malloc (msize);
774   message->header.size = htons (msize);
775   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
776   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
777   message->options = htons (options);
778   message->desired_replication_level = htons (options);
779   message->unique = htons (is_unique);
780   message->unique_id = GNUNET_htonll (uid);
781   memcpy (&message[1], enc, ntohs (enc->size));
782
783   pending = GNUNET_malloc (sizeof (struct PendingMessage));
784   pending->msg = &message->header;
785   pending->timeout = timeout;
786   pending->cont = cont;
787   pending->cont_cls = cont_cls;
788   pending->is_unique = is_unique;
789   pending->unique_id = uid;
790
791   GNUNET_assert (handle->current == NULL);
792
793   handle->current = pending;
794
795   process_pending_message (handle);
796
797   return route_handle;
798 }
799
800 void
801 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
802                        GNUNET_SCHEDULER_Task cont, void *cont_cls);
803
804 /**
805  * Perform an asynchronous GET operation on the DHT identified.
806  *
807  * @param handle handle to the DHT service
808  * @param timeout how long to wait for transmission of this request to the service
809  * @param type expected type of the response object
810  * @param key the key to look up
811  * @param iter function to call on each result
812  * @param iter_cls closure for iter
813  * @param cont continuation to call once message sent
814  * @param cont_cls closure for continuation
815  *
816  * @return handle to stop the async get
817  */
818 struct GNUNET_DHT_GetHandle *
819 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
820                       struct GNUNET_TIME_Relative timeout,
821                       uint32_t type,
822                       const GNUNET_HashCode * key,
823                       GNUNET_DHT_GetIterator iter,
824                       void *iter_cls,
825                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
826 {
827   struct GNUNET_DHT_GetHandle *get_handle;
828   struct GNUNET_DHT_GetMessage *get_msg;
829
830   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
831     return NULL;
832
833   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
834   get_handle->get_context.iter = iter;
835   get_handle->get_context.iter_cls = iter_cls;
836
837 #if DEBUG_DHT_API
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "`%s': Inserting pending get request with key %s\n", "DHT API",
840               GNUNET_h2s (key));
841 #endif
842
843   get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
844   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
845   get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
846   get_msg->type = htonl (type);
847
848   get_handle->route_handle =
849     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
850                             &get_reply_iterator, get_handle, cont, cont_cls);
851   return get_handle;
852 }
853
854 /**
855  * Stop a previously issued routing request
856  *
857  * @param route_handle handle to the request to stop
858  * @param cont continuation to call once this message is sent to the service or times out
859  * @param cont_cls closure for the continuation
860  *
861  */
862 void
863 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
864                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
865 {
866   struct PendingMessage *pending;
867   struct GNUNET_DHT_StopMessage *message;
868   size_t msize;
869   GNUNET_HashCode *uid_key;
870
871   msize = sizeof (struct GNUNET_DHT_StopMessage);
872
873   message = GNUNET_malloc (msize);
874   message->header.size = htons (msize);
875   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
876 #if DEBUG_DHT_API
877   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
879               route_handle->uid);
880 #endif
881   message->unique_id = GNUNET_htonll (route_handle->uid);
882
883   GNUNET_assert (route_handle->dht_handle->current == NULL);
884
885   pending = GNUNET_malloc (sizeof (struct PendingMessage));
886   pending->msg = (struct GNUNET_MessageHeader *) message;
887   pending->timeout = DEFAULT_DHT_TIMEOUT;
888   pending->cont = cont;
889   pending->cont_cls = cont_cls;
890   pending->is_unique = GNUNET_NO;
891   pending->unique_id = route_handle->uid;
892
893   GNUNET_assert (route_handle->dht_handle->current == NULL);
894
895   route_handle->dht_handle->current = pending;
896
897   process_pending_message (route_handle->dht_handle);
898
899   uid_key = hash_from_uid (route_handle->uid);
900
901   if (GNUNET_CONTAINER_multihashmap_remove
902       (route_handle->dht_handle->outstanding_requests, uid_key,
903        route_handle) != GNUNET_YES)
904     {
905 #if DEBUG_DHT_API
906       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                   "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n",
908                   "DHT API", GNUNET_h2s (uid_key), route_handle->uid);
909 #endif
910     }
911   GNUNET_free (uid_key);
912   return;
913 }
914
915
916 /**
917  * Stop async DHT-get.
918  *
919  * @param get_handle handle to the GET operation to stop
920  * @param cont continuation to call once this message is sent to the service or times out
921  * @param cont_cls closure for the continuation
922  */
923 void
924 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
925                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
926 {
927 #if DEBUG_DHT_API
928   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929               "`%s': Removing pending get request with key %s, uid %llu\n",
930               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
931               get_handle->route_handle->uid);
932 #endif
933   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
934   GNUNET_free (get_handle);
935
936 }
937
938
939 /**
940  * Perform an asynchronous FIND PEER operation on the DHT.
941  *
942  * @param handle handle to the DHT service
943  * @param timeout timeout for this request to be sent to the
944  *        service
945  * @param options routing options for this message
946  * @param message a message to inject at found peers (may be null)
947  * @param key the key to look up
948  * @param proc function to call on each result
949  * @param proc_cls closure for proc
950  * @param cont continuation to call once message sent
951  * @param cont_cls closure for continuation
952  *
953  * @return handle to stop the async get, NULL on error
954  */
955 struct GNUNET_DHT_FindPeerHandle *
956 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
957                             struct GNUNET_TIME_Relative timeout,
958                             enum GNUNET_DHT_RouteOption options,
959                             struct GNUNET_MessageHeader *message,
960                             const GNUNET_HashCode * key,
961                             GNUNET_DHT_FindPeerProcessor proc,
962                             void *proc_cls,
963                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
964 {
965   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
966   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
967   size_t msize;
968
969   if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
970     return NULL;
971
972   if (message != NULL)
973     msize = ntohs (message->size);
974   else
975     msize = 0;
976
977   find_peer_handle =
978     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
979   find_peer_handle->find_peer_context.proc = proc;
980   find_peer_handle->find_peer_context.proc_cls = proc_cls;
981
982 #if DEBUG_DHT_API
983   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
985               "FIND PEER", GNUNET_h2s (key));
986 #endif
987
988   find_peer_msg =
989     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + msize);
990   find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
991   find_peer_msg->header.size =
992     htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
993   find_peer_msg->msg_len = msize;
994
995   if (message != NULL)
996     {
997       memcpy (&find_peer_msg[1], message, msize);
998     }
999
1000   find_peer_handle->route_handle =
1001     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg->header,
1002                             timeout, &find_peer_reply_iterator,
1003                             find_peer_handle, cont, cont_cls);
1004   return find_peer_handle;
1005 }
1006
1007 /**
1008  * Stop async find peer.  Frees associated resources.
1009  *
1010  * @param find_peer_handle GET operation to stop.
1011  * @param cont continuation to call once this message is sent to the service or times out
1012  * @param cont_cls closure for the continuation
1013  */
1014 void
1015 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1016                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1017 {
1018 #if DEBUG_DHT_API
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1021               "DHT API", "FIND PEER",
1022               GNUNET_h2s (&find_peer_handle->route_handle->key),
1023               find_peer_handle->route_handle->uid);
1024 #endif
1025   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1026   GNUNET_free (find_peer_handle);
1027
1028 }
1029
1030
1031 /**
1032  * Perform a PUT operation storing data in the DHT.
1033  *
1034  * @param handle handle to DHT service
1035  * @param key the key to store under
1036  * @param type type of the value
1037  * @param size number of bytes in data; must be less than 64k
1038  * @param data the data to store
1039  * @param exp desired expiration time for the value
1040  * @param timeout how long to wait for transmission of this request
1041  * @param cont continuation to call when done;
1042  *             reason will be TIMEOUT on error,
1043  *             reason will be PREREQ_DONE on success
1044  * @param cont_cls closure for cont
1045  *
1046  * @return GNUNET_YES if put message is queued for transmission
1047  */
1048 void
1049 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1050                 const GNUNET_HashCode * key,
1051                 uint32_t type,
1052                 uint32_t size,
1053                 const char *data,
1054                 struct GNUNET_TIME_Absolute exp,
1055                 struct GNUNET_TIME_Relative timeout,
1056                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1057 {
1058   struct GNUNET_DHT_PutMessage *put_msg;
1059   size_t msize;
1060
1061   if (handle->current != NULL)
1062     {
1063       GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1064                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
1065       return;
1066     }
1067
1068 #if DEBUG_DHT_API
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070               "`%s': Inserting pending put request with key %s\n", "DHT API",
1071               GNUNET_h2s (key));
1072 #endif
1073
1074   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1075   put_msg = GNUNET_malloc (msize);
1076   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1077   put_msg->header.size = htons (msize);
1078   put_msg->type = htonl (type);
1079   put_msg->data_size = htons (size);
1080   put_msg->expiration = exp;
1081   memcpy (&put_msg[1], data, size);
1082
1083   GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1084                           NULL, cont, cont_cls);
1085
1086   GNUNET_free (put_msg);
1087 }