nitpicks
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  *
27  * TODO: retransmission of pending requests maybe happens now, at least
28  *       the code is in place to do so.  Need to add checks when api calls
29  *       happen to check if retransmission is in progress, and if so set
30  *       the single pending message for transmission once the list of
31  *       retries are done.
32  */
33
34 #include "platform.h"
35 #include "gnunet_bandwidth_lib.h"
36 #include "gnunet_client_lib.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_container_lib.h"
39 #include "gnunet_arm_service.h"
40 #include "gnunet_hello_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_server_lib.h"
43 #include "gnunet_time_lib.h"
44 #include "gnunet_dht_service.h"
45 #include "dht.h"
46
47 #define DEBUG_DHT_API GNUNET_NO
48
49 struct PendingMessage
50 {
51   /**
52    * Message that is pending
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Timeout for this message
58    */
59   struct GNUNET_TIME_Relative timeout;
60
61   /**
62    * Continuation to call on message send
63    * or message receipt confirmation
64    */
65   GNUNET_SCHEDULER_Task cont;
66
67   /**
68    * Continuation closure
69    */
70   void *cont_cls;
71
72   /**
73    * Unique ID for this request
74    */
75   uint64_t unique_id;
76
77   /**
78    * Free the saved message once sent, set
79    * to GNUNET_YES for messages that don't
80    * receive responses!
81    */
82   int free_on_send;
83
84 };
85
86 struct PendingMessageList
87 {
88   /**
89    * This is a singly linked list.
90    */
91   struct PendingMessageList *next;
92
93   /**
94    * The pending message.
95    */
96   struct PendingMessage *message;
97 };
98
99 struct GNUNET_DHT_GetContext
100 {
101   /**
102    * Iterator to call on data receipt
103    */
104   GNUNET_DHT_GetIterator iter;
105
106   /**
107    * Closure for the iterator callback
108    */
109   void *iter_cls;
110
111 };
112
113 struct GNUNET_DHT_FindPeerContext
114 {
115   /**
116    * Iterator to call on data receipt
117    */
118   GNUNET_DHT_FindPeerProcessor proc;
119
120   /**
121    * Closure for the iterator callback
122    */
123   void *proc_cls;
124
125 };
126
127 /**
128  * Handle to a route request
129  */
130 struct GNUNET_DHT_RouteHandle
131 {
132
133   /**
134    * Unique identifier for this request (for key collisions)
135    */
136   uint64_t uid;
137
138   /**
139    * Key that this get request is for
140    */
141   GNUNET_HashCode key;
142
143   /**
144    * Iterator to call on data receipt
145    */
146   GNUNET_DHT_ReplyProcessor iter;
147
148   /**
149    * Closure for the iterator callback
150    */
151   void *iter_cls;
152
153   /**
154    * Main handle to this DHT api
155    */
156   struct GNUNET_DHT_Handle *dht_handle;
157
158   /**
159    * The actual message sent for this request,
160    * used for retransmitting requests on service
161    * failure/reconnect.  Freed on route_stop.
162    */
163   struct GNUNET_DHT_RouteMessage *message;
164 };
165
166
167 /**
168  * Handle to control a get operation.
169  */
170 struct GNUNET_DHT_GetHandle
171 {
172   /**
173    * Handle to the actual route operation for the get
174    */
175   struct GNUNET_DHT_RouteHandle *route_handle;
176
177   /**
178    * The context of the get request
179    */
180   struct GNUNET_DHT_GetContext get_context;
181 };
182
183
184 /**
185  * Handle to control a find peer operation.
186  */
187 struct GNUNET_DHT_FindPeerHandle
188 {
189   /**
190      * Handle to the actual route operation for the request
191      */
192   struct GNUNET_DHT_RouteHandle *route_handle;
193
194     /**
195      * The context of the find peer request
196      */
197   struct GNUNET_DHT_FindPeerContext find_peer_context;
198 };
199
200
201 enum DHT_Retransmit_Stage
202 {
203   /**
204    * The API is not retransmitting anything at this time.
205    */
206   DHT_NOT_RETRANSMITTING,
207
208   /**
209    * The API is retransmitting, and nothing has been single
210    * queued for sending.
211    */
212   DHT_RETRANSMITTING,
213
214   /**
215    * The API is retransmitting, and a single message has been
216    * queued for transmission once finished.
217    */
218   DHT_RETRANSMITTING_MESSAGE_QUEUED
219 };
220
221
222 /**
223  * Connection to the DHT service.
224  */
225 struct GNUNET_DHT_Handle
226 {
227   /**
228    * Our scheduler.
229    */
230   struct GNUNET_SCHEDULER_Handle *sched;
231
232   /**
233    * Configuration to use.
234    */
235   const struct GNUNET_CONFIGURATION_Handle *cfg;
236
237   /**
238    * Socket (if available).
239    */
240   struct GNUNET_CLIENT_Connection *client;
241
242   /**
243    * Currently pending transmission request.
244    */
245   struct GNUNET_CLIENT_TransmitHandle *th;
246
247   /**
248    * Message we are currently sending, only allow
249    * a single message to be queued.  If not unique
250    * (typically a put request), await a confirmation
251    * from the service that the message was received.
252    * If unique, just fire and forget.
253    */
254   struct PendingMessage *current;
255
256   /**
257    * Hash map containing the current outstanding unique requests
258    */
259   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
260
261   /**
262    * Generator for unique ids.
263    */
264   uint64_t uid_gen;
265
266   /**
267    * Are we currently retransmitting requests?  If so queue a _single_
268    * new request when received.
269    */
270   enum DHT_Retransmit_Stage retransmit_stage;
271
272   /**
273    * Linked list of retranmissions, to be used in the event
274    * of a dht service disconnect/reconnect.
275    */
276   struct PendingMessageList *retransmissions;
277
278   /**
279    * A single pending message allowed to be scheduled
280    * during retransmission phase.
281    */
282   struct PendingMessage *retransmission_buffer;
283 };
284
285
286 /**
287  * Convert unique ID to hash code.
288  *
289  * @param uid unique ID to convert
290  * @param hash set to uid (extended with zeros)
291  */
292 static void
293 hash_from_uid (uint64_t uid,
294                GNUNET_HashCode *hash)
295 {
296   memset (hash, 0, sizeof(GNUNET_HashCode));
297   *((uint64_t*)hash) = uid;
298 }
299
300 #if RETRANSMIT
301 /**
302  * Iterator callback to retransmit each outstanding request
303  * because the connection to the DHT service went down (and
304  * came back).
305  *
306  *
307  */
308 static int retransmit_iterator (void *cls,
309                                 const GNUNET_HashCode * key,
310                                 void *value)
311 {
312   struct GNUNET_DHT_RouteHandle *route_handle = value;
313   struct PendingMessageList *pending_message_list;
314
315   pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
316   pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
317   pending_message_list->message->msg = &route_handle->message->header;
318   pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
319   pending_message_list->message->cont = NULL;
320   pending_message_list->message->cont_cls = NULL;
321   pending_message_list->message->unique_id = route_handle->uid;
322   /* Add the new pending message to the front of the retransmission list */
323   pending_message_list->next = route_handle->dht_handle->retransmissions;
324   route_handle->dht_handle->retransmissions = pending_message_list;
325
326   return GNUNET_OK;
327 }
328 #endif
329
330 /**
331  * Try to (re)connect to the dht service.
332  *
333  * @return GNUNET_YES on success, GNUNET_NO on failure.
334  */
335 static int
336 try_connect (struct GNUNET_DHT_Handle *handle)
337 {
338   if (handle->client != NULL)
339     return GNUNET_OK;
340   handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
341   if (handle->client != NULL)
342     return GNUNET_YES;
343 #if DEBUG_STATISTICS
344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
345               _("Failed to connect to the dht service!\n"));
346 #endif
347   return GNUNET_NO;
348 }
349
350 /**
351  * Send complete (or failed), call continuation if we have one.
352  */
353 static void
354 finish (struct GNUNET_DHT_Handle *handle, int code)
355 {
356   struct PendingMessage *pos = handle->current;
357   GNUNET_HashCode uid_hash;
358 #if DEBUG_DHT_API
359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
360 #endif
361   GNUNET_assert (pos != NULL);
362   hash_from_uid (pos->unique_id, &uid_hash);
363   if (pos->cont != NULL)
364     {
365       if (code == GNUNET_SYSERR)
366         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
367                                            pos->cont_cls,
368                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
369       else
370         GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
371                                            pos->cont_cls,
372                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
373     }
374
375   GNUNET_assert(handle->th == NULL);
376   if (pos->free_on_send == GNUNET_YES)
377     GNUNET_free(pos->msg);
378   GNUNET_free (pos);
379   handle->current = NULL;
380 }
381
382 /**
383  * Transmit the next pending message, called by notify_transmit_ready
384  */
385 static size_t
386 transmit_pending (void *cls, size_t size, void *buf)
387 {
388   struct GNUNET_DHT_Handle *handle = cls;
389   size_t tsize;
390
391 #if DEBUG_DHT_API
392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393               "`%s': In transmit_pending\n", "DHT API");
394 #endif
395   if (buf == NULL)
396     {
397 #if DEBUG_DHT_API
398       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
400 #endif
401       finish (handle, GNUNET_SYSERR);
402       return 0;
403     }
404
405   handle->th = NULL;
406
407   if (handle->current != NULL)
408     {
409       tsize = ntohs (handle->current->msg->size);
410       if (size >= tsize)
411         {
412 #if DEBUG_DHT_API
413           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414                       "`%s': Sending message size %d\n", "DHT API", tsize);
415 #endif
416           memcpy (buf, handle->current->msg, tsize);
417           finish (handle, GNUNET_OK);
418           return tsize;
419         }
420       else
421         {
422           return 0;
423         }
424     }
425   /* Have no pending request */
426   return 0;
427 }
428
429 /**
430  * Try to send messages from list of messages to send
431  */
432 static void
433 process_pending_message (struct GNUNET_DHT_Handle *handle)
434 {
435
436   if (handle->current == NULL)
437     return;                     /* action already pending */
438   if (GNUNET_YES != try_connect (handle))
439     {
440       finish (handle, GNUNET_SYSERR);
441       return;
442     }
443
444   if (NULL ==
445       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
446                                                          ntohs (handle->
447                                                                 current->msg->
448                                                                 size),
449                                                          handle->current->
450                                                          timeout, GNUNET_YES,
451                                                          &transmit_pending,
452                                                          handle)))
453     {
454 #if DEBUG_DHT_API
455       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456                   "Failed to transmit request to dht service.\n");
457 #endif
458       finish (handle, GNUNET_SYSERR);
459       return;
460     }
461 #if DEBUG_DHT_API
462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463               "`%s': Scheduled sending message of size %d to service\n",
464               "DHT API", ntohs (handle->current->msg->size));
465 #endif
466 }
467
468 /**
469  * Send complete (or failed), call continuation if we have one.
470  * Forward declaration.
471  */
472 static void
473 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
474
475 /* Forward declaration */
476 static size_t
477 transmit_pending_retransmission (void *cls, size_t size, void *buf);
478
479 /**
480  * Try to send messages from list of messages to send
481  */
482 static void
483 process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
484 {
485
486   if (handle->current == NULL)
487     return;                     /* action already pending */
488   if (GNUNET_YES != try_connect (handle))
489     {
490       finish_retransmission (handle, GNUNET_SYSERR);
491       return;
492     }
493
494   if (NULL ==
495       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
496                                                          ntohs (handle->
497                                                                 current->msg->
498                                                                 size),
499                                                          handle->current->
500                                                          timeout, GNUNET_YES,
501                                                          &transmit_pending_retransmission,
502                                                          handle)))
503     {
504 #if DEBUG_DHT_API
505       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506                   "Failed to transmit request to dht service.\n");
507 #endif
508       finish_retransmission (handle, GNUNET_SYSERR);
509       return;
510     }
511 #if DEBUG_DHT_API
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513               "`%s': Scheduled sending message of size %d to service\n",
514               "DHT API", ntohs (handle->current->msg->size));
515 #endif
516 }
517
518 /**
519  * Send complete (or failed), call continuation if we have one.
520  */
521 static void
522 finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
523 {
524   struct PendingMessage *pos = handle->current;
525   struct PendingMessageList *pending_list;
526 #if DEBUG_DHT_API
527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
528 #endif
529   GNUNET_assert (pos == handle->retransmissions->message);
530   pending_list = handle->retransmissions;
531   handle->retransmissions = handle->retransmissions->next;
532   GNUNET_free (pending_list);
533
534   if (handle->retransmissions == NULL)
535     {
536       handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
537     }
538
539   if (handle->retransmissions != NULL)
540     {
541       handle->current = handle->retransmissions->message;
542       process_pending_retransmissions(handle);
543     }
544   else if (handle->retransmission_buffer != NULL)
545     {
546       handle->current = handle->retransmission_buffer;
547       process_pending_message(handle);
548     }
549 }
550
551 /**
552  * Handler for messages received from the DHT service
553  * a demultiplexer which handles numerous message types
554  *
555  */
556 void
557 service_message_handler (void *cls,
558                          const struct GNUNET_MessageHeader *msg)
559 {
560   struct GNUNET_DHT_Handle *handle = cls;
561   struct GNUNET_DHT_RouteResultMessage *dht_msg;
562   struct GNUNET_MessageHeader *enc_msg;
563   struct GNUNET_DHT_RouteHandle *route_handle;
564   uint64_t uid;
565   GNUNET_HashCode uid_hash;
566   size_t enc_size;
567
568   if (msg == NULL)
569     {
570 #if DEBUG_DHT_API
571       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
572                   "`%s': Received NULL from server, connection down!\n",
573                   "DHT API");
574 #endif
575       GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
576       handle->client = GNUNET_CLIENT_connect (handle->sched, 
577                                               "dht",
578                                               handle->cfg);
579       if (handle->current != NULL)
580         {
581           handle->th = NULL;
582           finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
583         }
584 #if RETRANSMIT
585       if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
586         {
587           handle->retransmit_stage = DHT_RETRANSMITTING;
588           handle->current = handle->retransmissions->message;
589           process_pending_retransmissions(handle);
590         }
591 #endif
592       return;
593     }
594
595   switch (ntohs (msg->type))
596     {
597     case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
598       {
599         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
600         uid = GNUNET_ntohll (dht_msg->unique_id);
601 #if DEBUG_DHT_API
602         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603                     "`%s': Received response to message (uid %llu)\n",
604                     "DHT API", uid);
605 #endif
606
607         hash_from_uid (uid, &uid_hash);
608         route_handle =
609           GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
610                                              &uid_hash);
611         if (route_handle == NULL)   /* We have no recollection of this request */
612           {
613 #if DEBUG_DHT_API
614             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615                         "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
616                         "DHT API", uid);
617 #endif
618           }
619         else
620           {
621             enc_size =
622               ntohs (dht_msg->header.size) -
623               sizeof (struct GNUNET_DHT_RouteResultMessage);
624             GNUNET_assert (enc_size > 0);
625             enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
626             route_handle->iter (route_handle->iter_cls, enc_msg);
627           }
628
629         break;
630       }
631     default:
632       {
633         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
634                     "`%s': Received unknown message type %d\n", "DHT API",
635                     ntohs (msg->type));
636       }
637     }
638   GNUNET_CLIENT_receive (handle->client,
639                          &service_message_handler,
640                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
641
642 }
643
644
645 /**
646  * Initialize the connection with the DHT service.
647  *
648  * @param sched scheduler to use
649  * @param cfg configuration to use
650  * @param ht_len size of the internal hash table to use for
651  *               processing multiple GET/FIND requests in parallel
652  *
653  * @return handle to the DHT service, or NULL on error
654  */
655 struct GNUNET_DHT_Handle *
656 GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
657                     const struct GNUNET_CONFIGURATION_Handle *cfg,
658                     unsigned int ht_len)
659 {
660   struct GNUNET_DHT_Handle *handle;
661
662   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
663   handle->cfg = cfg;
664   handle->sched = sched;
665   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
666   handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
667   if (handle->client == NULL)
668     {
669       GNUNET_free (handle);
670       return NULL;
671     }
672   handle->outstanding_requests =
673     GNUNET_CONTAINER_multihashmap_create (ht_len);
674 #if DEBUG_DHT_API
675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676               "`%s': Connection to service in progress\n", "DHT API");
677 #endif
678   GNUNET_CLIENT_receive (handle->client,
679                          &service_message_handler,
680                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
681   return handle;
682 }
683
684
685 /**
686  * Shutdown connection with the DHT service.
687  *
688  * @param handle handle of the DHT connection to stop
689  */
690 void
691 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
692 {
693 #if DEBUG_DHT_API
694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
696 #endif
697   GNUNET_assert (handle != NULL);
698   if (handle->th != NULL)       /* We have a live transmit request */
699     {
700       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
701       handle->th = NULL;
702     }
703   if (handle->current != NULL)  /* We are trying to send something now, clean it up */
704     GNUNET_free (handle->current);
705
706   if (handle->client != NULL)   /* Finally, disconnect from the service */
707     {
708       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
709       handle->client = NULL;
710     }
711
712   GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
713   GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
714   GNUNET_free (handle);
715 }
716
717
718 /**
719  * Transmit the next pending message, called by notify_transmit_ready
720  */
721 static size_t
722 transmit_pending_retransmission (void *cls, size_t size, void *buf)
723 {
724   struct GNUNET_DHT_Handle *handle = cls;
725   size_t tsize;
726
727 #if DEBUG_DHT_API
728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729               "`%s': In transmit_pending\n", "DHT API");
730 #endif
731   if (buf == NULL)
732     {
733 #if DEBUG_DHT_API
734       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
736 #endif
737       finish_retransmission (handle, GNUNET_SYSERR);
738       return 0;
739     }
740
741   handle->th = NULL;
742
743   if (handle->current != NULL)
744     {
745       tsize = ntohs (handle->current->msg->size);
746       if (size >= tsize)
747         {
748 #if DEBUG_DHT_API
749           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750                       "`%s': Sending message size %d\n", "DHT API", tsize);
751 #endif
752           memcpy (buf, handle->current->msg, tsize);
753           finish_retransmission (handle, GNUNET_OK);
754           return tsize;
755         }
756       else
757         {
758           return 0;
759         }
760     }
761   /* Have no pending request */
762   return 0;
763 }
764
765
766 /**
767  * Iterator called on each result obtained from a generic route
768  * operation
769  */
770 void
771 get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
772 {
773   struct GNUNET_DHT_GetHandle *get_handle = cls;
774   struct GNUNET_DHT_GetResultMessage *result;
775   size_t data_size;
776   char *result_data;
777
778   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
779     return;
780
781   GNUNET_assert (ntohs (reply->size) >=
782                  sizeof (struct GNUNET_DHT_GetResultMessage));
783   result = (struct GNUNET_DHT_GetResultMessage *) reply;
784   data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
785
786   result_data = (char *) &result[1];    /* Set data pointer to end of message */
787
788   get_handle->get_context.iter (get_handle->get_context.iter_cls,
789                                 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
790                                 ntohs (result->type), data_size, result_data);
791 }
792
793
794 /**
795  * Iterator called on each result obtained from a generic route
796  * operation
797  */
798 void
799 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
800 {
801   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
802   struct GNUNET_MessageHeader *hello;
803
804   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
805     {
806       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807                   "Received wrong type of response to a find peer request...\n");
808       return;
809     }
810
811
812   GNUNET_assert (ntohs (reply->size) >=
813                  sizeof (struct GNUNET_MessageHeader));
814   hello = (struct GNUNET_MessageHeader *)&reply[1];
815
816   if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
817     {
818       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819                   "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
820       return;
821     }
822   find_peer_handle->find_peer_context.proc (find_peer_handle->
823                                             find_peer_context.proc_cls,
824                                             (struct GNUNET_HELLO_Message *)hello);
825 }
826
827 /**
828  * Send a message to the DHT telling it to start issuing random GET
829  * requests every 'frequency' milliseconds.
830  *
831  * @param handle handle to the DHT service
832  * @param frequency delay (in milliseconds) between sending malicious messages
833  * @param cont continuation to call once the message is sent
834  * @param cont_cls closure for continuation
835  *
836  * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
837  */
838 int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
839 {
840   struct GNUNET_DHT_ControlMessage *msg;
841   struct PendingMessage *pending;
842
843   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
844     return GNUNET_NO;
845
846   msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
847   msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
848   msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
849   msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
850   msg->variable = htons(frequency);
851
852   pending = GNUNET_malloc (sizeof (struct PendingMessage));
853   pending->msg = &msg->header;
854   pending->timeout = GNUNET_TIME_relative_get_forever();
855   pending->free_on_send = GNUNET_YES;
856   pending->cont = cont;
857   pending->cont_cls = cont_cls;
858   pending->unique_id = 0;
859
860   if (handle->current == NULL)
861     {
862       handle->current = pending;
863       process_pending_message (handle);
864     }
865   else
866   {
867     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
868     handle->retransmission_buffer = pending;
869   }
870
871   return GNUNET_YES;
872 }
873
874 /**
875  * Send a message to the DHT telling it to issue a single find
876  * peer request using the peers unique identifier as key.  This
877  * is used to fill the routing table, and is normally controlled
878  * by the DHT itself.  However, for testing and perhaps more
879  * close control over the DHT, this can be explicitly managed.
880  *
881  * @param handle handle to the DHT service
882  * @param cont continuation to call once the message is sent
883  * @param cont_cls closure for continuation
884  *
885  * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
886  */
887 int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
888                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
889 {
890   struct GNUNET_DHT_ControlMessage *msg;
891   struct PendingMessage *pending;
892
893   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
894     return GNUNET_NO;
895
896   msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
897   msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
898   msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
899   msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
900
901   pending = GNUNET_malloc (sizeof (struct PendingMessage));
902   pending->msg = &msg->header;
903   pending->timeout = GNUNET_TIME_relative_get_forever();
904   pending->free_on_send = GNUNET_YES;
905   pending->cont = cont;
906   pending->cont_cls = cont_cls;
907   pending->unique_id = 0;
908
909   if (handle->current == NULL)
910     {
911       handle->current = pending;
912       process_pending_message (handle);
913     }
914   else
915   {
916     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
917     handle->retransmission_buffer = pending;
918   }
919
920   return GNUNET_YES;
921 }
922
923 /**
924  * Send a message to the DHT telling it to start issuing random PUT
925  * requests every 'frequency' milliseconds.
926  *
927  * @param handle handle to the DHT service
928  * @param frequency delay (in milliseconds) between sending malicious messages
929  * @param cont continuation to call once the message is sent
930  * @param cont_cls closure for continuation
931  *
932  * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
933  */
934 int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
935 {
936   struct GNUNET_DHT_ControlMessage *msg;
937   struct PendingMessage *pending;
938
939   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
940     return GNUNET_NO;
941
942   msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
943   msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
944   msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
945   msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
946   msg->variable = htons(frequency);
947
948   pending = GNUNET_malloc (sizeof (struct PendingMessage));
949   pending->msg = &msg->header;
950   pending->timeout = GNUNET_TIME_relative_get_forever();
951   pending->free_on_send = GNUNET_YES;
952   pending->cont = cont;
953   pending->cont_cls = cont_cls;
954   pending->unique_id = 0;
955
956   if (handle->current == NULL)
957     {
958       handle->current = pending;
959       process_pending_message (handle);
960     }
961   else
962   {
963     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
964     handle->retransmission_buffer = pending;
965   }
966
967   return GNUNET_YES;
968 }
969
970 /**
971  * Send a message to the DHT telling it to start dropping
972  * all requests received.
973  *
974  * @param handle handle to the DHT service
975  * @param cont continuation to call once the message is sent
976  * @param cont_cls closure for continuation
977  *
978  * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
979  */
980 int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
981 {
982   struct GNUNET_DHT_ControlMessage *msg;
983   struct PendingMessage *pending;
984
985   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
986     return GNUNET_NO;
987
988   msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
989   msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
990   msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
991   msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
992   msg->variable = htons(0);
993
994   pending = GNUNET_malloc (sizeof (struct PendingMessage));
995   pending->msg = &msg->header;
996   pending->timeout = GNUNET_TIME_relative_get_forever();
997   pending->free_on_send = GNUNET_YES;
998   pending->cont = cont;
999   pending->cont_cls = cont_cls;
1000   pending->unique_id = 0;
1001
1002   if (handle->current == NULL)
1003     {
1004       handle->current = pending;
1005       process_pending_message (handle);
1006     }
1007   else
1008   {
1009     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1010     handle->retransmission_buffer = pending;
1011   }
1012
1013   return GNUNET_YES;
1014 }
1015
1016
1017 /**
1018  * Initiate a generic DHT route operation.
1019  *
1020  * @param handle handle to the DHT service
1021  * @param key the key to look up
1022  * @param desired_replication_level how many peers should ultimately receive
1023  *                this message (advisory only, target may be too high for the
1024  *                given DHT or not hit exactly).
1025  * @param options options for routing
1026  * @param enc send the encapsulated message to a peer close to the key
1027  * @param iter function to call on each result, NULL if no replies are expected
1028  * @param iter_cls closure for iter
1029  * @param timeout when to abort with an error if we fail to get
1030  *                a confirmation for the request (when necessary) or how long
1031  *                to wait for tramission to the service
1032  * @param cont continuation to call when done;
1033  *             reason will be TIMEOUT on error,
1034  *             reason will be PREREQ_DONE on success
1035  * @param cont_cls closure for cont
1036  *
1037  * @return handle to stop the request, NULL if the request is "fire and forget"
1038  */
1039 struct GNUNET_DHT_RouteHandle *
1040 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
1041                         const GNUNET_HashCode * key,
1042                         unsigned int desired_replication_level,
1043                         enum GNUNET_DHT_RouteOption options,
1044                         const struct GNUNET_MessageHeader *enc,
1045                         struct GNUNET_TIME_Relative timeout,
1046                         GNUNET_DHT_ReplyProcessor iter,
1047                         void *iter_cls,
1048                         GNUNET_SCHEDULER_Task cont, void *cont_cls)
1049 {
1050   struct GNUNET_DHT_RouteHandle *route_handle;
1051   struct PendingMessage *pending;
1052   struct GNUNET_DHT_RouteMessage *message;
1053   uint16_t msize;
1054   GNUNET_HashCode uid_key;
1055
1056   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1057     return NULL;
1058
1059   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1060     {
1061       GNUNET_break (0);
1062       return NULL;
1063     }
1064
1065   route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
1066   memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
1067   route_handle->iter = iter;
1068   route_handle->iter_cls = iter_cls;
1069   route_handle->dht_handle = handle;
1070   route_handle->uid = handle->uid_gen++;
1071   if (iter != NULL)
1072     {
1073       hash_from_uid (route_handle->uid, &uid_key);
1074       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
1075                                          &uid_key, route_handle,
1076                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1077     }
1078
1079 #if DEBUG_DHT_API
1080       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081                   "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
1082 #endif
1083
1084   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
1085   message = GNUNET_malloc (msize);
1086   message->header.size = htons (msize);
1087   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
1088   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
1089   message->options = htonl (options);
1090   message->desired_replication_level = htonl (desired_replication_level);
1091   message->unique_id = GNUNET_htonll (route_handle->uid);
1092   memcpy (&message[1], enc, ntohs (enc->size));
1093   pending = GNUNET_malloc (sizeof (struct PendingMessage));
1094   pending->msg = &message->header;
1095   pending->timeout = timeout;
1096   if (iter == NULL)
1097     pending->free_on_send = GNUNET_YES;
1098   pending->cont = cont;
1099   pending->cont_cls = cont_cls;
1100   pending->unique_id = route_handle->uid;
1101   if (handle->current == NULL)
1102     {
1103       handle->current = pending;
1104       process_pending_message (handle);
1105     }
1106   else
1107   {
1108     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1109     handle->retransmission_buffer = pending;
1110   }
1111
1112   route_handle->message = message;
1113   return route_handle;
1114 }
1115
1116
1117 /**
1118  * Perform an asynchronous GET operation on the DHT identified.
1119  *
1120  * @param handle handle to the DHT service
1121  * @param timeout how long to wait for transmission of this request to the service
1122  * @param type expected type of the response object
1123  * @param key the key to look up
1124  * @param iter function to call on each result
1125  * @param iter_cls closure for iter
1126  * @param cont continuation to call once message sent
1127  * @param cont_cls closure for continuation
1128  *
1129  * @return handle to stop the async get
1130  */
1131 struct GNUNET_DHT_GetHandle *
1132 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1133                       struct GNUNET_TIME_Relative timeout,
1134                       uint32_t type,
1135                       const GNUNET_HashCode * key,
1136                       GNUNET_DHT_GetIterator iter,
1137                       void *iter_cls,
1138                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
1139 {
1140   struct GNUNET_DHT_GetHandle *get_handle;
1141   struct GNUNET_DHT_GetMessage get_msg;
1142
1143   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1144     return NULL;
1145
1146   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1147   get_handle->get_context.iter = iter;
1148   get_handle->get_context.iter_cls = iter_cls;
1149
1150 #if DEBUG_DHT_API
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "`%s': Inserting pending get request with key %s\n", "DHT API",
1153               GNUNET_h2s (key));
1154 #endif
1155
1156   get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
1157   get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
1158   get_msg.type = htons (type);
1159
1160   get_handle->route_handle =
1161     GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout,
1162                             &get_reply_iterator, get_handle, cont, cont_cls);
1163
1164   return get_handle;
1165 }
1166
1167
1168 /**
1169  * Stop a previously issued routing request
1170  *
1171  * @param route_handle handle to the request to stop
1172  * @param cont continuation to call once this message is sent to the service or times out
1173  * @param cont_cls closure for the continuation
1174  */
1175 void
1176 GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
1177                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
1178 {
1179   struct PendingMessage *pending;
1180   struct GNUNET_DHT_StopMessage *message;
1181   size_t msize;
1182   GNUNET_HashCode uid_key;
1183
1184   msize = sizeof (struct GNUNET_DHT_StopMessage);
1185   message = GNUNET_malloc (msize);
1186   message->header.size = htons (msize);
1187   message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
1188 #if DEBUG_DHT_API
1189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
1191               route_handle->uid);
1192 #endif
1193   message->unique_id = GNUNET_htonll (route_handle->uid);
1194   memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
1195   pending = GNUNET_malloc (sizeof (struct PendingMessage));
1196   pending->msg = (struct GNUNET_MessageHeader *) message;
1197   pending->timeout = GNUNET_TIME_relative_get_forever();
1198   pending->cont = cont;
1199   pending->cont_cls = cont_cls;
1200   pending->free_on_send = GNUNET_YES;
1201   pending->unique_id = 0; /* When finished is called, free pending->msg */
1202
1203   if (route_handle->dht_handle->current == NULL)
1204     {
1205       route_handle->dht_handle->current = pending;
1206       process_pending_message (route_handle->dht_handle);
1207     }
1208   else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
1209     {
1210       route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
1211       route_handle->dht_handle->retransmission_buffer = pending;
1212     }
1213   else
1214     {
1215       GNUNET_free(pending);
1216       GNUNET_break(0);
1217     }
1218
1219   hash_from_uid (route_handle->uid, &uid_key);
1220   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1221                  (route_handle->dht_handle->outstanding_requests, &uid_key,
1222                   route_handle) == GNUNET_YES);
1223
1224   GNUNET_free(route_handle->message);
1225   GNUNET_free(route_handle);
1226 }
1227
1228
1229 /**
1230  * Stop async DHT-get.
1231  *
1232  * @param get_handle handle to the GET operation to stop
1233  * @param cont continuation to call once this message is sent to the service or times out
1234  * @param cont_cls closure for the continuation
1235  */
1236 void
1237 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
1238                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
1239 {
1240   if ((get_handle->route_handle->dht_handle->current != NULL) &&
1241       (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1242     {
1243       if (cont != NULL)
1244         {
1245           GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1246                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1247         }
1248       return;
1249     }
1250
1251 #if DEBUG_DHT_API
1252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1253               "`%s': Removing pending get request with key %s, uid %llu\n",
1254               "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
1255               get_handle->route_handle->uid);
1256 #endif
1257   GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
1258   GNUNET_free (get_handle);
1259 }
1260
1261
1262 /**
1263  * Perform an asynchronous FIND PEER operation on the DHT.
1264  *
1265  * @param handle handle to the DHT service
1266  * @param timeout timeout for this request to be sent to the
1267  *        service
1268  * @param options routing options for this message
1269  * @param key the key to look up
1270  * @param proc function to call on each result
1271  * @param proc_cls closure for proc
1272  * @param cont continuation to call once message sent
1273  * @param cont_cls closure for continuation
1274  *
1275  * @return handle to stop the async get, NULL on error
1276  */
1277 struct GNUNET_DHT_FindPeerHandle *
1278 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
1279                             struct GNUNET_TIME_Relative timeout,
1280                             enum GNUNET_DHT_RouteOption options,
1281                             const GNUNET_HashCode * key,
1282                             GNUNET_DHT_FindPeerProcessor proc,
1283                             void *proc_cls,
1284                             GNUNET_SCHEDULER_Task cont,
1285                             void *cont_cls)
1286 {
1287   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
1288   struct GNUNET_DHT_FindPeerMessage find_peer_msg;
1289
1290   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
1291     return NULL;
1292
1293   find_peer_handle =
1294     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1295   find_peer_handle->find_peer_context.proc = proc;
1296   find_peer_handle->find_peer_context.proc_cls = proc_cls;
1297
1298 #if DEBUG_DHT_API
1299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300               "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1301               "FIND PEER", GNUNET_h2s (key));
1302 #endif
1303
1304   find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
1305   find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1306   find_peer_handle->route_handle =
1307     GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
1308                             timeout, &find_peer_reply_iterator,
1309                             find_peer_handle, cont, cont_cls);
1310   return find_peer_handle;
1311 }
1312
1313 /**
1314  * Stop async find peer.  Frees associated resources.
1315  *
1316  * @param find_peer_handle GET operation to stop.
1317  * @param cont continuation to call once this message is sent to the service or times out
1318  * @param cont_cls closure for the continuation
1319  */
1320 void
1321 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
1322                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
1323 {
1324   if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
1325       (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1326     {
1327       if (cont != NULL)
1328         {
1329           GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1330                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1331         }
1332       return;
1333     }
1334
1335 #if DEBUG_DHT_API
1336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1338               "DHT API", "FIND PEER",
1339               GNUNET_h2s (&find_peer_handle->route_handle->key),
1340               find_peer_handle->route_handle->uid);
1341 #endif
1342   GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1343   GNUNET_free (find_peer_handle);
1344
1345 }
1346
1347
1348 /**
1349  * Perform a PUT operation storing data in the DHT.
1350  *
1351  * @param handle handle to DHT service
1352  * @param key the key to store under
1353  * @param type type of the value
1354  * @param size number of bytes in data; must be less than 64k
1355  * @param data the data to store
1356  * @param exp desired expiration time for the value
1357  * @param timeout how long to wait for transmission of this request
1358  * @param cont continuation to call when done;
1359  *             reason will be TIMEOUT on error,
1360  *             reason will be PREREQ_DONE on success
1361  * @param cont_cls closure for cont
1362  *
1363  * @return GNUNET_YES if put message is queued for transmission
1364  */
1365 void
1366 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1367                 const GNUNET_HashCode * key,
1368                 uint32_t type,
1369                 uint32_t size,
1370                 const char *data,
1371                 struct GNUNET_TIME_Absolute exp,
1372                 struct GNUNET_TIME_Relative timeout,
1373                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1374 {
1375   struct GNUNET_DHT_PutMessage *put_msg;
1376   struct GNUNET_DHT_RouteHandle *put_route;
1377   size_t msize;
1378
1379   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1380     {
1381       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
1382       if (cont != NULL)
1383         {
1384           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1385                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1386         }
1387       return;
1388     }
1389
1390 #if DEBUG_DHT_API
1391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392               "`%s': Inserting pending put request with key %s\n", "DHT API",
1393               GNUNET_h2s (key));
1394 #endif
1395
1396   msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1397   put_msg = GNUNET_malloc (msize);
1398   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1399   put_msg->header.size = htons (msize);
1400   put_msg->type = htons (type);
1401   put_msg->data_size = htons (size);
1402   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1403   memcpy (&put_msg[1], data, size);
1404
1405   put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL,
1406                                       NULL, cont, cont_cls);
1407
1408   if (put_route == NULL) /* Route start failed! */
1409     {
1410       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1411       if (cont != NULL)
1412         {
1413           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1414                                              GNUNET_SCHEDULER_REASON_TIMEOUT);
1415         }
1416     }
1417   else
1418     {
1419       GNUNET_free(put_route);
1420     }
1421
1422   GNUNET_free (put_msg);
1423 }