dead
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: retransmission of pending requests maybe happens now, at least
28  *       the code is in place to do so.  Need to add checks when api calls
29  *       happen to check if retransmission is in progress, and if so set
30  *       the single pending message for transmission once the list of
31  *       retries are done.
32  */
33
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_NO
48
49 struct PendingMessage
50 {
51   /**
52    * Message that is pending
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Timeout for this message
58    */
59   struct GNUNET_TIME_Relative timeout;
60
61   /**
62    * Continuation to call on message send
63    * or message receipt confirmation
64    */
65   GNUNET_SCHEDULER_Task cont;
66
67   /**
68    * Continuation closure
69    */
70   void *cont_cls;
71
72   /**
73    * Unique ID for this request
74    */
75   uint64_t unique_id;
76
77 };
78
79 struct PendingMessageList
80 {
81   /**
82    * This is a singly linked list.
83    */
84   struct PendingMessageList *next;
85
86   /**
87    * The pending message.
88    */
89   struct PendingMessage *message;
90 };
91
92 struct GNUNET_DHT_GetContext
93 {
94   /**
95    * Iterator to call on data receipt
96    */
97   GNUNET_DHT_GetIterator iter;
98
99   /**
100    * Closure for the iterator callback
101    */
102   void *iter_cls;
103
104 };
105
106 struct GNUNET_DHT_FindPeerContext
107 {
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_FindPeerProcessor proc;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *proc_cls;
117
118 };
119
120 /**
121  * Handle to a route request
122  */
123 struct GNUNET_DHT_RouteHandle
124 {
125
126   /**
127    * Unique identifier for this request (for key collisions)
128    */
129   uint64_t uid;
130
131   /**
132    * Key that this get request is for
133    */
134   GNUNET_HashCode key;
135
136   /**
137    * Iterator to call on data receipt
138    */
139   GNUNET_DHT_ReplyProcessor iter;
140
141   /**
142    * Closure for the iterator callback
143    */
144   void *iter_cls;
145
146   /**
147    * Main handle to this DHT api
148    */
149   struct GNUNET_DHT_Handle *dht_handle;
150
151   /**
152    * The actual message sent for this request,
153    * used for retransmitting requests on service
154    * failure/reconnect.  Freed on route_stop.
155    */
156   struct GNUNET_DHT_RouteMessage *message;
157 };
158
159
160 /**
161  * Handle to control a get operation.
162  */
163 struct GNUNET_DHT_GetHandle
164 {
165   /**
166    * Handle to the actual route operation for the get
167    */
168   struct GNUNET_DHT_RouteHandle *route_handle;
169
170   /**
171    * The context of the get request
172    */
173   struct GNUNET_DHT_GetContext get_context;
174 };
175
176
177 /**
178  * Handle to control a find peer operation.
179  */
180 struct GNUNET_DHT_FindPeerHandle
181 {
182   /**
183      * Handle to the actual route operation for the request
184      */
185   struct GNUNET_DHT_RouteHandle *route_handle;
186
187     /**
188      * The context of the find peer request
189      */
190   struct GNUNET_DHT_FindPeerContext find_peer_context;
191 };
192
193
194 enum DHT_Retransmit_Stage
195 {
196   /**
197    * The API is not retransmitting anything at this time.
198    */
199   DHT_NOT_RETRANSMITTING,
200
201   /**
202    * The API is retransmitting, and nothing has been single
203    * queued for sending.
204    */
205   DHT_RETRANSMITTING,
206
207   /**
208    * The API is retransmitting, and a single message has been
209    * queued for transmission once finished.
210    */
211   DHT_RETRANSMITTING_MESSAGE_QUEUED
212 };
213
214
215 /**
216  * Connection to the DHT service.
217  */
218 struct GNUNET_DHT_Handle
219 {
220   /**
221    * Our scheduler.
222    */
223   struct GNUNET_SCHEDULER_Handle *sched;
224
225   /**
226    * Configuration to use.
227    */
228   const struct GNUNET_CONFIGURATION_Handle *cfg;
229
230   /**
231    * Socket (if available).
232    */
233   struct GNUNET_CLIENT_Connection *client;
234
235   /**
236    * Currently pending transmission request.
237    */
238   struct GNUNET_CLIENT_TransmitHandle *th;
239
240   /**
241    * Message we are currently sending, only allow
242    * a single message to be queued.  If not unique
243    * (typically a put request), await a confirmation
244    * from the service that the message was received.
245    * If unique, just fire and forget.
246    */
247   struct PendingMessage *current;
248
249   /**
250    * Hash map containing the current outstanding unique requests
251    */
252   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
253
254   /**
255    * Generator for unique ids.
256    */
257   uint64_t uid_gen;
258
259   /**
260    * Are we currently retransmitting requests?  If so queue a _single_
261    * new request when received.
262    */
263   enum DHT_Retransmit_Stage retransmit_stage;
264
265   /**
266    * Linked list of retranmissions, to be used in the event
267    * of a dht service disconnect/reconnect.
268    */
269   struct PendingMessageList *retransmissions;
270
271   /**
272    * A single pending message allowed to be scheduled
273    * during retransmission phase.
274    */
275   struct PendingMessage *retransmission_buffer;
276 };
277
278
279 /**
280  * Convert unique ID to hash code.
281  *
282  * @param uid unique ID to convert
283  * @param hash set to uid (extended with zeros)
284  */
285 static void
286 hash_from_uid (uint64_t uid,
287                GNUNET_HashCode *hash)
288 {
289   memset (hash, 0, sizeof(GNUNET_HashCode));
290   *((uint64_t*)hash) = uid;
291 }
292
293 #if RETRANSMIT
294 /**
295  * Iterator callback to retransmit each outstanding request
296  * because the connection to the DHT service went down (and
297  * came back).
298  *
299  *
300  */
301 static int retransmit_iterator (void *cls,
302                                 const GNUNET_HashCode * key,
303                                 void *value)
304 {
305   struct GNUNET_DHT_RouteHandle *route_handle = value;
306   struct PendingMessageList *pending_message_list;
307
308   pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
309   pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
310   pending_message_list->message->msg = &route_handle->message->header;
311   pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
312   pending_message_list->message->cont = NULL;
313   pending_message_list->message->cont_cls = NULL;
314   pending_message_list->message->unique_id = route_handle->uid;
315   /* Add the new pending message to the front of the retransmission list */
316   pending_message_list->next = route_handle->dht_handle->retransmissions;
317   route_handle->dht_handle->retransmissions = pending_message_list;
318
319   return GNUNET_OK;
320 }
321 #endif
322
323 /**
324  * Try to (re)connect to the dht service.
325  *
326  * @return GNUNET_YES on success, GNUNET_NO on failure.
327  */
328 static int
329 try_connect (struct GNUNET_DHT_Handle *handle)
330 {
331   if (handle->client != NULL)
332     return GNUNET_OK;
333   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
334   if (handle->client != NULL)
335     return GNUNET_YES;
336 #if DEBUG_STATISTICS
337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338               _("Failed to connect to the dht service!\n"));
339 #endif
340   return GNUNET_NO;
341 }
342
343 /**
344  * Send complete (or failed), call continuation if we have one.
345  */
346 static void
347 finish (struct GNUNET_DHT_Handle *handle, int code)
348 {
349   struct PendingMessage *pos = handle->current;
350   GNUNET_HashCode uid_hash;
351 #if DEBUG_DHT_API
352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
353 #endif
354   GNUNET_assert (pos != NULL);
355   hash_from_uid (pos->unique_id, &uid_hash);
356   if (pos->cont != NULL)
357     {
358       if (code == GNUNET_SYSERR)
359         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
360                                            pos->cont_cls,
361                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
362       else
363         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
364                                            pos->cont_cls,
365                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
366     }
367
368   if (pos->unique_id == 0)
369     GNUNET_free(pos->msg);
370   GNUNET_free (pos);
371   handle->current = NULL;
372 }
373
374 /**
375  * Transmit the next pending message, called by notify_transmit_ready
376  */
377 static size_t
378 transmit_pending (void *cls, size_t size, void *buf)
379 {
380   struct GNUNET_DHT_Handle *handle = cls;
381   size_t tsize;
382
383 #if DEBUG_DHT_API
384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385               "`%s': In transmit_pending\n", "DHT API");
386 #endif
387   if (buf == NULL)
388     {
389 #if DEBUG_DHT_API
390       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
392 #endif
393       finish (handle, GNUNET_SYSERR);
394       return 0;
395     }
396
397   handle->th = NULL;
398
399   if (handle->current != NULL)
400     {
401       tsize = ntohs (handle->current->msg->size);
402       if (size >= tsize)
403         {
404 #if DEBUG_DHT_API
405           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406                       "`%s': Sending message size %d\n", "DHT API", tsize);
407 #endif
408           memcpy (buf, handle->current->msg, tsize);
409           finish (handle, GNUNET_OK);
410           return tsize;
411         }
412       else
413         {
414           return 0;
415         }
416     }
417   /* Have no pending request */
418   return 0;
419 }
420
421 /**
422  * Try to send messages from list of messages to send
423  */
424 static void
425 process_pending_message (struct GNUNET_DHT_Handle *handle)
426 {
427
428   if (handle->current == NULL)
429     return;                     /* action already pending */
430   if (GNUNET_YES != try_connect (handle))
431     {
432       finish (handle, GNUNET_SYSERR);
433       return;
434     }
435
436   if (NULL ==
437       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
438                                                          ntohs (handle->
439                                                                 current->msg->
440                                                                 size),
441                                                          handle->current->
442                                                          timeout, GNUNET_YES,
443                                                          &transmit_pending,
444                                                          handle)))
445     {
446 #if DEBUG_DHT_API
447       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448                   "Failed to transmit request to dht service.\n");
449 #endif
450       finish (handle, GNUNET_SYSERR);
451       return;
452     }
453 #if DEBUG_DHT_API
454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455               "`%s': Scheduled sending message of size %d to service\n",
456               "DHT API", ntohs (handle->current->msg->size));
457 #endif
458 }
459
460 /**
461  * Send complete (or failed), call continuation if we have one.
462  * Forward declaration.
463  */
464 static void
465 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
466
467 /* Forward declaration */
468 static size_t
469 transmit_pending_retransmission (void *cls, size_t size, void *buf);
470
471 /**
472  * Try to send messages from list of messages to send
473  */
474 static void
475 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
476 {
477
478   if (handle->current == NULL)
479     return;                     /* action already pending */
480   if (GNUNET_YES != try_connect (handle))
481     {
482       finish_retransmission (handle, GNUNET_SYSERR);
483       return;
484     }
485
486   if (NULL ==
487       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
488                                                          ntohs (handle->
489                                                                 current->msg->
490                                                                 size),
491                                                          handle->current->
492                                                          timeout, GNUNET_YES,
493                                                          &transmit_pending_retransmission,
494                                                          handle)))
495     {
496 #if DEBUG_DHT_API
497       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                   "Failed to transmit request to dht service.\n");
499 #endif
500       finish_retransmission (handle, GNUNET_SYSERR);
501       return;
502     }
503 #if DEBUG_DHT_API
504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
505               "`%s': Scheduled sending message of size %d to service\n",
506               "DHT API", ntohs (handle->current->msg->size));
507 #endif
508 }
509
510 /**
511  * Send complete (or failed), call continuation if we have one.
512  */
513 static void
514 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
515 {
516   struct PendingMessage *pos = handle->current;
517   struct PendingMessageList *pending_list;
518 #if DEBUG_DHT_API
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
520 #endif
521   GNUNET_assert (pos == handle->retransmissions->message);
522   pending_list = handle->retransmissions;
523   handle->retransmissions = handle->retransmissions->next;
524   GNUNET_free (pending_list);
525
526   if (handle->retransmissions == NULL)
527     {
528       handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
529     }
530
531   if (handle->retransmissions != NULL)
532     {
533       handle->current = handle->retransmissions->message;
534       process_pending_retransmissions(handle);
535     }
536   else if (handle->retransmission_buffer != NULL)
537     {
538       handle->current = handle->retransmission_buffer;
539       process_pending_message(handle);
540     }
541 }
542
543 /**
544  * Handler for messages received from the DHT service
545  * a demultiplexer which handles numerous message types
546  *
547  */
548 void
549 service_message_handler (void *cls,
550                          const struct GNUNET_MessageHeader *msg)
551 {
552   struct GNUNET_DHT_Handle *handle = cls;
553   struct GNUNET_DHT_RouteResultMessage *dht_msg;
554   struct GNUNET_MessageHeader *enc_msg;
555   struct GNUNET_DHT_RouteHandle *route_handle;
556   uint64_t uid;
557   GNUNET_HashCode uid_hash;
558   size_t enc_size;
559
560   if (msg == NULL)
561     {
562 #if DEBUG_DHT_API
563       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564                   "`%s': Received NULL from server, connection down!\n",
565                   "DHT API");
566 #endif
567       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
568       handle->client = GNUNET_CLIENT_connect (handle->sched, 
569                                               "dht",
570                                               handle->cfg);
571       if (handle->current != NULL)
572         {
573           finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
574         }
575 #if RETRANSMIT
576       if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
577         {
578           handle->retransmit_stage = DHT_RETRANSMITTING;
579           handle->current = handle->retransmissions->message;
580           process_pending_retransmissions(handle);
581         }
582 #endif
583       return;
584     }
585
586   switch (ntohs (msg->type))
587     {
588     case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT:
589       {
590         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
591         uid = GNUNET_ntohll (dht_msg->unique_id);
592 #if DEBUG_DHT_API
593         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                     "`%s': Received response to message (uid %llu)\n",
595                     "DHT API", uid);
596 #endif
597
598         hash_from_uid (uid, &uid_hash);
599         route_handle =
600           GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
601                                              &uid_hash);
602         if (route_handle == NULL)   /* We have no recollection of this request */
603           {
604 #if DEBUG_DHT_API
605             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606                         "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
607                         "DHT API", uid);
608 #endif
609           }
610         else
611           {
612             enc_size =
613               ntohs (dht_msg->header.size) -
614               sizeof (struct GNUNET_DHT_RouteResultMessage);
615             GNUNET_assert (enc_size > 0);
616             enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
617             route_handle->iter (route_handle->iter_cls, enc_msg);
618           }
619
620         break;
621       }
622     default:
623       {
624         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
625                     "`%s': Received unknown message type %d\n", "DHT API",
626                     ntohs (msg->type));
627       }
628     }
629   GNUNET_CLIENT_receive (handle->client,
630                          &service_message_handler,
631                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
632
633 }
634
635
636 /**
637  * Initialize the connection with the DHT service.
638  *
639  * @param sched scheduler to use
640  * @param cfg configuration to use
641  * @param ht_len size of the internal hash table to use for
642  *               processing multiple GET/FIND requests in parallel
643  *
644  * @return handle to the DHT service, or NULL on error
645  */
646 struct GNUNET_DHT_Handle *
647 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
648                     const struct GNUNET_CONFIGURATION_Handle *cfg,
649                     unsigned int ht_len)
650 {
651   struct GNUNET_DHT_Handle *handle;
652
653   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
654   handle->cfg = cfg;
655   handle->sched = sched;
656   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
657   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
658   if (handle->client == NULL)
659     {
660       GNUNET_free (handle);
661       return NULL;
662     }
663   handle->outstanding_requests =
664     GNUNET_CONTAINER_multihashmap_create (ht_len);
665 #if DEBUG_DHT_API
666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
667               "`%s': Connection to service in progress\n", "DHT API");
668 #endif
669   GNUNET_CLIENT_receive (handle->client,
670                          &service_message_handler,
671                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
672   return handle;
673 }
674
675
676 /**
677  * Shutdown connection with the DHT service.
678  *
679  * @param handle handle of the DHT connection to stop
680  */
681 void
682 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
683 {
684 #if DEBUG_DHT_API
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
687 #endif
688   GNUNET_assert (handle != NULL);
689   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
690     {
691       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
692       handle->th = NULL;
693     }
694   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
695     GNUNET_free (handle->current);
696
697   if (handle->client != NULL)   /* Finally, disconnect from the service */
698     {
699       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
700       handle->client = NULL;
701     }
702
703   GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
704   GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
705   GNUNET_free (handle);
706 }
707
708
709 /**
710  * Transmit the next pending message, called by notify_transmit_ready
711  */
712 static size_t
713 transmit_pending_retransmission (void *cls, size_t size, void *buf)
714 {
715   struct GNUNET_DHT_Handle *handle = cls;
716   size_t tsize;
717
718 #if DEBUG_DHT_API
719   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720               "`%s': In transmit_pending\n", "DHT API");
721 #endif
722   if (buf == NULL)
723     {
724 #if DEBUG_DHT_API
725       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
727 #endif
728       finish_retransmission (handle, GNUNET_SYSERR);
729       return 0;
730     }
731
732   handle->th = NULL;
733
734   if (handle->current != NULL)
735     {
736       tsize = ntohs (handle->current->msg->size);
737       if (size >= tsize)
738         {
739 #if DEBUG_DHT_API
740           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741                       "`%s': Sending message size %d\n", "DHT API", tsize);
742 #endif
743           memcpy (buf, handle->current->msg, tsize);
744           finish_retransmission (handle, GNUNET_OK);
745           return tsize;
746         }
747       else
748         {
749           return 0;
750         }
751     }
752   /* Have no pending request */
753   return 0;
754 }
755
756
757 /**
758  * Iterator called on each result obtained from a generic route
759  * operation
760  */
761 void
762 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
763 {
764   struct GNUNET_DHT_GetHandle *get_handle = cls;
765   struct GNUNET_DHT_GetResultMessage *result;
766   size_t data_size;
767   char *result_data;
768
769   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
770     return;
771
772   GNUNET_assert (ntohs (reply->size) >=
773                  sizeof (struct GNUNET_DHT_GetResultMessage));
774   result = (struct GNUNET_DHT_GetResultMessage *) reply;
775   data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
776
777   result_data = (char *) &result[1];    /* Set data pointer to end of message */
778
779   get_handle->get_context.iter (get_handle->get_context.iter_cls,
780                                 result->expiration, &result->key,
781                                 ntohs (result->type), data_size, result_data);
782 }
783
784
785 /**
786  * Iterator called on each result obtained from a generic route
787  * operation
788  */
789 void
790 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
791 {
792   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
793   struct GNUNET_MessageHeader *hello;
794
795   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
796     {
797       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
798                   "Received wrong type of response to a find peer request...\n");
799       return;
800     }
801
802
803   GNUNET_assert (ntohs (reply->size) >=
804                  sizeof (struct GNUNET_MessageHeader));
805   hello = (struct GNUNET_MessageHeader *)&reply[1];
806
807   if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
808     {
809       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810                   "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
811       return;
812     }
813   find_peer_handle->find_peer_context.proc (find_peer_handle->
814                                             find_peer_context.proc_cls,
815                                             (struct GNUNET_HELLO_Message *)hello);
816 }
817
818 /**
819  * Perform an asynchronous FIND_PEER operation on the DHT.
820  *
821  * @param handle handle to the DHT service
822  * @param key the key to look up
823  * @param desired_replication_level how many peers should ultimately receive
824  *                this message (advisory only, target may be too high for the
825  *                given DHT or not hit exactly).
826  * @param options options for routing
827  * @param enc send the encapsulated message to a peer close to the key
828  * @param iter function to call on each result, NULL if no replies are expected
829  * @param iter_cls closure for iter
830  * @param timeout when to abort with an error if we fail to get
831  *                a confirmation for the request (when necessary) or how long
832  *                to wait for tramission to the service
833  * @param cont continuation to call when done;
834  *             reason will be TIMEOUT on error,
835  *             reason will be PREREQ_DONE on success
836  * @param cont_cls closure for cont
837  *
838  * @return handle to stop the request, NULL if the request is "fire and forget"
839  */
840 struct GNUNET_DHT_RouteHandle *
841 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
842                         const GNUNET_HashCode * key,
843                         unsigned int desired_replication_level,
844                         enum GNUNET_DHT_RouteOption options,
845                         const struct GNUNET_MessageHeader *enc,
846                         struct GNUNET_TIME_Relative timeout,
847                         GNUNET_DHT_ReplyProcessor iter,
848                         void *iter_cls,
849                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
850 {
851   struct GNUNET_DHT_RouteHandle *route_handle;
852   struct PendingMessage *pending;
853   struct GNUNET_DHT_RouteMessage *message;
854   uint16_t msize;
855   GNUNET_HashCode uid_key;
856
857   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
858     return NULL;
859
860   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
861     {
862       GNUNET_break (0);
863       return NULL;
864     }
865
866   route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
867   memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
868   route_handle->iter = iter;
869   route_handle->iter_cls = iter_cls;
870   route_handle->dht_handle = handle;
871   if (iter != NULL)
872     {
873       route_handle->uid = handle->uid_gen++;
874       hash_from_uid (route_handle->uid, &uid_key);
875       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
876                                          &uid_key, route_handle,
877                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
878     }
879
880 #if DEBUG_DHT_API
881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882                   "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
883 #endif
884
885   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
886   message = GNUNET_malloc (msize);
887   message->header.size = htons (msize);
888   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE);
889   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
890   message->options = htonl (options);
891   message->desired_replication_level = htonl (options);
892   message->unique_id = GNUNET_htonll (route_handle->uid);
893   memcpy (&message[1], enc, ntohs (enc->size));
894   pending = GNUNET_malloc (sizeof (struct PendingMessage));
895   pending->msg = &message->header;
896   pending->timeout = timeout;
897   pending->cont = cont;
898   pending->cont_cls = cont_cls;
899   pending->unique_id = route_handle->uid;
900   if (handle->current == NULL)
901     {
902       handle->current = pending;
903       process_pending_message (handle);
904     }
905   else
906   {
907     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
908     handle->retransmission_buffer = pending;
909   }
910
911   route_handle->message = message;
912   return route_handle;
913 }
914
915
916 /**
917  * Perform an asynchronous GET operation on the DHT identified.
918  *
919  * @param handle handle to the DHT service
920  * @param timeout how long to wait for transmission of this request to the service
921  * @param type expected type of the response object
922  * @param key the key to look up
923  * @param iter function to call on each result
924  * @param iter_cls closure for iter
925  * @param cont continuation to call once message sent
926  * @param cont_cls closure for continuation
927  *
928  * @return handle to stop the async get
929  */
930 struct GNUNET_DHT_GetHandle *
931 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
932                       struct GNUNET_TIME_Relative timeout,
933                       uint32_t type,
934                       const GNUNET_HashCode * key,
935                       GNUNET_DHT_GetIterator iter,
936                       void *iter_cls,
937                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
938 {
939   struct GNUNET_DHT_GetHandle *get_handle;
940   struct GNUNET_DHT_GetMessage get_msg;
941
942   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
943     return NULL;
944
945   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
946   get_handle->get_context.iter = iter;
947   get_handle->get_context.iter_cls = iter_cls;
948
949 #if DEBUG_DHT_API
950   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951               "`%s': Inserting pending get request with key %s\n", "DHT API",
952               GNUNET_h2s (key));
953 #endif
954
955   get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
956   get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
957   get_msg.type = htons (type);
958
959   get_handle->route_handle =
960     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
961                             &get_reply_iterator, get_handle, cont, cont_cls);
962
963   return get_handle;
964 }
965
966
967 /**
968  * Stop a previously issued routing request
969  *
970  * @param route_handle handle to the request to stop
971  * @param cont continuation to call once this message is sent to the service or times out
972  * @param cont_cls closure for the continuation
973  */
974 void
975 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
976                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
977 {
978   struct PendingMessage *pending;
979   struct GNUNET_DHT_StopMessage *message;
980   size_t msize;
981   GNUNET_HashCode uid_key;
982
983   msize = sizeof (struct GNUNET_DHT_StopMessage);
984   message = GNUNET_malloc (msize);
985   message->header.size = htons (msize);
986   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
987 #if DEBUG_DHT_API
988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
990               route_handle->uid);
991 #endif
992   message->unique_id = GNUNET_htonll (route_handle->uid);
993   pending = GNUNET_malloc (sizeof (struct PendingMessage));
994   pending->msg = (struct GNUNET_MessageHeader *) message;
995   pending->timeout = GNUNET_TIME_relative_get_forever();
996   pending->cont = cont;
997   pending->cont_cls = cont_cls;
998   pending->unique_id = 0; /* When finished is called, free pending->msg */
999
1000   if (route_handle->dht_handle->current == NULL)
1001     {
1002       route_handle->dht_handle->current = pending;
1003       process_pending_message (route_handle->dht_handle);
1004     }
1005   else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1006     {
1007       route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1008       route_handle->dht_handle->retransmission_buffer = pending;
1009     }
1010   else
1011     {
1012       GNUNET_break(0);
1013     }
1014
1015   hash_from_uid (route_handle->uid, &uid_key);
1016   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1017                  (route_handle->dht_handle->outstanding_requests, &uid_key,
1018                   route_handle) == GNUNET_YES);
1019
1020   GNUNET_free(route_handle->message);
1021   GNUNET_free(route_handle);
1022 }
1023
1024
1025 /**
1026  * Stop async DHT-get.
1027  *
1028  * @param get_handle handle to the GET operation to stop
1029  * @param cont continuation to call once this message is sent to the service or times out
1030  * @param cont_cls closure for the continuation
1031  */
1032 void
1033 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1034                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
1035 {
1036   if ((get_handle->route_handle->dht_handle->current != NULL) &&
1037       (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1038     {
1039       if (cont != NULL)
1040         {
1041           GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1042                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1043         }
1044       return;
1045     }
1046
1047 #if DEBUG_DHT_API
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "`%s': Removing pending get request with key %s, uid %llu\n",
1050               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1051               get_handle->route_handle->uid);
1052 #endif
1053   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1054   GNUNET_free (get_handle);
1055 }
1056
1057
1058 /**
1059  * Perform an asynchronous FIND PEER operation on the DHT.
1060  *
1061  * @param handle handle to the DHT service
1062  * @param timeout timeout for this request to be sent to the
1063  *        service
1064  * @param options routing options for this message
1065  * @param key the key to look up
1066  * @param proc function to call on each result
1067  * @param proc_cls closure for proc
1068  * @param cont continuation to call once message sent
1069  * @param cont_cls closure for continuation
1070  *
1071  * @return handle to stop the async get, NULL on error
1072  */
1073 struct GNUNET_DHT_FindPeerHandle *
1074 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1075                             struct GNUNET_TIME_Relative timeout,
1076                             enum GNUNET_DHT_RouteOption options,
1077                             const GNUNET_HashCode * key,
1078                             GNUNET_DHT_FindPeerProcessor proc,
1079                             void *proc_cls,
1080                             GNUNET_SCHEDULER_Task cont,
1081                             void *cont_cls)
1082 {
1083   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1084   struct GNUNET_MessageHeader find_peer_msg;
1085
1086   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
1087     return NULL;
1088
1089   find_peer_handle =
1090     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1091   find_peer_handle->find_peer_context.proc = proc;
1092   find_peer_handle->find_peer_context.proc_cls = proc_cls;
1093
1094 #if DEBUG_DHT_API
1095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1097               "FIND PEER", GNUNET_h2s (key));
1098 #endif
1099
1100   find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
1101   find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1102   find_peer_handle->route_handle =
1103     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
1104                             timeout, &find_peer_reply_iterator,
1105                             find_peer_handle, cont, cont_cls);
1106   return find_peer_handle;
1107 }
1108
1109 /**
1110  * Stop async find peer.  Frees associated resources.
1111  *
1112  * @param find_peer_handle GET operation to stop.
1113  * @param cont continuation to call once this message is sent to the service or times out
1114  * @param cont_cls closure for the continuation
1115  */
1116 void
1117 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1118                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1119 {
1120   if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1121       (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1122     {
1123       if (cont != NULL)
1124         {
1125           GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1126                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1127         }
1128       return;
1129     }
1130
1131 #if DEBUG_DHT_API
1132   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1134               "DHT API", "FIND PEER",
1135               GNUNET_h2s (&find_peer_handle->route_handle->key),
1136               find_peer_handle->route_handle->uid);
1137 #endif
1138   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1139   GNUNET_free (find_peer_handle);
1140
1141 }
1142
1143
1144 /**
1145  * Perform a PUT operation storing data in the DHT.
1146  *
1147  * @param handle handle to DHT service
1148  * @param key the key to store under
1149  * @param type type of the value
1150  * @param size number of bytes in data; must be less than 64k
1151  * @param data the data to store
1152  * @param exp desired expiration time for the value
1153  * @param timeout how long to wait for transmission of this request
1154  * @param cont continuation to call when done;
1155  *             reason will be TIMEOUT on error,
1156  *             reason will be PREREQ_DONE on success
1157  * @param cont_cls closure for cont
1158  *
1159  * @return GNUNET_YES if put message is queued for transmission
1160  */
1161 void
1162 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1163                 const GNUNET_HashCode * key,
1164                 uint32_t type,
1165                 uint32_t size,
1166                 const char *data,
1167                 struct GNUNET_TIME_Absolute exp,
1168                 struct GNUNET_TIME_Relative timeout,
1169                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1170 {
1171   struct GNUNET_DHT_PutMessage *put_msg;
1172   struct GNUNET_DHT_RouteHandle *put_route;
1173   size_t msize;
1174
1175   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1176     {
1177       if (cont != NULL)
1178         {
1179           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1180                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1181         }
1182       return;
1183     }
1184
1185 #if DEBUG_DHT_API
1186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187               "`%s': Inserting pending put request with key %s\n", "DHT API",
1188               GNUNET_h2s (key));
1189 #endif
1190
1191   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1192   put_msg = GNUNET_malloc (msize);
1193   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1194   put_msg->header.size = htons (msize);
1195   put_msg->type = htons (type);
1196   put_msg->data_size = htons (size);
1197   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1198   memcpy (&put_msg[1], data, size);
1199
1200   put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1201                                       NULL, cont, cont_cls);
1202
1203   if (put_route == NULL) /* Route start failed! */
1204     {
1205       if (cont != NULL)
1206         {
1207           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1208                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1209         }
1210     }
1211   else
1212     GNUNET_free(put_route);
1213
1214   GNUNET_free (put_msg);
1215 }