bad commit fix
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: retransmission of pending requests maybe happens now, at least
28  *       the code is in place to do so.  Need to add checks when api calls
29  *       happen to check if retransmission is in progress, and if so set
30  *       the single pending message for transmission once the list of
31  *       retries are done.
32  */
33
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_NO
48
49 struct PendingMessage
50 {
51   /**
52    * Message that is pending
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Timeout for this message
58    */
59   struct GNUNET_TIME_Relative timeout;
60
61   /**
62    * Continuation to call on message send
63    * or message receipt confirmation
64    */
65   GNUNET_SCHEDULER_Task cont;
66
67   /**
68    * Continuation closure
69    */
70   void *cont_cls;
71
72   /**
73    * Unique ID for this request
74    */
75   uint64_t unique_id;
76
77 };
78
79 struct PendingMessageList
80 {
81   /**
82    * This is a singly linked list.
83    */
84   struct PendingMessageList *next;
85
86   /**
87    * The pending message.
88    */
89   struct PendingMessage *message;
90 };
91
92 struct GNUNET_DHT_GetContext
93 {
94   /**
95    * Iterator to call on data receipt
96    */
97   GNUNET_DHT_GetIterator iter;
98
99   /**
100    * Closure for the iterator callback
101    */
102   void *iter_cls;
103
104 };
105
106 struct GNUNET_DHT_FindPeerContext
107 {
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_FindPeerProcessor proc;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *proc_cls;
117
118 };
119
120 /**
121  * Handle to a route request
122  */
123 struct GNUNET_DHT_RouteHandle
124 {
125
126   /**
127    * Unique identifier for this request (for key collisions)
128    */
129   uint64_t uid;
130
131   /**
132    * Key that this get request is for
133    */
134   GNUNET_HashCode key;
135
136   /**
137    * Iterator to call on data receipt
138    */
139   GNUNET_DHT_ReplyProcessor iter;
140
141   /**
142    * Closure for the iterator callback
143    */
144   void *iter_cls;
145
146   /**
147    * Main handle to this DHT api
148    */
149   struct GNUNET_DHT_Handle *dht_handle;
150
151   /**
152    * The actual message sent for this request,
153    * used for retransmitting requests on service
154    * failure/reconnect.  Freed on route_stop.
155    */
156   struct GNUNET_DHT_RouteMessage *message;
157 };
158
159
160 /**
161  * Handle to control a get operation.
162  */
163 struct GNUNET_DHT_GetHandle
164 {
165   /**
166    * Handle to the actual route operation for the get
167    */
168   struct GNUNET_DHT_RouteHandle *route_handle;
169
170   /**
171    * The context of the get request
172    */
173   struct GNUNET_DHT_GetContext get_context;
174 };
175
176
177 /**
178  * Handle to control a find peer operation.
179  */
180 struct GNUNET_DHT_FindPeerHandle
181 {
182   /**
183      * Handle to the actual route operation for the request
184      */
185   struct GNUNET_DHT_RouteHandle *route_handle;
186
187     /**
188      * The context of the find peer request
189      */
190   struct GNUNET_DHT_FindPeerContext find_peer_context;
191 };
192
193
194 enum DHT_Retransmit_Stage
195 {
196   /**
197    * The API is not retransmitting anything at this time.
198    */
199   DHT_NOT_RETRANSMITTING,
200
201   /**
202    * The API is retransmitting, and nothing has been single
203    * queued for sending.
204    */
205   DHT_RETRANSMITTING,
206
207   /**
208    * The API is retransmitting, and a single message has been
209    * queued for transmission once finished.
210    */
211   DHT_RETRANSMITTING_MESSAGE_QUEUED
212 };
213
214
215 /**
216  * Connection to the DHT service.
217  */
218 struct GNUNET_DHT_Handle
219 {
220   /**
221    * Our scheduler.
222    */
223   struct GNUNET_SCHEDULER_Handle *sched;
224
225   /**
226    * Configuration to use.
227    */
228   const struct GNUNET_CONFIGURATION_Handle *cfg;
229
230   /**
231    * Socket (if available).
232    */
233   struct GNUNET_CLIENT_Connection *client;
234
235   /**
236    * Currently pending transmission request.
237    */
238   struct GNUNET_CLIENT_TransmitHandle *th;
239
240   /**
241    * Message we are currently sending, only allow
242    * a single message to be queued.  If not unique
243    * (typically a put request), await a confirmation
244    * from the service that the message was received.
245    * If unique, just fire and forget.
246    */
247   struct PendingMessage *current;
248
249   /**
250    * Hash map containing the current outstanding unique requests
251    */
252   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
253
254   /**
255    * Generator for unique ids.
256    */
257   uint64_t uid_gen;
258
259   /**
260    * Are we currently retransmitting requests?  If so queue a _single_
261    * new request when received.
262    */
263   enum DHT_Retransmit_Stage retransmit_stage;
264
265   /**
266    * Linked list of retranmissions, to be used in the event
267    * of a dht service disconnect/reconnect.
268    */
269   struct PendingMessageList *retransmissions;
270
271   /**
272    * A single pending message allowed to be scheduled
273    * during retransmission phase.
274    */
275   struct PendingMessage *retransmission_buffer;
276 };
277
278
279 /**
280  * Convert unique ID to hash code.
281  *
282  * @param uid unique ID to convert
283  * @param hash set to uid (extended with zeros)
284  */
285 static void
286 hash_from_uid (uint64_t uid,
287                GNUNET_HashCode *hash)
288 {
289   memset (hash, 0, sizeof(GNUNET_HashCode));
290   *((uint64_t*)hash) = uid;
291 }
292
293 #if RETRANSMIT
294 /**
295  * Iterator callback to retransmit each outstanding request
296  * because the connection to the DHT service went down (and
297  * came back).
298  *
299  *
300  */
301 static int retransmit_iterator (void *cls,
302                                 const GNUNET_HashCode * key,
303                                 void *value)
304 {
305   struct GNUNET_DHT_RouteHandle *route_handle = value;
306   struct PendingMessageList *pending_message_list;
307
308   pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
309   pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
310   pending_message_list->message->msg = &route_handle->message->header;
311   pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
312   pending_message_list->message->cont = NULL;
313   pending_message_list->message->cont_cls = NULL;
314   pending_message_list->message->unique_id = route_handle->uid;
315   /* Add the new pending message to the front of the retransmission list */
316   pending_message_list->next = route_handle->dht_handle->retransmissions;
317   route_handle->dht_handle->retransmissions = pending_message_list;
318
319   return GNUNET_OK;
320 }
321 #endif
322
323 /**
324  * Try to (re)connect to the dht service.
325  *
326  * @return GNUNET_YES on success, GNUNET_NO on failure.
327  */
328 static int
329 try_connect (struct GNUNET_DHT_Handle *handle)
330 {
331   if (handle->client != NULL)
332     return GNUNET_OK;
333   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
334   if (handle->client != NULL)
335     return GNUNET_YES;
336 #if DEBUG_STATISTICS
337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338               _("Failed to connect to the dht service!\n"));
339 #endif
340   return GNUNET_NO;
341 }
342
343 /**
344  * Send complete (or failed), call continuation if we have one.
345  */
346 static void
347 finish (struct GNUNET_DHT_Handle *handle, int code)
348 {
349   struct PendingMessage *pos = handle->current;
350   GNUNET_HashCode uid_hash;
351   hash_from_uid (pos->unique_id, &uid_hash);
352 #if DEBUG_DHT_API
353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
354 #endif
355   GNUNET_assert (pos != NULL);
356
357   if (pos->cont != NULL)
358     {
359       if (code == GNUNET_SYSERR)
360         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
361                                            pos->cont_cls,
362                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
363       else
364         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
365                                            pos->cont_cls,
366                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
367     }
368
369   if (pos->unique_id == 0)
370     GNUNET_free(pos->msg);
371   GNUNET_free (pos);
372   handle->current = NULL;
373 }
374
375 /**
376  * Transmit the next pending message, called by notify_transmit_ready
377  */
378 static size_t
379 transmit_pending (void *cls, size_t size, void *buf)
380 {
381   struct GNUNET_DHT_Handle *handle = cls;
382   size_t tsize;
383
384 #if DEBUG_DHT_API
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "`%s': In transmit_pending\n", "DHT API");
387 #endif
388   if (buf == NULL)
389     {
390 #if DEBUG_DHT_API
391       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
393 #endif
394       finish (handle, GNUNET_SYSERR);
395       return 0;
396     }
397
398   handle->th = NULL;
399
400   if (handle->current != NULL)
401     {
402       tsize = ntohs (handle->current->msg->size);
403       if (size >= tsize)
404         {
405 #if DEBUG_DHT_API
406           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407                       "`%s': Sending message size %d\n", "DHT API", tsize);
408 #endif
409           memcpy (buf, handle->current->msg, tsize);
410           finish (handle, GNUNET_OK);
411           return tsize;
412         }
413       else
414         {
415           return 0;
416         }
417     }
418   /* Have no pending request */
419   return 0;
420 }
421
422 /**
423  * Try to send messages from list of messages to send
424  */
425 static void
426 process_pending_message (struct GNUNET_DHT_Handle *handle)
427 {
428
429   if (handle->current == NULL)
430     return;                     /* action already pending */
431   if (GNUNET_YES != try_connect (handle))
432     {
433       finish (handle, GNUNET_SYSERR);
434       return;
435     }
436
437   if (NULL ==
438       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
439                                                          ntohs (handle->
440                                                                 current->msg->
441                                                                 size),
442                                                          handle->current->
443                                                          timeout, GNUNET_YES,
444                                                          &transmit_pending,
445                                                          handle)))
446     {
447 #if DEBUG_DHT_API
448       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449                   "Failed to transmit request to dht service.\n");
450 #endif
451       finish (handle, GNUNET_SYSERR);
452       return;
453     }
454 #if DEBUG_DHT_API
455   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456               "`%s': Scheduled sending message of size %d to service\n",
457               "DHT API", ntohs (handle->current->msg->size));
458 #endif
459 }
460
461 /**
462  * Send complete (or failed), call continuation if we have one.
463  * Forward declaration.
464  */
465 static void
466 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
467
468 /* Forward declaration */
469 static size_t
470 transmit_pending_retransmission (void *cls, size_t size, void *buf);
471
472 /**
473  * Try to send messages from list of messages to send
474  */
475 static void
476 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
477 {
478
479   if (handle->current == NULL)
480     return;                     /* action already pending */
481   if (GNUNET_YES != try_connect (handle))
482     {
483       finish_retransmission (handle, GNUNET_SYSERR);
484       return;
485     }
486
487   if (NULL ==
488       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
489                                                          ntohs (handle->
490                                                                 current->msg->
491                                                                 size),
492                                                          handle->current->
493                                                          timeout, GNUNET_YES,
494                                                          &transmit_pending_retransmission,
495                                                          handle)))
496     {
497 #if DEBUG_DHT_API
498       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499                   "Failed to transmit request to dht service.\n");
500 #endif
501       finish_retransmission (handle, GNUNET_SYSERR);
502       return;
503     }
504 #if DEBUG_DHT_API
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506               "`%s': Scheduled sending message of size %d to service\n",
507               "DHT API", ntohs (handle->current->msg->size));
508 #endif
509 }
510
511 /**
512  * Send complete (or failed), call continuation if we have one.
513  */
514 static void
515 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
516 {
517   struct PendingMessage *pos = handle->current;
518   struct PendingMessageList *pending_list;
519 #if DEBUG_DHT_API
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
521 #endif
522   GNUNET_assert (pos == handle->retransmissions->message);
523   pending_list = handle->retransmissions;
524   handle->retransmissions = handle->retransmissions->next;
525   GNUNET_free (pending_list);
526
527   if (handle->retransmissions == NULL)
528     {
529       handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
530     }
531
532   if (handle->retransmissions != NULL)
533     {
534       handle->current = handle->retransmissions->message;
535       process_pending_retransmissions(handle);
536     }
537   else if (handle->retransmission_buffer != NULL)
538     {
539       handle->current = handle->retransmission_buffer;
540       process_pending_message(handle);
541     }
542 }
543
544 /**
545  * Handler for messages received from the DHT service
546  * a demultiplexer which handles numerous message types
547  *
548  */
549 void
550 service_message_handler (void *cls,
551                          const struct GNUNET_MessageHeader *msg)
552 {
553   struct GNUNET_DHT_Handle *handle = cls;
554   struct GNUNET_DHT_RouteResultMessage *dht_msg;
555   struct GNUNET_MessageHeader *enc_msg;
556   struct GNUNET_DHT_RouteHandle *route_handle;
557   uint64_t uid;
558   GNUNET_HashCode uid_hash;
559   size_t enc_size;
560
561   if (msg == NULL)
562     {
563 #if DEBUG_DHT_API
564       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565                   "`%s': Received NULL from server, connection down!\n",
566                   "DHT API");
567 #endif
568       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
569       handle->client = GNUNET_CLIENT_connect (handle->sched, 
570                                               "dht",
571                                               handle->cfg);
572       if (handle->current != NULL)
573         {
574           finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
575         }
576 #if RETRANSMIT
577       if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
578         {
579           handle->retransmit_stage = DHT_RETRANSMITTING;
580           handle->current = handle->retransmissions->message;
581           process_pending_retransmissions(handle);
582         }
583 #endif
584       return;
585     }
586
587   switch (ntohs (msg->type))
588     {
589     case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT:
590       {
591         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
592         uid = GNUNET_ntohll (dht_msg->unique_id);
593 #if DEBUG_DHT_API
594         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595                     "`%s': Received response to message (uid %llu)\n",
596                     "DHT API", uid);
597 #endif
598
599         hash_from_uid (uid, &uid_hash);
600         route_handle =
601           GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
602                                              &uid_hash);
603         if (route_handle == NULL)   /* We have no recollection of this request */
604           {
605 #if DEBUG_DHT_API
606             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
607                         "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
608                         "DHT API", uid);
609 #endif
610           }
611         else
612           {
613             enc_size =
614               ntohs (dht_msg->header.size) -
615               sizeof (struct GNUNET_DHT_RouteResultMessage);
616             GNUNET_assert (enc_size > 0);
617             enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
618             route_handle->iter (route_handle->iter_cls, enc_msg);
619           }
620
621         break;
622       }
623     default:
624       {
625         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
626                     "`%s': Received unknown message type %d\n", "DHT API",
627                     ntohs (msg->type));
628       }
629     }
630   GNUNET_CLIENT_receive (handle->client,
631                          &service_message_handler,
632                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
633
634 }
635
636
637 /**
638  * Initialize the connection with the DHT service.
639  *
640  * @param sched scheduler to use
641  * @param cfg configuration to use
642  * @param ht_len size of the internal hash table to use for
643  *               processing multiple GET/FIND requests in parallel
644  *
645  * @return handle to the DHT service, or NULL on error
646  */
647 struct GNUNET_DHT_Handle *
648 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
649                     const struct GNUNET_CONFIGURATION_Handle *cfg,
650                     unsigned int ht_len)
651 {
652   struct GNUNET_DHT_Handle *handle;
653
654   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
655   handle->cfg = cfg;
656   handle->sched = sched;
657   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
658   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
659   if (handle->client == NULL)
660     {
661       GNUNET_free (handle);
662       return NULL;
663     }
664   handle->outstanding_requests =
665     GNUNET_CONTAINER_multihashmap_create (ht_len);
666 #if DEBUG_DHT_API
667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
668               "`%s': Connection to service in progress\n", "DHT API");
669 #endif
670   GNUNET_CLIENT_receive (handle->client,
671                          &service_message_handler,
672                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
673   return handle;
674 }
675
676
677 /**
678  * Shutdown connection with the DHT service.
679  *
680  * @param handle handle of the DHT connection to stop
681  */
682 void
683 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
684 {
685 #if DEBUG_DHT_API
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
688 #endif
689   GNUNET_assert (handle != NULL);
690   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
691     {
692       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
693       handle->th = NULL;
694     }
695   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
696     GNUNET_free (handle->current);
697
698   if (handle->client != NULL)   /* Finally, disconnect from the service */
699     {
700       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
701       handle->client = NULL;
702     }
703
704   GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
705   GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
706   GNUNET_free (handle);
707 }
708
709
710 /**
711  * Transmit the next pending message, called by notify_transmit_ready
712  */
713 static size_t
714 transmit_pending_retransmission (void *cls, size_t size, void *buf)
715 {
716   struct GNUNET_DHT_Handle *handle = cls;
717   size_t tsize;
718
719 #if DEBUG_DHT_API
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721               "`%s': In transmit_pending\n", "DHT API");
722 #endif
723   if (buf == NULL)
724     {
725 #if DEBUG_DHT_API
726       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
728 #endif
729       finish_retransmission (handle, GNUNET_SYSERR);
730       return 0;
731     }
732
733   handle->th = NULL;
734
735   if (handle->current != NULL)
736     {
737       tsize = ntohs (handle->current->msg->size);
738       if (size >= tsize)
739         {
740 #if DEBUG_DHT_API
741           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
742                       "`%s': Sending message size %d\n", "DHT API", tsize);
743 #endif
744           memcpy (buf, handle->current->msg, tsize);
745           finish_retransmission (handle, GNUNET_OK);
746           return tsize;
747         }
748       else
749         {
750           return 0;
751         }
752     }
753   /* Have no pending request */
754   return 0;
755 }
756
757
758 /**
759  * Iterator called on each result obtained from a generic route
760  * operation
761  */
762 void
763 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
764 {
765   struct GNUNET_DHT_GetHandle *get_handle = cls;
766   struct GNUNET_DHT_GetResultMessage *result;
767   size_t data_size;
768   char *result_data;
769
770   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
771     return;
772
773   GNUNET_assert (ntohs (reply->size) >=
774                  sizeof (struct GNUNET_DHT_GetResultMessage));
775   result = (struct GNUNET_DHT_GetResultMessage *) reply;
776   data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
777
778   result_data = (char *) &result[1];    /* Set data pointer to end of message */
779
780   get_handle->get_context.iter (get_handle->get_context.iter_cls,
781                                 result->expiration, &result->key,
782                                 ntohs (result->type), data_size, result_data);
783 }
784
785
786 /**
787  * Iterator called on each result obtained from a generic route
788  * operation
789  */
790 void
791 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
792 {
793   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
794   struct GNUNET_MessageHeader *hello;
795   size_t hello_size;
796
797   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
798     {
799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                   "Received wrong type of response to a find peer request...\n");
801       return;
802     }
803
804
805   GNUNET_assert (ntohs (reply->size) >=
806                  sizeof (struct GNUNET_MessageHeader));
807   hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader);
808   hello = (struct GNUNET_MessageHeader *)&reply[1];
809
810   if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
811     {
812       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                   "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
814       return;
815     }
816   find_peer_handle->find_peer_context.proc (find_peer_handle->
817                                             find_peer_context.proc_cls,
818                                             (struct GNUNET_HELLO_Message *)hello);
819 }
820
821 /**
822  * Perform an asynchronous FIND_PEER operation on the DHT.
823  *
824  * @param handle handle to the DHT service
825  * @param key the key to look up
826  * @param desired_replication_level how many peers should ultimately receive
827  *                this message (advisory only, target may be too high for the
828  *                given DHT or not hit exactly).
829  * @param options options for routing
830  * @param enc send the encapsulated message to a peer close to the key
831  * @param iter function to call on each result, NULL if no replies are expected
832  * @param iter_cls closure for iter
833  * @param timeout when to abort with an error if we fail to get
834  *                a confirmation for the request (when necessary) or how long
835  *                to wait for tramission to the service
836  * @param cont continuation to call when done;
837  *             reason will be TIMEOUT on error,
838  *             reason will be PREREQ_DONE on success
839  * @param cont_cls closure for cont
840  *
841  * @return handle to stop the request, NULL if the request is "fire and forget"
842  */
843 struct GNUNET_DHT_RouteHandle *
844 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
845                         const GNUNET_HashCode * key,
846                         unsigned int desired_replication_level,
847                         enum GNUNET_DHT_RouteOption options,
848                         const struct GNUNET_MessageHeader *enc,
849                         struct GNUNET_TIME_Relative timeout,
850                         GNUNET_DHT_ReplyProcessor iter,
851                         void *iter_cls,
852                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
853 {
854   struct GNUNET_DHT_RouteHandle *route_handle;
855   struct PendingMessage *pending;
856   struct GNUNET_DHT_RouteMessage *message;
857   uint16_t msize;
858   GNUNET_HashCode uid_key;
859
860   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
861     return NULL;
862
863   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
864     {
865       GNUNET_break (0);
866       return NULL;
867     }
868
869   route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
870   memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
871   route_handle->iter = iter;
872   route_handle->iter_cls = iter_cls;
873   route_handle->dht_handle = handle;
874   if (iter != NULL)
875     {
876       route_handle->uid = handle->uid_gen++;
877       hash_from_uid (route_handle->uid, &uid_key);
878       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
879                                          &uid_key, route_handle,
880                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
881     }
882   else
883     {
884       route_handle->uid = 0;
885     }
886
887 #if DEBUG_DHT_API
888       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889                   "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
890 #endif
891
892   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
893   message = GNUNET_malloc (msize);
894   message->header.size = htons (msize);
895   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE);
896   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
897   message->options = htonl (options);
898   message->desired_replication_level = htonl (options);
899   message->unique_id = GNUNET_htonll (route_handle->uid);
900   memcpy (&message[1], enc, ntohs (enc->size));
901   pending = GNUNET_malloc (sizeof (struct PendingMessage));
902   pending->msg = &message->header;
903   pending->timeout = timeout;
904   pending->cont = cont;
905   pending->cont_cls = cont_cls;
906   pending->unique_id = route_handle->uid;
907   if (handle->current == NULL)
908     {
909       handle->current = pending;
910       process_pending_message (handle);
911     }
912   else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING))
913   {
914     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
915     handle->retransmission_buffer = pending;
916   }
917
918   route_handle->message = message;
919   return route_handle;
920 }
921
922
923 /**
924  * Perform an asynchronous GET operation on the DHT identified.
925  *
926  * @param handle handle to the DHT service
927  * @param timeout how long to wait for transmission of this request to the service
928  * @param type expected type of the response object
929  * @param key the key to look up
930  * @param iter function to call on each result
931  * @param iter_cls closure for iter
932  * @param cont continuation to call once message sent
933  * @param cont_cls closure for continuation
934  *
935  * @return handle to stop the async get
936  */
937 struct GNUNET_DHT_GetHandle *
938 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
939                       struct GNUNET_TIME_Relative timeout,
940                       uint32_t type,
941                       const GNUNET_HashCode * key,
942                       GNUNET_DHT_GetIterator iter,
943                       void *iter_cls,
944                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
945 {
946   struct GNUNET_DHT_GetHandle *get_handle;
947   struct GNUNET_DHT_GetMessage get_msg;
948
949   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
950     return NULL;
951
952   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
953   get_handle->get_context.iter = iter;
954   get_handle->get_context.iter_cls = iter_cls;
955
956 #if DEBUG_DHT_API
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "`%s': Inserting pending get request with key %s\n", "DHT API",
959               GNUNET_h2s (key));
960 #endif
961
962   get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
963   get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
964   get_msg.type = htons (type);
965
966   get_handle->route_handle =
967     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
968                             &get_reply_iterator, get_handle, cont, cont_cls);
969
970   return get_handle;
971 }
972
973
974 /**
975  * Stop a previously issued routing request
976  *
977  * @param route_handle handle to the request to stop
978  * @param cont continuation to call once this message is sent to the service or times out
979  * @param cont_cls closure for the continuation
980  */
981 void
982 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
983                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
984 {
985   struct PendingMessage *pending;
986   struct GNUNET_DHT_StopMessage *message;
987   size_t msize;
988   GNUNET_HashCode uid_key;
989
990   msize = sizeof (struct GNUNET_DHT_StopMessage);
991   message = GNUNET_malloc (msize);
992   message->header.size = htons (msize);
993   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
994 #if DEBUG_DHT_API
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
997               route_handle->uid);
998 #endif
999   message->unique_id = GNUNET_htonll (route_handle->uid);
1000   pending = GNUNET_malloc (sizeof (struct PendingMessage));
1001   pending->msg = (struct GNUNET_MessageHeader *) message;
1002   pending->timeout = GNUNET_TIME_relative_get_forever();
1003   pending->cont = cont;
1004   pending->cont_cls = cont_cls;
1005   pending->unique_id = 0; /* When finished is called, free pending->msg */
1006
1007   if (route_handle->dht_handle->current == NULL)
1008     {
1009       route_handle->dht_handle->current = pending;
1010       process_pending_message (route_handle->dht_handle);
1011     }
1012   else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING))
1013     {
1014       route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1015       route_handle->dht_handle->retransmission_buffer = pending;
1016     }
1017   else
1018     {
1019       GNUNET_break(0);
1020     }
1021
1022   hash_from_uid (route_handle->uid, &uid_key);
1023   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1024                  (route_handle->dht_handle->outstanding_requests, &uid_key,
1025                   route_handle) == GNUNET_YES);
1026
1027   GNUNET_free(route_handle->message);
1028   GNUNET_free(route_handle);
1029 }
1030
1031
1032 /**
1033  * Stop async DHT-get.
1034  *
1035  * @param get_handle handle to the GET operation to stop
1036  * @param cont continuation to call once this message is sent to the service or times out
1037  * @param cont_cls closure for the continuation
1038  */
1039 void
1040 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1041                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
1042 {
1043   if ((get_handle->route_handle->dht_handle->current != NULL) &&
1044       (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1045     {
1046       if (cont != NULL)
1047         {
1048           GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1049                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1050         }
1051       return;
1052     }
1053
1054 #if DEBUG_DHT_API
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "`%s': Removing pending get request with key %s, uid %llu\n",
1057               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1058               get_handle->route_handle->uid);
1059 #endif
1060   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1061   GNUNET_free (get_handle);
1062 }
1063
1064
1065 /**
1066  * Perform an asynchronous FIND PEER operation on the DHT.
1067  *
1068  * @param handle handle to the DHT service
1069  * @param timeout timeout for this request to be sent to the
1070  *        service
1071  * @param options routing options for this message
1072  * @param key the key to look up
1073  * @param proc function to call on each result
1074  * @param proc_cls closure for proc
1075  * @param cont continuation to call once message sent
1076  * @param cont_cls closure for continuation
1077  *
1078  * @return handle to stop the async get, NULL on error
1079  */
1080 struct GNUNET_DHT_FindPeerHandle *
1081 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1082                             struct GNUNET_TIME_Relative timeout,
1083                             enum GNUNET_DHT_RouteOption options,
1084                             const GNUNET_HashCode * key,
1085                             GNUNET_DHT_FindPeerProcessor proc,
1086                             void *proc_cls,
1087                             GNUNET_SCHEDULER_Task cont,
1088                             void *cont_cls)
1089 {
1090   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1091   struct GNUNET_MessageHeader find_peer_msg;
1092
1093   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
1094     return NULL;
1095
1096   find_peer_handle =
1097     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1098   find_peer_handle->find_peer_context.proc = proc;
1099   find_peer_handle->find_peer_context.proc_cls = proc_cls;
1100
1101 #if DEBUG_DHT_API
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1104               "FIND PEER", GNUNET_h2s (key));
1105 #endif
1106
1107   find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
1108   find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1109   find_peer_handle->route_handle =
1110     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
1111                             timeout, &find_peer_reply_iterator,
1112                             find_peer_handle, cont, cont_cls);
1113   return find_peer_handle;
1114 }
1115
1116 /**
1117  * Stop async find peer.  Frees associated resources.
1118  *
1119  * @param find_peer_handle GET operation to stop.
1120  * @param cont continuation to call once this message is sent to the service or times out
1121  * @param cont_cls closure for the continuation
1122  */
1123 void
1124 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1125                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1126 {
1127   if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1128       (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1129     {
1130       if (cont != NULL)
1131         {
1132           GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1133                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1134         }
1135       return;
1136     }
1137
1138 #if DEBUG_DHT_API
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1141               "DHT API", "FIND PEER",
1142               GNUNET_h2s (&find_peer_handle->route_handle->key),
1143               find_peer_handle->route_handle->uid);
1144 #endif
1145   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1146   GNUNET_free (find_peer_handle);
1147
1148 }
1149
1150
1151 /**
1152  * Perform a PUT operation storing data in the DHT.
1153  *
1154  * @param handle handle to DHT service
1155  * @param key the key to store under
1156  * @param type type of the value
1157  * @param size number of bytes in data; must be less than 64k
1158  * @param data the data to store
1159  * @param exp desired expiration time for the value
1160  * @param timeout how long to wait for transmission of this request
1161  * @param cont continuation to call when done;
1162  *             reason will be TIMEOUT on error,
1163  *             reason will be PREREQ_DONE on success
1164  * @param cont_cls closure for cont
1165  *
1166  * @return GNUNET_YES if put message is queued for transmission
1167  */
1168 void
1169 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1170                 const GNUNET_HashCode * key,
1171                 uint32_t type,
1172                 uint32_t size,
1173                 const char *data,
1174                 struct GNUNET_TIME_Absolute exp,
1175                 struct GNUNET_TIME_Relative timeout,
1176                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1177 {
1178   struct GNUNET_DHT_PutMessage *put_msg;
1179   struct GNUNET_DHT_RouteHandle *put_route;
1180   size_t msize;
1181
1182   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1183     {
1184       if (cont != NULL)
1185         {
1186           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1187                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1188         }
1189       return;
1190     }
1191
1192 #if DEBUG_DHT_API
1193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1194               "`%s': Inserting pending put request with key %s\n", "DHT API",
1195               GNUNET_h2s (key));
1196 #endif
1197
1198   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1199   put_msg = GNUNET_malloc (msize);
1200   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1201   put_msg->header.size = htons (msize);
1202   put_msg->type = htons (type);
1203   put_msg->data_size = htons (size);
1204   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1205   memcpy (&put_msg[1], data, size);
1206
1207   put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1208                                       NULL, cont, cont_cls);
1209
1210   if (put_route == NULL) /* Route start failed! */
1211     {
1212       if (cont != NULL)
1213         {
1214           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1215                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1216         }
1217     }
1218   else
1219     GNUNET_free(put_route);
1220
1221   GNUNET_free (put_msg);
1222 }