58493dd0fe1da020699282d62dab488bbbd94fa1
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: retransmission of pending requests maybe happens now, at least
28  *       the code is in place to do so.  Need to add checks when api calls
29  *       happen to check if retransmission is in progress, and if so set
30  *       the single pending message for transmission once the list of
31  *       retries are done.
32  */
33
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_NO
48
49 struct PendingMessage
50 {
51   /**
52    * Message that is pending
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Timeout for this message
58    */
59   struct GNUNET_TIME_Relative timeout;
60
61   /**
62    * Continuation to call on message send
63    * or message receipt confirmation
64    */
65   GNUNET_SCHEDULER_Task cont;
66
67   /**
68    * Continuation closure
69    */
70   void *cont_cls;
71
72   /**
73    * Unique ID for this request
74    */
75   uint64_t unique_id;
76
77 };
78
79 struct PendingMessageList
80 {
81   /**
82    * This is a singly linked list.
83    */
84   struct PendingMessageList *next;
85
86   /**
87    * The pending message.
88    */
89   struct PendingMessage *message;
90 };
91
92 struct GNUNET_DHT_GetContext
93 {
94   /**
95    * Iterator to call on data receipt
96    */
97   GNUNET_DHT_GetIterator iter;
98
99   /**
100    * Closure for the iterator callback
101    */
102   void *iter_cls;
103
104 };
105
106 struct GNUNET_DHT_FindPeerContext
107 {
108   /**
109    * Iterator to call on data receipt
110    */
111   GNUNET_DHT_FindPeerProcessor proc;
112
113   /**
114    * Closure for the iterator callback
115    */
116   void *proc_cls;
117
118 };
119
120 /**
121  * Handle to a route request
122  */
123 struct GNUNET_DHT_RouteHandle
124 {
125
126   /**
127    * Unique identifier for this request (for key collisions)
128    */
129   uint64_t uid;
130
131   /**
132    * Key that this get request is for
133    */
134   GNUNET_HashCode key;
135
136   /**
137    * Iterator to call on data receipt
138    */
139   GNUNET_DHT_ReplyProcessor iter;
140
141   /**
142    * Closure for the iterator callback
143    */
144   void *iter_cls;
145
146   /**
147    * Main handle to this DHT api
148    */
149   struct GNUNET_DHT_Handle *dht_handle;
150
151   /**
152    * The actual message sent for this request,
153    * used for retransmitting requests on service
154    * failure/reconnect.  Freed on route_stop.
155    */
156   struct GNUNET_DHT_RouteMessage *message;
157 };
158
159
160 /**
161  * Handle to control a get operation.
162  */
163 struct GNUNET_DHT_GetHandle
164 {
165   /**
166    * Handle to the actual route operation for the get
167    */
168   struct GNUNET_DHT_RouteHandle *route_handle;
169
170   /**
171    * The context of the get request
172    */
173   struct GNUNET_DHT_GetContext get_context;
174 };
175
176
177 /**
178  * Handle to control a find peer operation.
179  */
180 struct GNUNET_DHT_FindPeerHandle
181 {
182   /**
183      * Handle to the actual route operation for the request
184      */
185   struct GNUNET_DHT_RouteHandle *route_handle;
186
187     /**
188      * The context of the find peer request
189      */
190   struct GNUNET_DHT_FindPeerContext find_peer_context;
191 };
192
193
194 enum DHT_Retransmit_Stage
195 {
196   /**
197    * The API is not retransmitting anything at this time.
198    */
199   DHT_NOT_RETRANSMITTING,
200
201   /**
202    * The API is retransmitting, and nothing has been single
203    * queued for sending.
204    */
205   DHT_RETRANSMITTING,
206
207   /**
208    * The API is retransmitting, and a single message has been
209    * queued for transmission once finished.
210    */
211   DHT_RETRANSMITTING_MESSAGE_QUEUED
212 };
213
214
215 /**
216  * Connection to the DHT service.
217  */
218 struct GNUNET_DHT_Handle
219 {
220   /**
221    * Our scheduler.
222    */
223   struct GNUNET_SCHEDULER_Handle *sched;
224
225   /**
226    * Configuration to use.
227    */
228   const struct GNUNET_CONFIGURATION_Handle *cfg;
229
230   /**
231    * Socket (if available).
232    */
233   struct GNUNET_CLIENT_Connection *client;
234
235   /**
236    * Currently pending transmission request.
237    */
238   struct GNUNET_CLIENT_TransmitHandle *th;
239
240   /**
241    * Message we are currently sending, only allow
242    * a single message to be queued.  If not unique
243    * (typically a put request), await a confirmation
244    * from the service that the message was received.
245    * If unique, just fire and forget.
246    */
247   struct PendingMessage *current;
248
249   /**
250    * Hash map containing the current outstanding unique requests
251    */
252   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
253
254   /**
255    * Generator for unique ids.
256    */
257   uint64_t uid_gen;
258
259   /**
260    * Are we currently retransmitting requests?  If so queue a _single_
261    * new request when received.
262    */
263   enum DHT_Retransmit_Stage retransmit_stage;
264
265   /**
266    * Linked list of retranmissions, to be used in the event
267    * of a dht service disconnect/reconnect.
268    */
269   struct PendingMessageList *retransmissions;
270
271   /**
272    * A single pending message allowed to be scheduled
273    * during retransmission phase.
274    */
275   struct PendingMessage *retransmission_buffer;
276 };
277
278
279 /**
280  * Convert unique ID to hash code.
281  *
282  * @param uid unique ID to convert
283  * @param hash set to uid (extended with zeros)
284  */
285 static void
286 hash_from_uid (uint64_t uid,
287                GNUNET_HashCode *hash)
288 {
289   memset (hash, 0, sizeof(GNUNET_HashCode));
290   *((uint64_t*)hash) = uid;
291 }
292
293 #if RETRANSMIT
294 /**
295  * Iterator callback to retransmit each outstanding request
296  * because the connection to the DHT service went down (and
297  * came back).
298  *
299  *
300  */
301 static int retransmit_iterator (void *cls,
302                                 const GNUNET_HashCode * key,
303                                 void *value)
304 {
305   struct GNUNET_DHT_RouteHandle *route_handle = value;
306   struct PendingMessageList *pending_message_list;
307
308   pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
309   pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
310   pending_message_list->message->msg = &route_handle->message->header;
311   pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
312   pending_message_list->message->cont = NULL;
313   pending_message_list->message->cont_cls = NULL;
314   pending_message_list->message->unique_id = route_handle->uid;
315   /* Add the new pending message to the front of the retransmission list */
316   pending_message_list->next = route_handle->dht_handle->retransmissions;
317   route_handle->dht_handle->retransmissions = pending_message_list;
318
319   return GNUNET_OK;
320 }
321 #endif
322
323 /**
324  * Try to (re)connect to the dht service.
325  *
326  * @return GNUNET_YES on success, GNUNET_NO on failure.
327  */
328 static int
329 try_connect (struct GNUNET_DHT_Handle *handle)
330 {
331   if (handle->client != NULL)
332     return GNUNET_OK;
333   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
334   if (handle->client != NULL)
335     return GNUNET_YES;
336 #if DEBUG_STATISTICS
337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338               _("Failed to connect to the dht service!\n"));
339 #endif
340   return GNUNET_NO;
341 }
342
343 /**
344  * Send complete (or failed), call continuation if we have one.
345  */
346 static void
347 finish (struct GNUNET_DHT_Handle *handle, int code)
348 {
349   struct PendingMessage *pos = handle->current;
350   GNUNET_HashCode uid_hash;
351 #if DEBUG_DHT_API
352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
353 #endif
354   GNUNET_assert (pos != NULL);
355   hash_from_uid (pos->unique_id, &uid_hash);
356   if (pos->cont != NULL)
357     {
358       if (code == GNUNET_SYSERR)
359         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
360                                            pos->cont_cls,
361                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
362       else
363         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
364                                            pos->cont_cls,
365                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
366     }
367
368   GNUNET_assert(handle->th == NULL);
369   if (pos->unique_id == 0)
370     GNUNET_free(pos->msg);
371   GNUNET_free (pos);
372   handle->current = NULL;
373 }
374
375 /**
376  * Transmit the next pending message, called by notify_transmit_ready
377  */
378 static size_t
379 transmit_pending (void *cls, size_t size, void *buf)
380 {
381   struct GNUNET_DHT_Handle *handle = cls;
382   size_t tsize;
383
384 #if DEBUG_DHT_API
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "`%s': In transmit_pending\n", "DHT API");
387 #endif
388   if (buf == NULL)
389     {
390 #if DEBUG_DHT_API
391       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
393 #endif
394       finish (handle, GNUNET_SYSERR);
395       return 0;
396     }
397
398   handle->th = NULL;
399
400   if (handle->current != NULL)
401     {
402       tsize = ntohs (handle->current->msg->size);
403       if (size >= tsize)
404         {
405 #if DEBUG_DHT_API
406           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407                       "`%s': Sending message size %d\n", "DHT API", tsize);
408 #endif
409           memcpy (buf, handle->current->msg, tsize);
410           finish (handle, GNUNET_OK);
411           return tsize;
412         }
413       else
414         {
415           return 0;
416         }
417     }
418   /* Have no pending request */
419   return 0;
420 }
421
422 /**
423  * Try to send messages from list of messages to send
424  */
425 static void
426 process_pending_message (struct GNUNET_DHT_Handle *handle)
427 {
428
429   if (handle->current == NULL)
430     return;                     /* action already pending */
431   if (GNUNET_YES != try_connect (handle))
432     {
433       finish (handle, GNUNET_SYSERR);
434       return;
435     }
436
437   if (NULL ==
438       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
439                                                          ntohs (handle->
440                                                                 current->msg->
441                                                                 size),
442                                                          handle->current->
443                                                          timeout, GNUNET_YES,
444                                                          &transmit_pending,
445                                                          handle)))
446     {
447 #if DEBUG_DHT_API
448       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449                   "Failed to transmit request to dht service.\n");
450 #endif
451       finish (handle, GNUNET_SYSERR);
452       return;
453     }
454 #if DEBUG_DHT_API
455   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456               "`%s': Scheduled sending message of size %d to service\n",
457               "DHT API", ntohs (handle->current->msg->size));
458 #endif
459 }
460
461 /**
462  * Send complete (or failed), call continuation if we have one.
463  * Forward declaration.
464  */
465 static void
466 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
467
468 /* Forward declaration */
469 static size_t
470 transmit_pending_retransmission (void *cls, size_t size, void *buf);
471
472 /**
473  * Try to send messages from list of messages to send
474  */
475 static void
476 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
477 {
478
479   if (handle->current == NULL)
480     return;                     /* action already pending */
481   if (GNUNET_YES != try_connect (handle))
482     {
483       finish_retransmission (handle, GNUNET_SYSERR);
484       return;
485     }
486
487   if (NULL ==
488       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
489                                                          ntohs (handle->
490                                                                 current->msg->
491                                                                 size),
492                                                          handle->current->
493                                                          timeout, GNUNET_YES,
494                                                          &transmit_pending_retransmission,
495                                                          handle)))
496     {
497 #if DEBUG_DHT_API
498       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499                   "Failed to transmit request to dht service.\n");
500 #endif
501       finish_retransmission (handle, GNUNET_SYSERR);
502       return;
503     }
504 #if DEBUG_DHT_API
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506               "`%s': Scheduled sending message of size %d to service\n",
507               "DHT API", ntohs (handle->current->msg->size));
508 #endif
509 }
510
511 /**
512  * Send complete (or failed), call continuation if we have one.
513  */
514 static void
515 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
516 {
517   struct PendingMessage *pos = handle->current;
518   struct PendingMessageList *pending_list;
519 #if DEBUG_DHT_API
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
521 #endif
522   GNUNET_assert (pos == handle->retransmissions->message);
523   pending_list = handle->retransmissions;
524   handle->retransmissions = handle->retransmissions->next;
525   GNUNET_free (pending_list);
526
527   if (handle->retransmissions == NULL)
528     {
529       handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
530     }
531
532   if (handle->retransmissions != NULL)
533     {
534       handle->current = handle->retransmissions->message;
535       process_pending_retransmissions(handle);
536     }
537   else if (handle->retransmission_buffer != NULL)
538     {
539       handle->current = handle->retransmission_buffer;
540       process_pending_message(handle);
541     }
542 }
543
544 /**
545  * Handler for messages received from the DHT service
546  * a demultiplexer which handles numerous message types
547  *
548  */
549 void
550 service_message_handler (void *cls,
551                          const struct GNUNET_MessageHeader *msg)
552 {
553   struct GNUNET_DHT_Handle *handle = cls;
554   struct GNUNET_DHT_RouteResultMessage *dht_msg;
555   struct GNUNET_MessageHeader *enc_msg;
556   struct GNUNET_DHT_RouteHandle *route_handle;
557   uint64_t uid;
558   GNUNET_HashCode uid_hash;
559   size_t enc_size;
560
561   if (msg == NULL)
562     {
563 #if DEBUG_DHT_API
564       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565                   "`%s': Received NULL from server, connection down!\n",
566                   "DHT API");
567 #endif
568       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
569       handle->client = GNUNET_CLIENT_connect (handle->sched, 
570                                               "dht",
571                                               handle->cfg);
572       if (handle->current != NULL)
573         {
574           handle->th = NULL;
575           finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
576         }
577 #if RETRANSMIT
578       if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
579         {
580           handle->retransmit_stage = DHT_RETRANSMITTING;
581           handle->current = handle->retransmissions->message;
582           process_pending_retransmissions(handle);
583         }
584 #endif
585       return;
586     }
587
588   switch (ntohs (msg->type))
589     {
590     case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
591       {
592         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
593         uid = GNUNET_ntohll (dht_msg->unique_id);
594 #if DEBUG_DHT_API
595         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596                     "`%s': Received response to message (uid %llu)\n",
597                     "DHT API", uid);
598 #endif
599
600         hash_from_uid (uid, &uid_hash);
601         route_handle =
602           GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
603                                              &uid_hash);
604         if (route_handle == NULL)   /* We have no recollection of this request */
605           {
606 #if DEBUG_DHT_API
607             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
608                         "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
609                         "DHT API", uid);
610 #endif
611           }
612         else
613           {
614             enc_size =
615               ntohs (dht_msg->header.size) -
616               sizeof (struct GNUNET_DHT_RouteResultMessage);
617             GNUNET_assert (enc_size > 0);
618             enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
619             route_handle->iter (route_handle->iter_cls, enc_msg);
620           }
621
622         break;
623       }
624     default:
625       {
626         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
627                     "`%s': Received unknown message type %d\n", "DHT API",
628                     ntohs (msg->type));
629       }
630     }
631   GNUNET_CLIENT_receive (handle->client,
632                          &service_message_handler,
633                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
634
635 }
636
637
638 /**
639  * Initialize the connection with the DHT service.
640  *
641  * @param sched scheduler to use
642  * @param cfg configuration to use
643  * @param ht_len size of the internal hash table to use for
644  *               processing multiple GET/FIND requests in parallel
645  *
646  * @return handle to the DHT service, or NULL on error
647  */
648 struct GNUNET_DHT_Handle *
649 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
650                     const struct GNUNET_CONFIGURATION_Handle *cfg,
651                     unsigned int ht_len)
652 {
653   struct GNUNET_DHT_Handle *handle;
654
655   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
656   handle->cfg = cfg;
657   handle->sched = sched;
658   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
659   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
660   if (handle->client == NULL)
661     {
662       GNUNET_free (handle);
663       return NULL;
664     }
665   handle->outstanding_requests =
666     GNUNET_CONTAINER_multihashmap_create (ht_len);
667 #if DEBUG_DHT_API
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "`%s': Connection to service in progress\n", "DHT API");
670 #endif
671   GNUNET_CLIENT_receive (handle->client,
672                          &service_message_handler,
673                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
674   return handle;
675 }
676
677
678 /**
679  * Shutdown connection with the DHT service.
680  *
681  * @param handle handle of the DHT connection to stop
682  */
683 void
684 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
685 {
686 #if DEBUG_DHT_API
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
689 #endif
690   GNUNET_assert (handle != NULL);
691   if (handle->th != NULL)       /* We have a live transmit request */
692     {
693       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
694       handle->th = NULL;
695     }
696   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
697     GNUNET_free (handle->current);
698
699   if (handle->client != NULL)   /* Finally, disconnect from the service */
700     {
701       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
702       handle->client = NULL;
703     }
704
705   GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
706   GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
707   GNUNET_free (handle);
708 }
709
710
711 /**
712  * Transmit the next pending message, called by notify_transmit_ready
713  */
714 static size_t
715 transmit_pending_retransmission (void *cls, size_t size, void *buf)
716 {
717   struct GNUNET_DHT_Handle *handle = cls;
718   size_t tsize;
719
720 #if DEBUG_DHT_API
721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722               "`%s': In transmit_pending\n", "DHT API");
723 #endif
724   if (buf == NULL)
725     {
726 #if DEBUG_DHT_API
727       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
729 #endif
730       finish_retransmission (handle, GNUNET_SYSERR);
731       return 0;
732     }
733
734   handle->th = NULL;
735
736   if (handle->current != NULL)
737     {
738       tsize = ntohs (handle->current->msg->size);
739       if (size >= tsize)
740         {
741 #if DEBUG_DHT_API
742           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743                       "`%s': Sending message size %d\n", "DHT API", tsize);
744 #endif
745           memcpy (buf, handle->current->msg, tsize);
746           finish_retransmission (handle, GNUNET_OK);
747           return tsize;
748         }
749       else
750         {
751           return 0;
752         }
753     }
754   /* Have no pending request */
755   return 0;
756 }
757
758
759 /**
760  * Iterator called on each result obtained from a generic route
761  * operation
762  */
763 void
764 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
765 {
766   struct GNUNET_DHT_GetHandle *get_handle = cls;
767   struct GNUNET_DHT_GetResultMessage *result;
768   size_t data_size;
769   char *result_data;
770
771   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
772     return;
773
774   GNUNET_assert (ntohs (reply->size) >=
775                  sizeof (struct GNUNET_DHT_GetResultMessage));
776   result = (struct GNUNET_DHT_GetResultMessage *) reply;
777   data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
778
779   result_data = (char *) &result[1];    /* Set data pointer to end of message */
780
781   get_handle->get_context.iter (get_handle->get_context.iter_cls,
782                                 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
783                                 ntohs (result->type), data_size, result_data);
784 }
785
786
787 /**
788  * Iterator called on each result obtained from a generic route
789  * operation
790  */
791 void
792 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
793 {
794   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
795   struct GNUNET_MessageHeader *hello;
796
797   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
798     {
799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                   "Received wrong type of response to a find peer request...\n");
801       return;
802     }
803
804
805   GNUNET_assert (ntohs (reply->size) >=
806                  sizeof (struct GNUNET_MessageHeader));
807   hello = (struct GNUNET_MessageHeader *)&reply[1];
808
809   if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
810     {
811       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812                   "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
813       return;
814     }
815   find_peer_handle->find_peer_context.proc (find_peer_handle->
816                                             find_peer_context.proc_cls,
817                                             (struct GNUNET_HELLO_Message *)hello);
818 }
819
820 /**
821  * Perform an asynchronous FIND_PEER operation on the DHT.
822  *
823  * @param handle handle to the DHT service
824  * @param key the key to look up
825  * @param desired_replication_level how many peers should ultimately receive
826  *                this message (advisory only, target may be too high for the
827  *                given DHT or not hit exactly).
828  * @param options options for routing
829  * @param enc send the encapsulated message to a peer close to the key
830  * @param iter function to call on each result, NULL if no replies are expected
831  * @param iter_cls closure for iter
832  * @param timeout when to abort with an error if we fail to get
833  *                a confirmation for the request (when necessary) or how long
834  *                to wait for tramission to the service
835  * @param cont continuation to call when done;
836  *             reason will be TIMEOUT on error,
837  *             reason will be PREREQ_DONE on success
838  * @param cont_cls closure for cont
839  *
840  * @return handle to stop the request, NULL if the request is "fire and forget"
841  */
842 struct GNUNET_DHT_RouteHandle *
843 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
844                         const GNUNET_HashCode * key,
845                         unsigned int desired_replication_level,
846                         enum GNUNET_DHT_RouteOption options,
847                         const struct GNUNET_MessageHeader *enc,
848                         struct GNUNET_TIME_Relative timeout,
849                         GNUNET_DHT_ReplyProcessor iter,
850                         void *iter_cls,
851                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
852 {
853   struct GNUNET_DHT_RouteHandle *route_handle;
854   struct PendingMessage *pending;
855   struct GNUNET_DHT_RouteMessage *message;
856   uint16_t msize;
857   GNUNET_HashCode uid_key;
858
859   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
860     return NULL;
861
862   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
863     {
864       GNUNET_break (0);
865       return NULL;
866     }
867
868   route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
869   memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
870   route_handle->iter = iter;
871   route_handle->iter_cls = iter_cls;
872   route_handle->dht_handle = handle;
873   route_handle->uid = handle->uid_gen++;
874   if (iter != NULL)
875     {
876       hash_from_uid (route_handle->uid, &uid_key);
877       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
878                                          &uid_key, route_handle,
879                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
880     }
881
882 #if DEBUG_DHT_API
883       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884                   "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
885 #endif
886
887   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
888   message = GNUNET_malloc (msize);
889   message->header.size = htons (msize);
890   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
891   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
892   message->options = htonl (options);
893   message->desired_replication_level = htonl (options);
894   message->unique_id = GNUNET_htonll (route_handle->uid);
895   memcpy (&message[1], enc, ntohs (enc->size));
896   pending = GNUNET_malloc (sizeof (struct PendingMessage));
897   pending->msg = &message->header;
898   pending->timeout = timeout;
899   pending->cont = cont;
900   pending->cont_cls = cont_cls;
901   pending->unique_id = route_handle->uid;
902   if (handle->current == NULL)
903     {
904       handle->current = pending;
905       process_pending_message (handle);
906     }
907   else
908   {
909     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
910     handle->retransmission_buffer = pending;
911   }
912
913   route_handle->message = message;
914   return route_handle;
915 }
916
917
918 /**
919  * Perform an asynchronous GET operation on the DHT identified.
920  *
921  * @param handle handle to the DHT service
922  * @param timeout how long to wait for transmission of this request to the service
923  * @param type expected type of the response object
924  * @param key the key to look up
925  * @param iter function to call on each result
926  * @param iter_cls closure for iter
927  * @param cont continuation to call once message sent
928  * @param cont_cls closure for continuation
929  *
930  * @return handle to stop the async get
931  */
932 struct GNUNET_DHT_GetHandle *
933 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
934                       struct GNUNET_TIME_Relative timeout,
935                       uint32_t type,
936                       const GNUNET_HashCode * key,
937                       GNUNET_DHT_GetIterator iter,
938                       void *iter_cls,
939                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
940 {
941   struct GNUNET_DHT_GetHandle *get_handle;
942   struct GNUNET_DHT_GetMessage get_msg;
943
944   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
945     return NULL;
946
947   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
948   get_handle->get_context.iter = iter;
949   get_handle->get_context.iter_cls = iter_cls;
950
951 #if DEBUG_DHT_API
952   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953               "`%s': Inserting pending get request with key %s\n", "DHT API",
954               GNUNET_h2s (key));
955 #endif
956
957   get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
958   get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
959   get_msg.type = htons (type);
960
961   get_handle->route_handle =
962     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
963                             &get_reply_iterator, get_handle, cont, cont_cls);
964
965   return get_handle;
966 }
967
968
969 /**
970  * Stop a previously issued routing request
971  *
972  * @param route_handle handle to the request to stop
973  * @param cont continuation to call once this message is sent to the service or times out
974  * @param cont_cls closure for the continuation
975  */
976 void
977 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
978                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
979 {
980   struct PendingMessage *pending;
981   struct GNUNET_DHT_StopMessage *message;
982   size_t msize;
983   GNUNET_HashCode uid_key;
984
985   msize = sizeof (struct GNUNET_DHT_StopMessage);
986   message = GNUNET_malloc (msize);
987   message->header.size = htons (msize);
988   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
989 #if DEBUG_DHT_API
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
992               route_handle->uid);
993 #endif
994   message->unique_id = GNUNET_htonll (route_handle->uid);
995   memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
996   pending = GNUNET_malloc (sizeof (struct PendingMessage));
997   pending->msg = (struct GNUNET_MessageHeader *) message;
998   pending->timeout = GNUNET_TIME_relative_get_forever();
999   pending->cont = cont;
1000   pending->cont_cls = cont_cls;
1001   pending->unique_id = 0; /* When finished is called, free pending->msg */
1002
1003   if (route_handle->dht_handle->current == NULL)
1004     {
1005       route_handle->dht_handle->current = pending;
1006       process_pending_message (route_handle->dht_handle);
1007     }
1008   else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1009     {
1010       route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1011       route_handle->dht_handle->retransmission_buffer = pending;
1012     }
1013   else
1014     {
1015       GNUNET_free(pending);
1016       GNUNET_break(0);
1017     }
1018
1019   hash_from_uid (route_handle->uid, &uid_key);
1020   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1021                  (route_handle->dht_handle->outstanding_requests, &uid_key,
1022                   route_handle) == GNUNET_YES);
1023
1024   GNUNET_free(route_handle->message);
1025   GNUNET_free(route_handle);
1026 }
1027
1028
1029 /**
1030  * Stop async DHT-get.
1031  *
1032  * @param get_handle handle to the GET operation to stop
1033  * @param cont continuation to call once this message is sent to the service or times out
1034  * @param cont_cls closure for the continuation
1035  */
1036 void
1037 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1038                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
1039 {
1040   if ((get_handle->route_handle->dht_handle->current != NULL) &&
1041       (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1042     {
1043       if (cont != NULL)
1044         {
1045           GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1046                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1047         }
1048       return;
1049     }
1050
1051 #if DEBUG_DHT_API
1052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1053               "`%s': Removing pending get request with key %s, uid %llu\n",
1054               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1055               get_handle->route_handle->uid);
1056 #endif
1057   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1058   GNUNET_free (get_handle);
1059 }
1060
1061
1062 /**
1063  * Perform an asynchronous FIND PEER operation on the DHT.
1064  *
1065  * @param handle handle to the DHT service
1066  * @param timeout timeout for this request to be sent to the
1067  *        service
1068  * @param options routing options for this message
1069  * @param key the key to look up
1070  * @param proc function to call on each result
1071  * @param proc_cls closure for proc
1072  * @param cont continuation to call once message sent
1073  * @param cont_cls closure for continuation
1074  *
1075  * @return handle to stop the async get, NULL on error
1076  */
1077 struct GNUNET_DHT_FindPeerHandle *
1078 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1079                             struct GNUNET_TIME_Relative timeout,
1080                             enum GNUNET_DHT_RouteOption options,
1081                             const GNUNET_HashCode * key,
1082                             GNUNET_DHT_FindPeerProcessor proc,
1083                             void *proc_cls,
1084                             GNUNET_SCHEDULER_Task cont,
1085                             void *cont_cls)
1086 {
1087   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1088   struct GNUNET_MessageHeader find_peer_msg;
1089
1090   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
1091     return NULL;
1092
1093   find_peer_handle =
1094     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1095   find_peer_handle->find_peer_context.proc = proc;
1096   find_peer_handle->find_peer_context.proc_cls = proc_cls;
1097
1098 #if DEBUG_DHT_API
1099   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1100               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1101               "FIND PEER", GNUNET_h2s (key));
1102 #endif
1103
1104   find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
1105   find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1106   find_peer_handle->route_handle =
1107     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
1108                             timeout, &find_peer_reply_iterator,
1109                             find_peer_handle, cont, cont_cls);
1110   return find_peer_handle;
1111 }
1112
1113 /**
1114  * Stop async find peer.  Frees associated resources.
1115  *
1116  * @param find_peer_handle GET operation to stop.
1117  * @param cont continuation to call once this message is sent to the service or times out
1118  * @param cont_cls closure for the continuation
1119  */
1120 void
1121 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1122                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1123 {
1124   if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1125       (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1126     {
1127       if (cont != NULL)
1128         {
1129           GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1130                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1131         }
1132       return;
1133     }
1134
1135 #if DEBUG_DHT_API
1136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1138               "DHT API", "FIND PEER",
1139               GNUNET_h2s (&find_peer_handle->route_handle->key),
1140               find_peer_handle->route_handle->uid);
1141 #endif
1142   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1143   GNUNET_free (find_peer_handle);
1144
1145 }
1146
1147
1148 /**
1149  * Perform a PUT operation storing data in the DHT.
1150  *
1151  * @param handle handle to DHT service
1152  * @param key the key to store under
1153  * @param type type of the value
1154  * @param size number of bytes in data; must be less than 64k
1155  * @param data the data to store
1156  * @param exp desired expiration time for the value
1157  * @param timeout how long to wait for transmission of this request
1158  * @param cont continuation to call when done;
1159  *             reason will be TIMEOUT on error,
1160  *             reason will be PREREQ_DONE on success
1161  * @param cont_cls closure for cont
1162  *
1163  * @return GNUNET_YES if put message is queued for transmission
1164  */
1165 void
1166 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1167                 const GNUNET_HashCode * key,
1168                 uint32_t type,
1169                 uint32_t size,
1170                 const char *data,
1171                 struct GNUNET_TIME_Absolute exp,
1172                 struct GNUNET_TIME_Relative timeout,
1173                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1174 {
1175   struct GNUNET_DHT_PutMessage *put_msg;
1176   struct GNUNET_DHT_RouteHandle *put_route;
1177   size_t msize;
1178
1179   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1180     {
1181       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
1182       if (cont != NULL)
1183         {
1184           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1185                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1186         }
1187       return;
1188     }
1189
1190 #if DEBUG_DHT_API
1191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192               "`%s': Inserting pending put request with key %s\n", "DHT API",
1193               GNUNET_h2s (key));
1194 #endif
1195
1196   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1197   put_msg = GNUNET_malloc (msize);
1198   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1199   put_msg->header.size = htons (msize);
1200   put_msg->type = htons (type);
1201   put_msg->data_size = htons (size);
1202   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1203   memcpy (&put_msg[1], data, size);
1204
1205   put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1206                                       NULL, cont, cont_cls);
1207
1208   if (put_route == NULL) /* Route start failed! */
1209     {
1210       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1211       if (cont != NULL)
1212         {
1213           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1214                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1215         }
1216     }
1217   else
1218     GNUNET_free(put_route);
1219
1220   GNUNET_free (put_msg);
1221 }