7964ba98f985c6725e15cb0b52a643dced519d65
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Unique ID for this request
78    */
79   uint64_t unique_id;
80
81   /**
82    * Free the saved message once sent, set to GNUNET_YES for messages
83    * that do not receive responses; GNUNET_NO if this pending message
84    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
85    * from there.
86    */
87   int free_on_send;
88
89   /**
90    * GNUNET_YES if this message is in our pending queue right now.
91    */
92   int in_pending_queue;
93
94 };
95
96
97 /**
98  * Handle to a PUT request.
99  */
100 struct GNUNET_DHT_PutHandle
101 {
102   /**
103    * Kept in a DLL.
104    */
105   struct GNUNET_DHT_PutHandle *next;
106
107   /**
108    * Kept in a DLL.
109    */
110   struct GNUNET_DHT_PutHandle *prev;
111
112   /**
113    * Continuation to call when done.
114    */
115   GNUNET_DHT_PutContinuation cont;
116
117   /**
118    * Pending message associated with this PUT operation, 
119    * NULL after the message has been transmitted to the service.
120    */
121   struct PendingMessage *pending;
122
123   /**
124    * Main handle to this DHT api
125    */
126   struct GNUNET_DHT_Handle *dht_handle;
127
128   /**
129    * Closure for 'cont'.
130    */
131   void *cont_cls;
132
133   /**
134    * Timeout task for this operation.
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * Unique ID for the PUT operation.
140    */
141   uint64_t unique_id;
142
143 };
144
145
146
147 /**
148  * Handle to a GET request
149  */
150 struct GNUNET_DHT_GetHandle
151 {
152
153   /**
154    * Iterator to call on data receipt
155    */
156   GNUNET_DHT_GetIterator iter;
157
158   /**
159    * Closure for the iterator callback
160    */
161   void *iter_cls;
162
163   /**
164    * Main handle to this DHT api
165    */
166   struct GNUNET_DHT_Handle *dht_handle;
167
168   /**
169    * The actual message sent for this request,
170    * used for retransmitting requests on service
171    * failure/reconnect.  Freed on route_stop.
172    */
173   struct PendingMessage *message;
174
175   /**
176    * Key that this get request is for
177    */
178   struct GNUNET_HashCode key;
179
180   /**
181    * Unique identifier for this request (for key collisions).
182    */
183   uint64_t unique_id;
184
185 };
186
187
188 /**
189  * Handle to a monitoring request.
190  */
191 struct GNUNET_DHT_MonitorHandle
192 {
193   /**
194    * DLL.
195    */
196   struct GNUNET_DHT_MonitorHandle *next;
197
198   /**
199    * DLL.
200    */
201   struct GNUNET_DHT_MonitorHandle *prev;
202   
203   /**
204    * Main handle to this DHT api.
205    */
206   struct GNUNET_DHT_Handle *dht_handle;
207
208   /**
209    * Type of block looked for.
210    */
211   enum GNUNET_BLOCK_Type type;
212
213   /**
214    * Key being looked for, NULL == all.
215    */
216   struct GNUNET_HashCode *key;
217
218   /**
219    * Callback for each received message of type get.
220    */
221   GNUNET_DHT_MonitorGetCB get_cb;
222
223   /**
224    * Callback for each received message of type get response.
225    */
226   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
227
228   /**
229    * Callback for each received message of type put.
230    */
231   GNUNET_DHT_MonitorPutCB put_cb;
232
233   /**
234    * Closure for cb.
235    */
236   void *cb_cls;
237   
238 };
239
240
241 /**
242  * Connection to the DHT service.
243  */
244 struct GNUNET_DHT_Handle
245 {
246
247   /**
248    * Configuration to use.
249    */
250   const struct GNUNET_CONFIGURATION_Handle *cfg;
251
252   /**
253    * Socket (if available).
254    */
255   struct GNUNET_CLIENT_Connection *client;
256
257   /**
258    * Currently pending transmission request (or NULL).
259    */
260   struct GNUNET_CLIENT_TransmitHandle *th;
261
262   /**
263    * Head of linked list of messages we would like to transmit.
264    */
265   struct PendingMessage *pending_head;
266
267   /**
268    * Tail of linked list of messages we would like to transmit.
269    */
270   struct PendingMessage *pending_tail;
271
272   /**
273    * Head of linked list of messages we would like to monitor. 
274    */
275   struct GNUNET_DHT_MonitorHandle *monitor_head;
276
277   /**
278    * Tail of linked list of messages we would like to monitor.
279    */
280   struct GNUNET_DHT_MonitorHandle *monitor_tail;
281
282   /**
283    * Head of active PUT requests.
284    */
285   struct GNUNET_DHT_PutHandle *put_head;
286
287   /**
288    * Tail of active PUT requests.
289    */
290   struct GNUNET_DHT_PutHandle *put_tail;
291
292   /**
293    * Hash map containing the current outstanding unique GET requests
294    * (values are of type 'struct GNUNET_DHT_GetHandle').
295    */
296   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
297
298   /**
299    * Task for trying to reconnect.
300    */
301   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
302
303   /**
304    * How quickly should we retry?  Used for exponential back-off on
305    * connect-errors.
306    */
307   struct GNUNET_TIME_Relative retry_time;
308
309   /**
310    * Generator for unique ids.
311    */
312   uint64_t uid_gen;
313
314   /**
315    * Did we start our receive loop yet?
316    */
317   int in_receive;
318 };
319
320
321 /**
322  * Handler for messages received from the DHT service
323  * a demultiplexer which handles numerous message types
324  *
325  * @param cls the 'struct GNUNET_DHT_Handle'
326  * @param msg the incoming message
327  */
328 static void
329 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
330
331
332 /**
333  * Try to (re)connect to the DHT service.
334  *
335  * @param handle DHT handle to reconnect
336  * @return GNUNET_YES on success, GNUNET_NO on failure.
337  */
338 static int
339 try_connect (struct GNUNET_DHT_Handle *handle)
340 {
341   if (NULL != handle->client)
342     return GNUNET_OK;
343   handle->in_receive = GNUNET_NO;
344   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
345   if (NULL == handle->client)
346   {
347     LOG (GNUNET_ERROR_TYPE_WARNING,
348          _("Failed to connect to the DHT service!\n"));
349     return GNUNET_NO;
350   }
351   return GNUNET_YES;
352 }
353
354
355 /**
356  * Add the request corresponding to the given route handle
357  * to the pending queue (if it is not already in there).
358  *
359  * @param cls the 'struct GNUNET_DHT_Handle*'
360  * @param key key for the request (not used)
361  * @param value the 'struct GNUNET_DHT_GetHandle*'
362  * @return GNUNET_YES (always)
363  */
364 static int
365 add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value)
366 {
367   struct GNUNET_DHT_Handle *handle = cls;
368   struct GNUNET_DHT_GetHandle *rh = value;
369
370   if (GNUNET_NO == rh->message->in_pending_queue)
371   {
372     LOG (GNUNET_ERROR_TYPE_DEBUG,
373          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
374          handle);
375     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
376                                  rh->message);
377     rh->message->in_pending_queue = GNUNET_YES;
378   }
379   return GNUNET_YES;
380 }
381
382
383 /**
384  * Try to send messages from list of messages to send
385  *
386  * @param handle DHT_Handle
387  */
388 static void
389 process_pending_messages (struct GNUNET_DHT_Handle *handle);
390
391
392 /**
393  * Try reconnecting to the dht service.
394  *
395  * @param cls GNUNET_DHT_Handle
396  * @param tc scheduler context
397  */
398 static void
399 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
400 {
401   struct GNUNET_DHT_Handle *handle = cls;
402
403   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
404   handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time);
405   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
406   if (GNUNET_YES != try_connect (handle))
407   {
408     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
409     return;
410   }
411   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
412                                          &add_request_to_pending, handle);
413   process_pending_messages (handle);
414 }
415
416
417 /**
418  * Try reconnecting to the DHT service.
419  *
420  * @param handle handle to dht to (possibly) disconnect and reconnect
421  */
422 static void
423 do_disconnect (struct GNUNET_DHT_Handle *handle)
424 {
425   struct GNUNET_DHT_PutHandle *ph;
426   struct GNUNET_DHT_PutHandle *next;
427
428   if (NULL == handle->client)
429     return;
430   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
431   if (NULL != handle->th)
432     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
433   handle->th = NULL;
434   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435               "Disconnecting from DHT service, will try to reconnect in %s\n",
436               GNUNET_STRINGS_relative_time_to_string (handle->retry_time,
437                                                       GNUNET_YES));
438   GNUNET_CLIENT_disconnect (handle->client);
439   handle->client = NULL;
440
441   /* signal disconnect to all PUT requests that were transmitted but waiting
442      for the put confirmation */
443   next = handle->put_head;
444   while (NULL != (ph = next))
445   {
446     next = ph->next;
447     if (NULL == ph->pending)
448     {
449       if (NULL != ph->cont)
450         ph->cont (ph->cont_cls, GNUNET_SYSERR);
451       GNUNET_DHT_put_cancel (ph);
452     }
453   }
454   handle->reconnect_task =
455       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
456 }
457
458
459 /**
460  * Transmit the next pending message, called by notify_transmit_ready
461  *
462  * @param cls the DHT handle
463  * @param size number of bytes available in 'buf' for transmission
464  * @param buf where to copy messages for the service
465  * @return number of bytes written to 'buf'
466  */
467 static size_t
468 transmit_pending (void *cls, size_t size, void *buf);
469
470
471 /**
472  * Try to send messages from list of messages to send
473  *
474  * @param handle handle to DHT
475  */
476 static void
477 process_pending_messages (struct GNUNET_DHT_Handle *handle)
478 {
479   struct PendingMessage *head;
480
481   if (NULL == handle->client)
482   {
483     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
484                 "process_pending_messages called, but client is NULL, reconnecting\n");
485     do_disconnect (handle);
486     return;
487   }
488   if (NULL != handle->th)
489     return;
490   if (NULL == (head = handle->pending_head))
491     return;
492   handle->th =
493       GNUNET_CLIENT_notify_transmit_ready (handle->client,
494                                            ntohs (head->msg->size),
495                                            GNUNET_TIME_UNIT_FOREVER_REL,
496                                            GNUNET_YES, &transmit_pending,
497                                            handle);
498   if (NULL != handle->th)
499     return;
500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501               "notify_transmit_ready returned NULL, reconnecting\n");
502   do_disconnect (handle);
503 }
504
505
506 /**
507  * Transmit the next pending message, called by notify_transmit_ready
508  *
509  * @param cls the DHT handle
510  * @param size number of bytes available in 'buf' for transmission
511  * @param buf where to copy messages for the service
512  * @return number of bytes written to 'buf'
513  */
514 static size_t
515 transmit_pending (void *cls, size_t size, void *buf)
516 {
517   struct GNUNET_DHT_Handle *handle = cls;
518   struct PendingMessage *head;
519   size_t tsize;
520
521   handle->th = NULL;
522   if (NULL == buf)
523   {    
524     LOG (GNUNET_ERROR_TYPE_DEBUG,
525          "Transmission to DHT service failed!  Reconnecting!\n");
526     do_disconnect (handle);
527     return 0;
528   }
529   if (NULL == (head = handle->pending_head))
530     return 0;
531
532   tsize = ntohs (head->msg->size);
533   if (size < tsize)
534   {
535     process_pending_messages (handle);
536     return 0;
537   }
538   memcpy (buf, head->msg, tsize);
539   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
540                                head);
541   head->in_pending_queue = GNUNET_NO;
542   if (NULL != head->cont)
543   {
544     head->cont (head->cont_cls, NULL);
545     head->cont = NULL;
546     head->cont_cls = NULL;
547   }
548   if (GNUNET_YES == head->free_on_send)
549     GNUNET_free (head);
550   process_pending_messages (handle);
551   LOG (GNUNET_ERROR_TYPE_DEBUG,
552        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
553   if (GNUNET_NO == handle->in_receive)
554   {
555     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
556     handle->in_receive = GNUNET_YES;
557     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
558                            GNUNET_TIME_UNIT_FOREVER_REL);
559   }
560   return tsize;
561 }
562
563
564 /**
565  * Process a given reply that might match the given
566  * request.
567  *
568  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
569  * @param key query of the request
570  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
571  * @return GNUNET_YES to continue to iterate over all results,
572  *         GNUNET_NO if the reply is malformed
573  */
574 static int
575 process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
576 {
577   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
578   struct GNUNET_DHT_GetHandle *get_handle = value;
579   const struct GNUNET_PeerIdentity *put_path;
580   const struct GNUNET_PeerIdentity *get_path;
581   uint32_t put_path_length;
582   uint32_t get_path_length;
583   size_t data_length;
584   size_t msize;
585   size_t meta_length;
586   const void *data;
587
588   if (dht_msg->unique_id != get_handle->unique_id)
589   {
590     /* UID mismatch */
591     LOG (GNUNET_ERROR_TYPE_DEBUG,
592          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
593          dht_msg->unique_id, get_handle->unique_id);
594     return GNUNET_YES;
595   }
596   msize = ntohs (dht_msg->header.size);
597   put_path_length = ntohl (dht_msg->put_path_length);
598   get_path_length = ntohl (dht_msg->get_path_length);
599   meta_length =
600       sizeof (struct GNUNET_DHT_ClientResultMessage) +
601       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
602   if ((msize < meta_length) ||
603       (get_path_length >
604        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
605       (put_path_length >
606        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
607   {
608     GNUNET_break (0);
609     return GNUNET_NO;
610   }
611   data_length = msize - meta_length;
612   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
613        (unsigned int) data_length, GNUNET_h2s (key));
614   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
615   get_path = &put_path[put_path_length];
616   data = &get_path[get_path_length];
617   get_handle->iter (get_handle->iter_cls,
618                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
619                     get_path, get_path_length, put_path, put_path_length,
620                     ntohl (dht_msg->type), data_length, data);
621   return GNUNET_YES;
622 }
623
624 /**
625  * Process a get monitor message from the service.
626  *
627  * @param handle The DHT handle.
628  * @param msg Monitor get message from the service.
629  * 
630  * @return GNUNET_OK if everything went fine,
631  *         GNUNET_SYSERR if the message is malformed.
632  */
633 static int
634 process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
635                              const struct GNUNET_DHT_MonitorGetMessage *msg)
636 {
637   struct GNUNET_DHT_MonitorHandle *h;
638
639   for (h = handle->monitor_head; NULL != h; h = h->next)
640   {
641     int type_ok;
642     int key_ok;
643
644     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
645     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
646                                                sizeof (struct GNUNET_HashCode)));
647     if (type_ok && key_ok && (NULL != h->get_cb))
648       h->get_cb (h->cb_cls,
649                  ntohl (msg->options),
650                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
651                  ntohl (msg->hop_count),
652                  ntohl (msg->desired_replication_level),
653                  ntohl (msg->get_path_length),
654                  (struct GNUNET_PeerIdentity *) &msg[1],
655                  &msg->key);    
656   }
657   return GNUNET_OK;
658 }
659
660
661 /**
662  * Process a get response monitor message from the service.
663  *
664  * @param handle The DHT handle.
665  * @param msg monitor get response message from the service
666  * @return GNUNET_OK if everything went fine,
667  *         GNUNET_SYSERR if the message is malformed.
668  */
669 static int
670 process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
671                                   const struct GNUNET_DHT_MonitorGetRespMessage
672                                   *msg)
673 {
674   struct GNUNET_DHT_MonitorHandle *h;
675   struct GNUNET_PeerIdentity *path;
676   uint32_t getl;
677   uint32_t putl;
678   size_t msize;
679
680   msize = ntohs (msg->header.size);
681   path = (struct GNUNET_PeerIdentity *) &msg[1];
682   getl = ntohl (msg->get_path_length);
683   putl = ntohl (msg->put_path_length);
684   if ( (getl + putl < getl) ||
685        ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
686   {
687     GNUNET_break (0);
688     return GNUNET_SYSERR;
689   }
690   for (h = handle->monitor_head; NULL != h; h = h->next)
691   {
692     int type_ok;
693     int key_ok;
694
695     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
696     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
697                                                sizeof (struct GNUNET_HashCode)));
698     if (type_ok && key_ok && (NULL != h->get_resp_cb))
699       h->get_resp_cb (h->cb_cls,
700                       (enum GNUNET_BLOCK_Type) ntohl(msg->type),
701                       path, getl,
702                       &path[getl], putl,
703                       GNUNET_TIME_absolute_ntoh(msg->expiration_time),
704                       &msg->key,
705                       (void *) &path[getl + putl],
706                       msize -
707                       sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
708                       sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
709   }
710   return GNUNET_OK;
711 }
712
713
714 /**
715  * Process a put monitor message from the service.
716  *
717  * @param handle The DHT handle.
718  * @param msg Monitor put message from the service.
719  * 
720  * @return GNUNET_OK if everything went fine,
721  *         GNUNET_SYSERR if the message is malformed.
722  */
723 static int
724 process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
725                              const struct GNUNET_DHT_MonitorPutMessage *msg)
726 {
727   struct GNUNET_DHT_MonitorHandle *h;
728   size_t msize;
729   struct GNUNET_PeerIdentity *path;
730   uint32_t putl;
731
732   msize = ntohs (msg->header.size);
733   path = (struct GNUNET_PeerIdentity *) &msg[1];
734   putl = ntohl (msg->put_path_length);
735   if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
736   {
737     GNUNET_break (0);
738     return GNUNET_SYSERR;
739   }
740   for (h = handle->monitor_head; NULL != h; h = h->next)
741   {
742     int type_ok;
743     int key_ok;
744
745     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
746     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
747                                                sizeof (struct GNUNET_HashCode)));
748     if (type_ok && key_ok && (NULL != h->put_cb))
749       h->put_cb (h->cb_cls,
750                  ntohl (msg->options),
751                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
752                  ntohl (msg->hop_count),
753                  ntohl (msg->desired_replication_level),
754                  putl, path,
755                  GNUNET_TIME_absolute_ntoh(msg->expiration_time),
756                  &msg->key,
757                  (void *) &path[putl],
758                  msize -
759                  sizeof (struct GNUNET_DHT_MonitorPutMessage) -
760                  sizeof (struct GNUNET_PeerIdentity) * putl);
761   }
762   return GNUNET_OK;
763 }
764
765
766 /**
767  * Process a put confirmation message from the service.
768  *
769  * @param handle The DHT handle.
770  * @param msg confirmation message from the service.
771  * @return GNUNET_OK if everything went fine,
772  *         GNUNET_SYSERR if the message is malformed.
773  */
774 static int
775 process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
776                                   const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
777 {
778   struct GNUNET_DHT_PutHandle *ph;
779   GNUNET_DHT_PutContinuation cont;
780   void *cont_cls;
781
782   for (ph = handle->put_head; NULL != ph; ph = ph->next)
783     if (ph->unique_id == msg->unique_id)
784       break;
785   if (NULL == ph)
786     return GNUNET_OK;
787   cont = ph->cont;
788   cont_cls = ph->cont_cls;
789   GNUNET_DHT_put_cancel (ph);
790   if (NULL != cont) 
791     cont (cont_cls, GNUNET_OK);
792   return GNUNET_OK;
793 }
794
795
796 /**
797  * Handler for messages received from the DHT service
798  * a demultiplexer which handles numerous message types
799  *
800  * @param cls the 'struct GNUNET_DHT_Handle'
801  * @param msg the incoming message
802  */
803 static void
804 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
805 {
806   struct GNUNET_DHT_Handle *handle = cls;
807   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
808   uint16_t msize;
809   int ret;
810
811   if (NULL == msg)
812   {
813     LOG (GNUNET_ERROR_TYPE_DEBUG,
814          "Error receiving data from DHT service, reconnecting\n");
815     do_disconnect (handle);
816     return;
817   }
818   ret = GNUNET_SYSERR;
819   msize = ntohs (msg->size);
820   switch (ntohs (msg->type))
821   {
822   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
823     if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
824     {
825       GNUNET_break (0);
826       break;
827     }
828     ret = process_monitor_get_message(handle,
829                                       (const struct GNUNET_DHT_MonitorGetMessage *) msg);
830     break;
831   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
832     if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
833     {
834       GNUNET_break (0);
835       break;
836     }
837     ret = process_monitor_get_resp_message(handle,
838                                            (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
839     break;
840   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
841     if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
842     {
843       GNUNET_break (0);
844       break;
845     }
846     ret = process_monitor_put_message(handle,
847                                       (const struct GNUNET_DHT_MonitorPutMessage *) msg);
848     break;
849   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
850     /* Not implemented yet */
851     GNUNET_break(0);
852     break;
853   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
854     if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
855     {
856       GNUNET_break (0);
857       break;
858     }
859     ret = GNUNET_OK;
860     dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
861     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
862          GNUNET_h2s (&dht_msg->key), handle);
863     GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
864                                                 &dht_msg->key, &process_reply,
865                                                 (void *) dht_msg);
866     break;
867   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
868     if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
869     {
870       GNUNET_break (0);
871       break;
872     }
873     ret = process_put_confirmation_message (handle,
874                                             (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
875     break;
876   default:
877     GNUNET_break(0);
878     LOG (GNUNET_ERROR_TYPE_WARNING,
879          "Unknown DHT message type: %hu (%hu) size: %hu\n",
880          ntohs (msg->type), msg->type, msize);
881     break;
882   }
883   if (GNUNET_OK != ret)
884   {
885     GNUNET_break (0);
886     do_disconnect (handle);
887     return;
888   }
889   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
890                          GNUNET_TIME_UNIT_FOREVER_REL);
891 }
892
893
894 /**
895  * Initialize the connection with the DHT service.
896  *
897  * @param cfg configuration to use
898  * @param ht_len size of the internal hash table to use for
899  *               processing multiple GET/FIND requests in parallel
900  *
901  * @return handle to the DHT service, or NULL on error
902  */
903 struct GNUNET_DHT_Handle *
904 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
905                     unsigned int ht_len)
906 {
907   struct GNUNET_DHT_Handle *handle;
908
909   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
910   handle->cfg = cfg;
911   handle->uid_gen =
912       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
913   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_NO);
914   if (GNUNET_NO == try_connect (handle))
915   {
916     GNUNET_DHT_disconnect (handle);
917     return NULL;
918   }
919   return handle;
920 }
921
922
923 /**
924  * Shutdown connection with the DHT service.
925  *
926  * @param handle handle of the DHT connection to stop
927  */
928 void
929 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
930 {
931   struct PendingMessage *pm;
932   struct GNUNET_DHT_PutHandle *ph;
933
934   GNUNET_assert (NULL != handle);
935   GNUNET_assert (0 ==
936                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
937   if (NULL != handle->th)
938   {
939     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
940     handle->th = NULL;
941   }
942   while (NULL != (pm = handle->pending_head))
943   {
944     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
945     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
946                                  pm);
947     pm->in_pending_queue = GNUNET_NO;
948     GNUNET_assert (GNUNET_YES == pm->free_on_send);
949     if (NULL != pm->cont)
950       pm->cont (pm->cont_cls, NULL);
951     GNUNET_free (pm);
952   }
953   while (NULL != (ph = handle->put_head))
954   {
955     GNUNET_break (NULL == ph->pending);
956     if (NULL != ph->cont)
957       ph->cont (ph->cont_cls, GNUNET_SYSERR);
958     GNUNET_DHT_put_cancel (ph);
959   }
960
961   if (NULL != handle->client)
962   {
963     GNUNET_CLIENT_disconnect (handle->client);
964     handle->client = NULL;
965   }
966   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
967     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
968   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
969   GNUNET_free (handle);
970 }
971
972
973 /**
974  * Timeout for the transmission of a fire&forget-request.  Clean it up.
975  *
976  * @param cls the 'struct PendingMessage'
977  * @param tc scheduler context
978  */
979 static void
980 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
981 {
982   struct GNUNET_DHT_PutHandle *ph = cls;
983   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
984
985   ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
986   if (NULL != ph->pending)
987   {
988     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
989                                  ph->pending);
990     ph->pending->in_pending_queue = GNUNET_NO;
991     GNUNET_free (ph->pending);
992   }
993   if (NULL != ph->cont)
994     ph->cont (ph->cont_cls, GNUNET_NO);
995   GNUNET_CONTAINER_DLL_remove (handle->put_head,
996                                handle->put_tail,
997                                ph);
998   GNUNET_free (ph);
999 }
1000
1001
1002 /**
1003  * Function called whenever the PUT message leaves the queue.  Sets
1004  * the message pointer in the put handle to NULL.
1005  *
1006  * @param cls the 'struct GNUNET_DHT_PutHandle'
1007  * @param tc unused
1008  */
1009 static void
1010 mark_put_message_gone (void *cls,
1011                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1012 {
1013   struct GNUNET_DHT_PutHandle *ph = cls;
1014
1015   ph->pending = NULL;
1016 }
1017
1018
1019 /**
1020  * Perform a PUT operation storing data in the DHT.  FIXME: we should
1021  * change the protocol to get a confirmation for the PUT from the DHT
1022  * and call 'cont' only after getting the confirmation; otherwise, the
1023  * client has no good way of telling if the 'PUT' message actually got
1024  * to the DHT service!
1025  *
1026  * @param handle handle to DHT service
1027  * @param key the key to store under
1028  * @param desired_replication_level estimate of how many
1029  *                nearest peers this request should reach
1030  * @param options routing options for this message
1031  * @param type type of the value
1032  * @param size number of bytes in data; must be less than 64k
1033  * @param data the data to store
1034  * @param exp desired expiration time for the value
1035  * @param timeout how long to wait for transmission of this request
1036  * @param cont continuation to call when done (transmitting request to service)
1037  *        You must not call GNUNET_DHT_DISCONNECT in this continuation
1038  * @param cont_cls closure for cont
1039  */
1040 struct GNUNET_DHT_PutHandle *
1041 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const struct GNUNET_HashCode * key,
1042                 uint32_t desired_replication_level,
1043                 enum GNUNET_DHT_RouteOption options,
1044                 enum GNUNET_BLOCK_Type type, size_t size, const void *data,
1045                 struct GNUNET_TIME_Absolute exp,
1046                 struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
1047                 void *cont_cls)
1048 {
1049   struct GNUNET_DHT_ClientPutMessage *put_msg;
1050   size_t msize;
1051   struct PendingMessage *pending;
1052   struct GNUNET_DHT_PutHandle *ph;
1053
1054   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
1055   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1056       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1057   {
1058     GNUNET_break (0);
1059     return NULL;
1060   }
1061   ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle));
1062   ph->dht_handle = handle;
1063   ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1064   ph->cont = cont;
1065   ph->cont_cls = cont_cls;
1066   ph->unique_id = ++handle->uid_gen;
1067   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1068   ph->pending = pending;
1069   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
1070   pending->msg = &put_msg->header;
1071   pending->handle = handle;
1072   pending->cont = &mark_put_message_gone;
1073   pending->cont_cls = ph;
1074   pending->free_on_send = GNUNET_YES;
1075   put_msg->header.size = htons (msize);
1076   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1077   put_msg->type = htonl (type);
1078   put_msg->options = htonl ((uint32_t) options);
1079   put_msg->desired_replication_level = htonl (desired_replication_level);
1080   put_msg->unique_id = ph->unique_id;
1081   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1082   put_msg->key = *key;
1083   memcpy (&put_msg[1], data, size);
1084   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1085                                pending);
1086   pending->in_pending_queue = GNUNET_YES;
1087   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1088                                     handle->put_tail,
1089                                     ph);
1090   process_pending_messages (handle);
1091   return ph;
1092 }
1093
1094
1095 /**
1096  * Cancels a DHT PUT operation.  Note that the PUT request may still
1097  * go out over the network (we can't stop that); However, if the PUT
1098  * has not yet been sent to the service, cancelling the PUT will stop
1099  * this from happening (but there is no way for the user of this API
1100  * to tell if that is the case).  The only use for this API is to 
1101  * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because
1102  * the system is shutting down).
1103  *
1104  * @param ph put operation to cancel ('cont' will no longer be called)
1105  */
1106 void
1107 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1108 {
1109   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1110
1111   if (NULL != ph->pending)
1112   {
1113     GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1114                                  handle->pending_tail,
1115                                  ph->pending);
1116     GNUNET_free (ph->pending);
1117     ph->pending = NULL;
1118   }
1119   if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1120   {
1121     GNUNET_SCHEDULER_cancel (ph->timeout_task);
1122     ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1123   }
1124   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1125                                handle->put_tail,
1126                                ph);
1127   GNUNET_free (ph);
1128 }
1129
1130
1131 /**
1132  * Perform an asynchronous GET operation on the DHT identified. See
1133  * also "GNUNET_BLOCK_evaluate".
1134  *
1135  * @param handle handle to the DHT service
1136  * @param type expected type of the response object
1137  * @param key the key to look up
1138  * @param desired_replication_level estimate of how many
1139                   nearest peers this request should reach
1140  * @param options routing options for this message
1141  * @param xquery extended query data (can be NULL, depending on type)
1142  * @param xquery_size number of bytes in xquery
1143  * @param iter function to call on each result
1144  * @param iter_cls closure for iter
1145  * @return handle to stop the async get
1146  */
1147 struct GNUNET_DHT_GetHandle *
1148 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1149                       enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key,
1150                       uint32_t desired_replication_level,
1151                       enum GNUNET_DHT_RouteOption options, const void *xquery,
1152                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
1153                       void *iter_cls)
1154 {
1155   struct GNUNET_DHT_ClientGetMessage *get_msg;
1156   struct GNUNET_DHT_GetHandle *get_handle;
1157   size_t msize;
1158   struct PendingMessage *pending;
1159
1160   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1161   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1162       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1163   {
1164     GNUNET_break (0);
1165     return NULL;
1166   }
1167   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
1168        GNUNET_h2s (key), handle);
1169   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1170   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
1171   pending->msg = &get_msg->header;
1172   pending->handle = handle;
1173   pending->free_on_send = GNUNET_NO;
1174   get_msg->header.size = htons (msize);
1175   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
1176   get_msg->options = htonl ((uint32_t) options);
1177   get_msg->desired_replication_level = htonl (desired_replication_level);
1178   get_msg->type = htonl (type);
1179   get_msg->key = *key;
1180   get_msg->unique_id = ++handle->uid_gen;
1181   memcpy (&get_msg[1], xquery, xquery_size);
1182   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1183                                pending);
1184   pending->in_pending_queue = GNUNET_YES;
1185   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1186   get_handle->iter = iter;
1187   get_handle->iter_cls = iter_cls;
1188   get_handle->message = pending;
1189   get_handle->unique_id = get_msg->unique_id;
1190   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
1191                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1192   process_pending_messages (handle);
1193   return get_handle;
1194 }
1195
1196
1197 /**
1198  * Stop async DHT-get.
1199  *
1200  * @param get_handle handle to the GET operation to stop
1201  */
1202 void
1203 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1204 {
1205   struct GNUNET_DHT_Handle *handle;
1206   const struct GNUNET_DHT_ClientGetMessage *get_msg;
1207   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1208   struct PendingMessage *pending;
1209
1210   handle = get_handle->message->handle;
1211   get_msg =
1212       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
1213   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
1214        GNUNET_h2s (&get_msg->key), handle);
1215   /* generate STOP */
1216   pending =
1217       GNUNET_malloc (sizeof (struct PendingMessage) +
1218                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1219   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
1220   pending->msg = &stop_msg->header;
1221   pending->handle = handle;
1222   pending->free_on_send = GNUNET_YES;
1223   stop_msg->header.size =
1224       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1225   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1226   stop_msg->reserved = htonl (0);
1227   stop_msg->unique_id = get_msg->unique_id;
1228   stop_msg->key = get_msg->key;
1229   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1230                                pending);
1231   pending->in_pending_queue = GNUNET_YES;
1232
1233   /* remove 'GET' from active status */
1234   GNUNET_assert (GNUNET_YES ==
1235                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1236                                                        &get_msg->key,
1237                                                        get_handle));
1238   if (GNUNET_YES == get_handle->message->in_pending_queue)
1239   {
1240     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1241                                  get_handle->message);
1242     get_handle->message->in_pending_queue = GNUNET_NO;
1243   }
1244   GNUNET_free (get_handle->message);
1245   GNUNET_free (get_handle);
1246
1247   process_pending_messages (handle);
1248 }
1249
1250
1251 /**
1252  * Start monitoring the local DHT service.
1253  *
1254  * @param handle Handle to the DHT service.
1255  * @param type Type of blocks that are of interest.
1256  * @param key Key of data of interest, NULL for all.
1257  * @param get_cb Callback to process monitored get messages.
1258  * @param get_resp_cb Callback to process monitored get response messages.
1259  * @param put_cb Callback to process monitored put messages.
1260  * @param cb_cls Closure for cb.
1261  *
1262  * @return Handle to stop monitoring.
1263  */
1264 struct GNUNET_DHT_MonitorHandle *
1265 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1266                           enum GNUNET_BLOCK_Type type,
1267                           const struct GNUNET_HashCode *key,
1268                           GNUNET_DHT_MonitorGetCB get_cb,
1269                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1270                           GNUNET_DHT_MonitorPutCB put_cb,
1271                           void *cb_cls)
1272 {
1273   struct GNUNET_DHT_MonitorHandle *h;
1274   struct GNUNET_DHT_MonitorStartStopMessage *m;
1275   struct PendingMessage *pending;
1276
1277   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
1278   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
1279
1280   h->get_cb = get_cb;
1281   h->get_resp_cb = get_resp_cb;
1282   h->put_cb = put_cb;
1283   h->cb_cls = cb_cls;
1284   h->type = type;
1285   h->dht_handle = handle;
1286   if (NULL != key)
1287   {
1288     h->key = GNUNET_malloc (sizeof(struct GNUNET_HashCode));
1289     memcpy (h->key, key, sizeof(struct GNUNET_HashCode));
1290   }
1291
1292   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1293                            sizeof (struct PendingMessage));
1294   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1295   pending->msg = &m->header;
1296   pending->handle = handle;
1297   pending->free_on_send = GNUNET_YES;
1298   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
1299   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1300   m->type = htonl(type);
1301   m->get = htons(NULL != get_cb);
1302   m->get_resp = htons(NULL != get_resp_cb);
1303   m->put = htons(NULL != put_cb);
1304   if (NULL != key) {
1305     m->filter_key = htons(1);
1306     memcpy (&m->key, key, sizeof(struct GNUNET_HashCode));
1307   }
1308   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1309                                pending);
1310   pending->in_pending_queue = GNUNET_YES;
1311   process_pending_messages (handle);
1312
1313   return h;
1314 }
1315
1316
1317 /**
1318  * Stop monitoring.
1319  *
1320  * @param handle The handle to the monitor request returned by monitor_start.
1321  *
1322  * On return get_handle will no longer be valid, caller must not use again!!!
1323  */
1324 void
1325 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1326 {
1327   struct GNUNET_DHT_MonitorStartStopMessage *m;
1328   struct PendingMessage *pending;
1329
1330   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1331                                handle->dht_handle->monitor_tail,
1332                                handle);
1333
1334   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1335                            sizeof (struct PendingMessage));
1336   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1337   pending->msg = &m->header;
1338   pending->handle = handle->dht_handle;
1339   pending->free_on_send = GNUNET_YES;
1340   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1341   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1342   m->type = htonl(handle->type);
1343   m->get = htons(NULL != handle->get_cb);
1344   m->get_resp = htons(NULL != handle->get_resp_cb);
1345   m->put = htons(NULL != handle->put_cb);
1346   if (NULL != handle->key) {
1347     m->filter_key = htons(1);
1348     memcpy (&m->key, handle->key, sizeof(struct GNUNET_HashCode));
1349   }
1350   GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
1351                                handle->dht_handle->pending_tail,
1352                                pending);
1353   pending->in_pending_queue = GNUNET_YES;
1354   process_pending_messages (handle->dht_handle);
1355   
1356   GNUNET_free_non_null (handle->key);
1357   GNUNET_free (handle);
1358 }
1359
1360
1361
1362 /* end of dht_api.c */