doxygen fixes
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: retransmission of pending requests maybe happens now, at least
28  *       the code is in place to do so.  Need to add checks when api calls
29  *       happen to check if retransmission is in progress, and if so set
30  *       the single pending message for transmission once the list of
31  *       retries are done.
32  */
33
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_NO
48
49 #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
50
51 struct PendingMessage
52 {
53   /**
54    * Message that is pending
55    */
56   struct GNUNET_MessageHeader *msg;
57
58   /**
59    * Timeout for this message
60    */
61   struct GNUNET_TIME_Relative timeout;
62
63   /**
64    * Continuation to call on message send
65    * or message receipt confirmation
66    */
67   GNUNET_SCHEDULER_Task cont;
68
69   /**
70    * Continuation closure
71    */
72   void *cont_cls;
73
74   /**
75    * Unique ID for this request
76    */
77   uint64_t unique_id;
78
79 };
80
81 struct PendingMessageList
82 {
83   /**
84    * This is a singly linked list.
85    */
86   struct PendingMessageList *next;
87
88   /**
89    * The pending message.
90    */
91   struct PendingMessage *message;
92 };
93
94 struct GNUNET_DHT_GetContext
95 {
96   /**
97    * Iterator to call on data receipt
98    */
99   GNUNET_DHT_GetIterator iter;
100
101   /**
102    * Closure for the iterator callback
103    */
104   void *iter_cls;
105
106 };
107
108 struct GNUNET_DHT_FindPeerContext
109 {
110   /**
111    * Iterator to call on data receipt
112    */
113   GNUNET_DHT_FindPeerProcessor proc;
114
115   /**
116    * Closure for the iterator callback
117    */
118   void *proc_cls;
119
120 };
121
122 /**
123  * Handle to a route request
124  */
125 struct GNUNET_DHT_RouteHandle
126 {
127
128   /**
129    * Unique identifier for this request (for key collisions)
130    */
131   uint64_t uid;
132
133   /**
134    * Key that this get request is for
135    */
136   GNUNET_HashCode key;
137
138   /**
139    * Iterator to call on data receipt
140    */
141   GNUNET_DHT_ReplyProcessor iter;
142
143   /**
144    * Closure for the iterator callback
145    */
146   void *iter_cls;
147
148   /**
149    * Main handle to this DHT api
150    */
151   struct GNUNET_DHT_Handle *dht_handle;
152
153   /**
154    * The actual message sent for this request,
155    * used for retransmitting requests on service
156    * failure/reconnect.  Freed on route_stop.
157    */
158   struct GNUNET_DHT_RouteMessage *message;
159 };
160
161
162 /**
163  * Handle to control a get operation.
164  */
165 struct GNUNET_DHT_GetHandle
166 {
167   /**
168    * Handle to the actual route operation for the get
169    */
170   struct GNUNET_DHT_RouteHandle *route_handle;
171
172   /**
173    * The context of the get request
174    */
175   struct GNUNET_DHT_GetContext get_context;
176 };
177
178
179 /**
180  * Handle to control a find peer operation.
181  */
182 struct GNUNET_DHT_FindPeerHandle
183 {
184   /**
185      * Handle to the actual route operation for the request
186      */
187   struct GNUNET_DHT_RouteHandle *route_handle;
188
189     /**
190      * The context of the find peer request
191      */
192   struct GNUNET_DHT_FindPeerContext find_peer_context;
193 };
194
195
196 enum DHT_Retransmit_Stage
197 {
198   /**
199    * The API is not retransmitting anything at this time.
200    */
201   DHT_NOT_RETRANSMITTING,
202
203   /**
204    * The API is retransmitting, and nothing has been single
205    * queued for sending.
206    */
207   DHT_RETRANSMITTING,
208
209   /**
210    * The API is retransmitting, and a single message has been
211    * queued for transmission once finished.
212    */
213   DHT_RETRANSMITTING_MESSAGE_QUEUED
214 };
215
216
217 /**
218  * Connection to the DHT service.
219  */
220 struct GNUNET_DHT_Handle
221 {
222   /**
223    * Our scheduler.
224    */
225   struct GNUNET_SCHEDULER_Handle *sched;
226
227   /**
228    * Configuration to use.
229    */
230   const struct GNUNET_CONFIGURATION_Handle *cfg;
231
232   /**
233    * Socket (if available).
234    */
235   struct GNUNET_CLIENT_Connection *client;
236
237   /**
238    * Currently pending transmission request.
239    */
240   struct GNUNET_CLIENT_TransmitHandle *th;
241
242   /**
243    * Message we are currently sending, only allow
244    * a single message to be queued.  If not unique
245    * (typically a put request), await a confirmation
246    * from the service that the message was received.
247    * If unique, just fire and forget.
248    */
249   struct PendingMessage *current;
250
251   /**
252    * Hash map containing the current outstanding unique requests
253    */
254   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
255
256   /**
257    * Generator for unique ids.
258    */
259   uint64_t uid_gen;
260
261   /**
262    * Are we currently retransmitting requests?  If so queue a _single_
263    * new request when received.
264    */
265   enum DHT_Retransmit_Stage retransmit_stage;
266
267   /**
268    * Linked list of retranmissions, to be used in the event
269    * of a dht service disconnect/reconnect.
270    */
271   struct PendingMessageList *retransmissions;
272
273   /**
274    * A single pending message allowed to be scheduled
275    * during retransmission phase.
276    */
277   struct PendingMessage *retransmission_buffer;
278 };
279
280
281 /**
282  * Convert unique ID to hash code.
283  *
284  * @param uid unique ID to convert
285  * @param hash set to uid (extended with zeros)
286  */
287 static void
288 hash_from_uid (uint64_t uid,
289                GNUNET_HashCode *hash)
290 {
291   memset (hash, 0, sizeof(GNUNET_HashCode));
292   *((uint64_t*)hash) = uid;
293 }
294
295 /**
296  * Iterator callback to retransmit each outstanding request
297  * because the connection to the DHT service went down (and
298  * came back).
299  *
300  *
301  */
302 static int retransmit_iterator (void *cls,
303                                 const GNUNET_HashCode * key,
304                                 void *value)
305 {
306   struct GNUNET_DHT_RouteHandle *route_handle = value;
307   struct PendingMessageList *pending_message_list;
308
309   pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
310   pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
311   pending_message_list->message->msg = &route_handle->message->header;
312   pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
313   pending_message_list->message->cont = NULL;
314   pending_message_list->message->cont_cls = NULL;
315   pending_message_list->message->unique_id = route_handle->uid;
316   /* Add the new pending message to the front of the retransmission list */
317   pending_message_list->next = route_handle->dht_handle->retransmissions;
318
319   return GNUNET_OK;
320 }
321
322 /**
323  * Try to (re)connect to the dht service.
324  *
325  * @return GNUNET_YES on success, GNUNET_NO on failure.
326  */
327 static int
328 try_connect (struct GNUNET_DHT_Handle *handle)
329 {
330   if (handle->client != NULL)
331     return GNUNET_OK;
332   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
333   if (handle->client != NULL)
334     return GNUNET_YES;
335 #if DEBUG_STATISTICS
336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337               _("Failed to connect to the dht service!\n"));
338 #endif
339   return GNUNET_NO;
340 }
341
342 /**
343  * Send complete (or failed), call continuation if we have one.
344  */
345 static void
346 finish (struct GNUNET_DHT_Handle *handle, int code)
347 {
348   struct PendingMessage *pos = handle->current;
349   GNUNET_HashCode uid_hash;
350   hash_from_uid (pos->unique_id, &uid_hash);
351 #if DEBUG_DHT_API
352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
353 #endif
354   GNUNET_assert (pos != NULL);
355
356   if (pos->cont != NULL)
357     {
358       if (code == GNUNET_SYSERR)
359         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
360                                            pos->cont_cls,
361                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
362       else
363         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
364                                            pos->cont_cls,
365                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
366     }
367
368   if (pos->unique_id == 0)
369     GNUNET_free(pos->msg);
370   GNUNET_free (pos);
371   handle->current = NULL;
372 }
373
374 /**
375  * Transmit the next pending message, called by notify_transmit_ready
376  */
377 static size_t
378 transmit_pending (void *cls, size_t size, void *buf)
379 {
380   struct GNUNET_DHT_Handle *handle = cls;
381   size_t tsize;
382
383 #if DEBUG_DHT_API
384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385               "`%s': In transmit_pending\n", "DHT API");
386 #endif
387   if (buf == NULL)
388     {
389 #if DEBUG_DHT_API
390       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
392 #endif
393       finish (handle, GNUNET_SYSERR);
394       return 0;
395     }
396
397   handle->th = NULL;
398
399   if (handle->current != NULL)
400     {
401       tsize = ntohs (handle->current->msg->size);
402       if (size >= tsize)
403         {
404 #if DEBUG_DHT_API
405           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406                       "`%s': Sending message size %d\n", "DHT API", tsize);
407 #endif
408           memcpy (buf, handle->current->msg, tsize);
409           finish (handle, GNUNET_OK);
410           return tsize;
411         }
412       else
413         {
414           return 0;
415         }
416     }
417   /* Have no pending request */
418   return 0;
419 }
420
421 /**
422  * Try to send messages from list of messages to send
423  */
424 static void
425 process_pending_message (struct GNUNET_DHT_Handle *handle)
426 {
427
428   if (handle->current == NULL)
429     return;                     /* action already pending */
430   if (GNUNET_YES != try_connect (handle))
431     {
432       finish (handle, GNUNET_SYSERR);
433       return;
434     }
435
436   if (NULL ==
437       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
438                                                          ntohs (handle->
439                                                                 current->msg->
440                                                                 size),
441                                                          handle->current->
442                                                          timeout, GNUNET_YES,
443                                                          &transmit_pending,
444                                                          handle)))
445     {
446 #if DEBUG_DHT_API
447       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448                   "Failed to transmit request to dht service.\n");
449 #endif
450       finish (handle, GNUNET_SYSERR);
451       return;
452     }
453 #if DEBUG_DHT_API
454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455               "`%s': Scheduled sending message of size %d to service\n",
456               "DHT API", ntohs (handle->current->msg->size));
457 #endif
458 }
459
460 /**
461  * Send complete (or failed), call continuation if we have one.
462  * Forward declaration.
463  */
464 static void
465 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
466
467 /* Forward declaration */
468 static size_t
469 transmit_pending_retransmission (void *cls, size_t size, void *buf);
470
471 /**
472  * Try to send messages from list of messages to send
473  */
474 static void
475 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
476 {
477
478   if (handle->current == NULL)
479     return;                     /* action already pending */
480   if (GNUNET_YES != try_connect (handle))
481     {
482       finish_retransmission (handle, GNUNET_SYSERR);
483       return;
484     }
485
486   if (NULL ==
487       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
488                                                          ntohs (handle->
489                                                                 current->msg->
490                                                                 size),
491                                                          handle->current->
492                                                          timeout, GNUNET_YES,
493                                                          &transmit_pending_retransmission,
494                                                          handle)))
495     {
496 #if DEBUG_DHT_API
497       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                   "Failed to transmit request to dht service.\n");
499 #endif
500       finish_retransmission (handle, GNUNET_SYSERR);
501       return;
502     }
503 #if DEBUG_DHT_API
504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
505               "`%s': Scheduled sending message of size %d to service\n",
506               "DHT API", ntohs (handle->current->msg->size));
507 #endif
508 }
509
510 /**
511  * Send complete (or failed), call continuation if we have one.
512  */
513 static void
514 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
515 {
516   struct PendingMessage *pos = handle->current;
517   struct PendingMessageList *pending_list;
518 #if DEBUG_DHT_API
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
520 #endif
521   GNUNET_assert (pos == handle->retransmissions->message);
522   pending_list = handle->retransmissions;
523   handle->retransmissions = handle->retransmissions->next;
524   GNUNET_free (pending_list);
525
526   if (handle->retransmissions == NULL)
527     {
528       handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
529     }
530
531   if (handle->retransmissions != NULL)
532     {
533       handle->current = handle->retransmissions->message;
534       process_pending_retransmissions(handle);
535     }
536   else if (handle->retransmission_buffer != NULL)
537     {
538       handle->current = handle->retransmission_buffer;
539       process_pending_message(handle);
540     }
541 }
542
543 /**
544  * Handler for messages received from the DHT service
545  * a demultiplexer which handles numerous message types
546  *
547  */
548 void
549 service_message_handler (void *cls,
550                          const struct GNUNET_MessageHeader *msg)
551 {
552   struct GNUNET_DHT_Handle *handle = cls;
553   struct GNUNET_DHT_RouteResultMessage *dht_msg;
554   struct GNUNET_MessageHeader *enc_msg;
555   struct GNUNET_DHT_RouteHandle *route_handle;
556   uint64_t uid;
557   GNUNET_HashCode uid_hash;
558   size_t enc_size;
559
560   if (msg == NULL)
561     {
562 #if DEBUG_DHT_API
563       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564                   "`%s': Received NULL from server, connection down!\n",
565                   "DHT API");
566 #endif
567       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
568       handle->client = GNUNET_CLIENT_connect (handle->sched, 
569                                               "dht", 
570                                               handle->cfg);
571
572       handle->retransmit_stage = DHT_RETRANSMITTING;
573       GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle);
574       handle->current = handle->retransmissions->message;
575       process_pending_retransmissions(handle);
576       return;
577     }
578
579   switch (ntohs (msg->type))
580     {
581     case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT:
582       {
583         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
584         uid = GNUNET_ntohll (dht_msg->unique_id);
585 #if DEBUG_DHT_API
586         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587                     "`%s': Received response to message (uid %llu)\n",
588                     "DHT API", uid);
589 #endif
590
591         hash_from_uid (uid, &uid_hash);
592         route_handle =
593           GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
594                                              &uid_hash);
595         if (route_handle == NULL)   /* We have no recollection of this request */
596           {
597 #if DEBUG_DHT_API
598             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599                         "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
600                         "DHT API", uid);
601 #endif
602           }
603         else
604           {
605             enc_size =
606               ntohs (dht_msg->header.size) -
607               sizeof (struct GNUNET_DHT_RouteResultMessage);
608             GNUNET_assert (enc_size > 0);
609             enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
610             route_handle->iter (route_handle->iter_cls, enc_msg);
611           }
612
613         break;
614       }
615     default:
616       {
617         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
618                     "`%s': Received unknown message type %d\n", "DHT API",
619                     ntohs (msg->type));
620       }
621     }
622   GNUNET_CLIENT_receive (handle->client,
623                          &service_message_handler,
624                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
625
626 }
627
628
629 /**
630  * Initialize the connection with the DHT service.
631  *
632  * @param sched scheduler to use
633  * @param cfg configuration to use
634  * @param ht_len size of the internal hash table to use for
635  *               processing multiple GET/FIND requests in parallel
636  *
637  * @return handle to the DHT service, or NULL on error
638  */
639 struct GNUNET_DHT_Handle *
640 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
641                     const struct GNUNET_CONFIGURATION_Handle *cfg,
642                     unsigned int ht_len)
643 {
644   struct GNUNET_DHT_Handle *handle;
645
646   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
647   handle->cfg = cfg;
648   handle->sched = sched;
649   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
650   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
651   if (handle->client == NULL)
652     {
653       GNUNET_free (handle);
654       return NULL;
655     }
656   handle->outstanding_requests =
657     GNUNET_CONTAINER_multihashmap_create (ht_len);
658 #if DEBUG_DHT_API
659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
660               "`%s': Connection to service in progress\n", "DHT API");
661 #endif
662   GNUNET_CLIENT_receive (handle->client,
663                          &service_message_handler,
664                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
665   return handle;
666 }
667
668
669 /**
670  * Shutdown connection with the DHT service.
671  *
672  * @param handle handle of the DHT connection to stop
673  */
674 void
675 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
676 {
677 #if DEBUG_DHT_API
678   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
680 #endif
681   GNUNET_assert (handle != NULL);
682   if (handle->th != NULL)       /* We have a live transmit request in the Aether */
683     {
684       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
685       handle->th = NULL;
686     }
687   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
688     GNUNET_free (handle->current);
689
690   if (handle->client != NULL)   /* Finally, disconnect from the service */
691     {
692       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
693       handle->client = NULL;
694     }
695
696   GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
697   GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
698   GNUNET_free (handle);
699 }
700
701
702 /**
703  * Transmit the next pending message, called by notify_transmit_ready
704  */
705 static size_t
706 transmit_pending_retransmission (void *cls, size_t size, void *buf)
707 {
708   struct GNUNET_DHT_Handle *handle = cls;
709   size_t tsize;
710
711 #if DEBUG_DHT_API
712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713               "`%s': In transmit_pending\n", "DHT API");
714 #endif
715   if (buf == NULL)
716     {
717 #if DEBUG_DHT_API
718       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
720 #endif
721       finish_retransmission (handle, GNUNET_SYSERR);
722       return 0;
723     }
724
725   handle->th = NULL;
726
727   if (handle->current != NULL)
728     {
729       tsize = ntohs (handle->current->msg->size);
730       if (size >= tsize)
731         {
732 #if DEBUG_DHT_API
733           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734                       "`%s': Sending message size %d\n", "DHT API", tsize);
735 #endif
736           memcpy (buf, handle->current->msg, tsize);
737           finish_retransmission (handle, GNUNET_OK);
738           return tsize;
739         }
740       else
741         {
742           return 0;
743         }
744     }
745   /* Have no pending request */
746   return 0;
747 }
748
749
750 /**
751  * Iterator called on each result obtained from a generic route
752  * operation
753  */
754 void
755 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
756 {
757   struct GNUNET_DHT_GetHandle *get_handle = cls;
758   struct GNUNET_DHT_GetResultMessage *result;
759   size_t data_size;
760   char *result_data;
761
762   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
763     return;
764
765   GNUNET_assert (ntohs (reply->size) >=
766                  sizeof (struct GNUNET_DHT_GetResultMessage));
767   result = (struct GNUNET_DHT_GetResultMessage *) reply;
768   data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
769
770   result_data = (char *) &result[1];    /* Set data pointer to end of message */
771
772   get_handle->get_context.iter (get_handle->get_context.iter_cls,
773                                 result->expiration, &result->key,
774                                 ntohs (result->type), data_size, result_data);
775 }
776
777
778 /**
779  * Iterator called on each result obtained from a generic route
780  * operation
781  */
782 void
783 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
784 {
785   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
786   struct GNUNET_MessageHeader *hello;
787   size_t hello_size;
788
789   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
790     {
791       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792                   "Received wrong type of response to a find peer request...\n");
793       return;
794     }
795
796
797   GNUNET_assert (ntohs (reply->size) >=
798                  sizeof (struct GNUNET_MessageHeader));
799   hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader);
800   hello = (struct GNUNET_MessageHeader *)&reply[1];
801
802   if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
803     {
804       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805                   "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
806       return;
807     }
808   find_peer_handle->find_peer_context.proc (find_peer_handle->
809                                             find_peer_context.proc_cls,
810                                             (struct GNUNET_HELLO_Message *)hello);
811 }
812
813 /**
814  * Perform an asynchronous FIND_PEER operation on the DHT.
815  *
816  * @param handle handle to the DHT service
817  * @param key the key to look up
818  * @param desired_replication_level how many peers should ultimately receive
819  *                this message (advisory only, target may be too high for the
820  *                given DHT or not hit exactly).
821  * @param options options for routing
822  * @param enc send the encapsulated message to a peer close to the key
823  * @param iter function to call on each result, NULL if no replies are expected
824  * @param iter_cls closure for iter
825  * @param timeout when to abort with an error if we fail to get
826  *                a confirmation for the request (when necessary) or how long
827  *                to wait for tramission to the service
828  * @param cont continuation to call when done;
829  *             reason will be TIMEOUT on error,
830  *             reason will be PREREQ_DONE on success
831  * @param cont_cls closure for cont
832  *
833  * @return handle to stop the request, NULL if the request is "fire and forget"
834  */
835 struct GNUNET_DHT_RouteHandle *
836 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
837                         const GNUNET_HashCode * key,
838                         unsigned int desired_replication_level,
839                         enum GNUNET_DHT_RouteOption options,
840                         const struct GNUNET_MessageHeader *enc,
841                         struct GNUNET_TIME_Relative timeout,
842                         GNUNET_DHT_ReplyProcessor iter,
843                         void *iter_cls,
844                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
845 {
846   struct GNUNET_DHT_RouteHandle *route_handle;
847   struct PendingMessage *pending;
848   struct GNUNET_DHT_RouteMessage *message;
849   uint16_t msize;
850   GNUNET_HashCode uid_key;
851
852   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
853     return NULL;
854
855   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
856     {
857       GNUNET_break (0);
858       return NULL;
859     }
860
861   route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
862   memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
863   route_handle->iter = iter;
864   route_handle->iter_cls = iter_cls;
865   route_handle->dht_handle = handle;
866   if (iter != NULL)
867     {
868       route_handle->uid = handle->uid_gen++;
869       hash_from_uid (route_handle->uid, &uid_key);
870       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
871                                          &uid_key, route_handle,
872                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
873     }
874   else
875     {
876       route_handle->uid = 0;
877     }
878
879 #if DEBUG_DHT_API
880       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881                   "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
882 #endif
883
884   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
885   message = GNUNET_malloc (msize);
886   message->header.size = htons (msize);
887   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE);
888   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
889   message->options = htonl (options);
890   message->desired_replication_level = htonl (options);
891   message->unique_id = GNUNET_htonll (route_handle->uid);
892   memcpy (&message[1], enc, ntohs (enc->size));
893   pending = GNUNET_malloc (sizeof (struct PendingMessage));
894   pending->msg = &message->header;
895   pending->timeout = timeout;
896   pending->cont = cont;
897   pending->cont_cls = cont_cls;
898   pending->unique_id = route_handle->uid;
899   if (handle->current == NULL)
900     {
901       handle->current = pending;
902       process_pending_message (handle);
903     }
904   else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING))
905   {
906     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
907     handle->retransmission_buffer = pending;
908   }
909
910   route_handle->message = message;
911   return route_handle;
912 }
913
914
915 /**
916  * Perform an asynchronous GET operation on the DHT identified.
917  *
918  * @param handle handle to the DHT service
919  * @param timeout how long to wait for transmission of this request to the service
920  * @param type expected type of the response object
921  * @param key the key to look up
922  * @param iter function to call on each result
923  * @param iter_cls closure for iter
924  * @param cont continuation to call once message sent
925  * @param cont_cls closure for continuation
926  *
927  * @return handle to stop the async get
928  */
929 struct GNUNET_DHT_GetHandle *
930 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
931                       struct GNUNET_TIME_Relative timeout,
932                       uint32_t type,
933                       const GNUNET_HashCode * key,
934                       GNUNET_DHT_GetIterator iter,
935                       void *iter_cls,
936                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
937 {
938   struct GNUNET_DHT_GetHandle *get_handle;
939   struct GNUNET_DHT_GetMessage get_msg;
940
941   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
942     return NULL;
943
944   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
945   get_handle->get_context.iter = iter;
946   get_handle->get_context.iter_cls = iter_cls;
947
948 #if DEBUG_DHT_API
949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950               "`%s': Inserting pending get request with key %s\n", "DHT API",
951               GNUNET_h2s (key));
952 #endif
953
954   get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
955   get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
956   get_msg.type = htons (type);
957
958   get_handle->route_handle =
959     GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
960                             &get_reply_iterator, get_handle, cont, cont_cls);
961
962   return get_handle;
963 }
964
965
966 /**
967  * Stop a previously issued routing request
968  *
969  * @param route_handle handle to the request to stop
970  * @param cont continuation to call once this message is sent to the service or times out
971  * @param cont_cls closure for the continuation
972  */
973 void
974 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
975                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
976 {
977   struct PendingMessage *pending;
978   struct GNUNET_DHT_StopMessage *message;
979   size_t msize;
980   GNUNET_HashCode uid_key;
981
982   msize = sizeof (struct GNUNET_DHT_StopMessage);
983   message = GNUNET_malloc (msize);
984   message->header.size = htons (msize);
985   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
986 #if DEBUG_DHT_API
987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
989               route_handle->uid);
990 #endif
991   message->unique_id = GNUNET_htonll (route_handle->uid);
992   GNUNET_assert (route_handle->dht_handle->current == NULL);
993   pending = GNUNET_malloc (sizeof (struct PendingMessage));
994   pending->msg = (struct GNUNET_MessageHeader *) message;
995   pending->timeout = DEFAULT_DHT_TIMEOUT;
996   pending->cont = cont;
997   pending->cont_cls = cont_cls;
998   pending->unique_id = 0; /* When finished is called, free pending->msg */
999
1000   if (route_handle->dht_handle->current == NULL)
1001     {
1002       route_handle->dht_handle->current = pending;
1003       process_pending_message (route_handle->dht_handle);
1004     }
1005   else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING))
1006     {
1007       route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1008       route_handle->dht_handle->retransmission_buffer = pending;
1009     }
1010   else
1011     {
1012       GNUNET_break(0);
1013     }
1014
1015   hash_from_uid (route_handle->uid, &uid_key);
1016   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1017                  (route_handle->dht_handle->outstanding_requests, &uid_key,
1018                   route_handle) == GNUNET_YES);
1019
1020   GNUNET_free(route_handle->message);
1021   GNUNET_free(route_handle);
1022 }
1023
1024
1025 /**
1026  * Stop async DHT-get.
1027  *
1028  * @param get_handle handle to the GET operation to stop
1029  * @param cont continuation to call once this message is sent to the service or times out
1030  * @param cont_cls closure for the continuation
1031  */
1032 void
1033 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1034                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
1035 {
1036   if ((get_handle->route_handle->dht_handle->current != NULL) &&
1037       (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1038     {
1039       if (cont != NULL)
1040         {
1041           GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1042                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1043         }
1044       return;
1045     }
1046
1047 #if DEBUG_DHT_API
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "`%s': Removing pending get request with key %s, uid %llu\n",
1050               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1051               get_handle->route_handle->uid);
1052 #endif
1053   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1054   GNUNET_free (get_handle);
1055 }
1056
1057
1058 /**
1059  * Perform an asynchronous FIND PEER operation on the DHT.
1060  *
1061  * @param handle handle to the DHT service
1062  * @param timeout timeout for this request to be sent to the
1063  *        service
1064  * @param options routing options for this message
1065  * @param key the key to look up
1066  * @param proc function to call on each result
1067  * @param proc_cls closure for proc
1068  * @param cont continuation to call once message sent
1069  * @param cont_cls closure for continuation
1070  *
1071  * @return handle to stop the async get, NULL on error
1072  */
1073 struct GNUNET_DHT_FindPeerHandle *
1074 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1075                             struct GNUNET_TIME_Relative timeout,
1076                             enum GNUNET_DHT_RouteOption options,
1077                             const GNUNET_HashCode * key,
1078                             GNUNET_DHT_FindPeerProcessor proc,
1079                             void *proc_cls,
1080                             GNUNET_SCHEDULER_Task cont,
1081                             void *cont_cls)
1082 {
1083   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1084   struct GNUNET_MessageHeader find_peer_msg;
1085
1086   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
1087     return NULL;
1088
1089   find_peer_handle =
1090     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1091   find_peer_handle->find_peer_context.proc = proc;
1092   find_peer_handle->find_peer_context.proc_cls = proc_cls;
1093
1094 #if DEBUG_DHT_API
1095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1097               "FIND PEER", GNUNET_h2s (key));
1098 #endif
1099
1100   find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
1101   find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1102   find_peer_handle->route_handle =
1103     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
1104                             timeout, &find_peer_reply_iterator,
1105                             find_peer_handle, cont, cont_cls);
1106   return find_peer_handle;
1107 }
1108
1109 /**
1110  * Stop async find peer.  Frees associated resources.
1111  *
1112  * @param find_peer_handle GET operation to stop.
1113  * @param cont continuation to call once this message is sent to the service or times out
1114  * @param cont_cls closure for the continuation
1115  */
1116 void
1117 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1118                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1119 {
1120   if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1121       (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1122     {
1123       if (cont != NULL)
1124         {
1125           GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1126                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1127         }
1128       return;
1129     }
1130
1131 #if DEBUG_DHT_API
1132   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1134               "DHT API", "FIND PEER",
1135               GNUNET_h2s (&find_peer_handle->route_handle->key),
1136               find_peer_handle->route_handle->uid);
1137 #endif
1138   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1139   GNUNET_free (find_peer_handle);
1140
1141 }
1142
1143
1144 /**
1145  * Perform a PUT operation storing data in the DHT.
1146  *
1147  * @param handle handle to DHT service
1148  * @param key the key to store under
1149  * @param type type of the value
1150  * @param size number of bytes in data; must be less than 64k
1151  * @param data the data to store
1152  * @param exp desired expiration time for the value
1153  * @param timeout how long to wait for transmission of this request
1154  * @param cont continuation to call when done;
1155  *             reason will be TIMEOUT on error,
1156  *             reason will be PREREQ_DONE on success
1157  * @param cont_cls closure for cont
1158  *
1159  * @return GNUNET_YES if put message is queued for transmission
1160  */
1161 void
1162 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1163                 const GNUNET_HashCode * key,
1164                 uint32_t type,
1165                 uint32_t size,
1166                 const char *data,
1167                 struct GNUNET_TIME_Absolute exp,
1168                 struct GNUNET_TIME_Relative timeout,
1169                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1170 {
1171   struct GNUNET_DHT_PutMessage *put_msg;
1172   struct GNUNET_DHT_RouteHandle *put_route;
1173   size_t msize;
1174
1175   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1176     {
1177       if (cont != NULL)
1178         {
1179           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1180                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1181         }
1182       return;
1183     }
1184
1185 #if DEBUG_DHT_API
1186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187               "`%s': Inserting pending put request with key %s\n", "DHT API",
1188               GNUNET_h2s (key));
1189 #endif
1190
1191   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1192   put_msg = GNUNET_malloc (msize);
1193   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1194   put_msg->header.size = htons (msize);
1195   put_msg->type = htons (type);
1196   put_msg->data_size = htons (size);
1197   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1198   memcpy (&put_msg[1], data, size);
1199
1200   put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
1201                                       NULL, cont, cont_cls);
1202
1203   if (put_route == NULL) /* Route start failed! */
1204     {
1205       if (cont != NULL)
1206         {
1207           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1208                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1209         }
1210     }
1211   else
1212     GNUNET_free(put_route);
1213
1214   GNUNET_free (put_msg);
1215 }