-fixing #2340
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Unique ID for this request
78    */
79   uint64_t unique_id;
80
81   /**
82    * Free the saved message once sent, set to GNUNET_YES for messages
83    * that do not receive responses; GNUNET_NO if this pending message
84    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
85    * from there.
86    */
87   int free_on_send;
88
89   /**
90    * GNUNET_YES if this message is in our pending queue right now.
91    */
92   int in_pending_queue;
93
94 };
95
96
97 /**
98  * Handle to a PUT request.
99  */
100 struct GNUNET_DHT_PutHandle
101 {
102   /**
103    * Kept in a DLL.
104    */
105   struct GNUNET_DHT_PutHandle *next;
106
107   /**
108    * Kept in a DLL.
109    */
110   struct GNUNET_DHT_PutHandle *prev;
111
112   /**
113    * Continuation to call when done.
114    */
115   GNUNET_DHT_PutContinuation cont;
116
117   /**
118    * Pending message associated with this PUT operation, 
119    * NULL after the message has been transmitted to the service.
120    */
121   struct PendingMessage *pending;
122
123   /**
124    * Main handle to this DHT api
125    */
126   struct GNUNET_DHT_Handle *dht_handle;
127
128   /**
129    * Closure for 'cont'.
130    */
131   void *cont_cls;
132
133   /**
134    * Timeout task for this operation.
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * Unique ID for the PUT operation.
140    */
141   uint64_t unique_id;
142
143 };
144
145
146
147 /**
148  * Handle to a GET request
149  */
150 struct GNUNET_DHT_GetHandle
151 {
152
153   /**
154    * Iterator to call on data receipt
155    */
156   GNUNET_DHT_GetIterator iter;
157
158   /**
159    * Closure for the iterator callback
160    */
161   void *iter_cls;
162
163   /**
164    * Main handle to this DHT api
165    */
166   struct GNUNET_DHT_Handle *dht_handle;
167
168   /**
169    * The actual message sent for this request,
170    * used for retransmitting requests on service
171    * failure/reconnect.  Freed on route_stop.
172    */
173   struct PendingMessage *message;
174
175   /**
176    * Key that this get request is for
177    */
178   GNUNET_HashCode key;
179
180   /**
181    * Unique identifier for this request (for key collisions).
182    */
183   uint64_t unique_id;
184
185 };
186
187
188 /**
189  * Handle to a monitoring request.
190  */
191 struct GNUNET_DHT_MonitorHandle
192 {
193   /**
194    * DLL.
195    */
196   struct GNUNET_DHT_MonitorHandle *next;
197
198   /**
199    * DLL.
200    */
201   struct GNUNET_DHT_MonitorHandle *prev;
202   
203   /**
204    * Main handle to this DHT api.
205    */
206   struct GNUNET_DHT_Handle *dht_handle;
207
208   /**
209    * Type of block looked for.
210    */
211   enum GNUNET_BLOCK_Type type;
212
213   /**
214    * Key being looked for, NULL == all.
215    */
216   GNUNET_HashCode *key;
217
218   /**
219    * Callback for each received message of type get.
220    */
221   GNUNET_DHT_MonitorGetCB get_cb;
222
223   /**
224    * Callback for each received message of type get response.
225    */
226   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
227
228   /**
229    * Callback for each received message of type put.
230    */
231   GNUNET_DHT_MonitorPutCB put_cb;
232
233   /**
234    * Closure for cb.
235    */
236   void *cb_cls;
237   
238 };
239
240
241 /**
242  * Connection to the DHT service.
243  */
244 struct GNUNET_DHT_Handle
245 {
246
247   /**
248    * Configuration to use.
249    */
250   const struct GNUNET_CONFIGURATION_Handle *cfg;
251
252   /**
253    * Socket (if available).
254    */
255   struct GNUNET_CLIENT_Connection *client;
256
257   /**
258    * Currently pending transmission request (or NULL).
259    */
260   struct GNUNET_CLIENT_TransmitHandle *th;
261
262   /**
263    * Head of linked list of messages we would like to transmit.
264    */
265   struct PendingMessage *pending_head;
266
267   /**
268    * Tail of linked list of messages we would like to transmit.
269    */
270   struct PendingMessage *pending_tail;
271
272   /**
273    * Head of linked list of messages we would like to monitor. 
274    */
275   struct GNUNET_DHT_MonitorHandle *monitor_head;
276
277   /**
278    * Tail of linked list of messages we would like to monitor.
279    */
280   struct GNUNET_DHT_MonitorHandle *monitor_tail;
281
282   /**
283    * Head of active PUT requests.
284    */
285   struct GNUNET_DHT_PutHandle *put_head;
286
287   /**
288    * Tail of active PUT requests.
289    */
290   struct GNUNET_DHT_PutHandle *put_tail;
291
292   /**
293    * Hash map containing the current outstanding unique GET requests
294    * (values are of type 'struct GNUNET_DHT_GetHandle').
295    */
296   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
297
298   /**
299    * Task for trying to reconnect.
300    */
301   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
302
303   /**
304    * How quickly should we retry?  Used for exponential back-off on
305    * connect-errors.
306    */
307   struct GNUNET_TIME_Relative retry_time;
308
309   /**
310    * Generator for unique ids.
311    */
312   uint64_t uid_gen;
313
314   /**
315    * Did we start our receive loop yet?
316    */
317   int in_receive;
318 };
319
320
321 /**
322  * Handler for messages received from the DHT service
323  * a demultiplexer which handles numerous message types
324  *
325  * @param cls the 'struct GNUNET_DHT_Handle'
326  * @param msg the incoming message
327  */
328 static void
329 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
330
331
332 /**
333  * Try to (re)connect to the DHT service.
334  *
335  * @param handle DHT handle to reconnect
336  * @return GNUNET_YES on success, GNUNET_NO on failure.
337  */
338 static int
339 try_connect (struct GNUNET_DHT_Handle *handle)
340 {
341   if (NULL != handle->client)
342     return GNUNET_OK;
343   handle->in_receive = GNUNET_NO;
344   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
345   if (NULL == handle->client)
346   {
347     LOG (GNUNET_ERROR_TYPE_WARNING,
348          _("Failed to connect to the DHT service!\n"));
349     return GNUNET_NO;
350   }
351   return GNUNET_YES;
352 }
353
354
355 /**
356  * Add the request corresponding to the given route handle
357  * to the pending queue (if it is not already in there).
358  *
359  * @param cls the 'struct GNUNET_DHT_Handle*'
360  * @param key key for the request (not used)
361  * @param value the 'struct GNUNET_DHT_GetHandle*'
362  * @return GNUNET_YES (always)
363  */
364 static int
365 add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
366 {
367   struct GNUNET_DHT_Handle *handle = cls;
368   struct GNUNET_DHT_GetHandle *rh = value;
369
370   if (GNUNET_NO == rh->message->in_pending_queue)
371   {
372     LOG (GNUNET_ERROR_TYPE_DEBUG,
373          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
374          handle);
375     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
376                                  rh->message);
377     rh->message->in_pending_queue = GNUNET_YES;
378   }
379   return GNUNET_YES;
380 }
381
382
383 /**
384  * Try to send messages from list of messages to send
385  *
386  * @param handle DHT_Handle
387  */
388 static void
389 process_pending_messages (struct GNUNET_DHT_Handle *handle);
390
391
392 /**
393  * Try reconnecting to the dht service.
394  *
395  * @param cls GNUNET_DHT_Handle
396  * @param tc scheduler context
397  */
398 static void
399 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
400 {
401   struct GNUNET_DHT_Handle *handle = cls;
402
403   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
404   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
405   if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
406     handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
407   else
408     handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
409   if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
410     handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
411   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
412   if (GNUNET_YES != try_connect (handle))
413   {
414     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
415     return;
416   }
417   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
418                                          &add_request_to_pending, handle);
419   process_pending_messages (handle);
420 }
421
422
423 /**
424  * Try reconnecting to the DHT service.
425  *
426  * @param handle handle to dht to (possibly) disconnect and reconnect
427  */
428 static void
429 do_disconnect (struct GNUNET_DHT_Handle *handle)
430 {
431   struct GNUNET_DHT_PutHandle *ph;
432   struct GNUNET_DHT_PutHandle *next;
433
434   if (NULL == handle->client)
435     return;
436   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
437   if (NULL != handle->th)
438     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
439   handle->th = NULL;
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441               "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
442               (unsigned long long) handle->retry_time.rel_value);
443   GNUNET_CLIENT_disconnect (handle->client);
444   handle->client = NULL;
445
446   /* signal disconnect to all PUT requests that were transmitted but waiting
447      for the put confirmation */
448   next = handle->put_head;
449   while (NULL != (ph = next))
450   {
451     next = ph->next;
452     if (NULL == ph->pending)
453     {
454       if (NULL != ph->cont)
455         ph->cont (ph->cont_cls, GNUNET_SYSERR);
456       GNUNET_DHT_put_cancel (ph);
457     }
458   }
459   handle->reconnect_task =
460       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
461 }
462
463
464 /**
465  * Transmit the next pending message, called by notify_transmit_ready
466  *
467  * @param cls the DHT handle
468  * @param size number of bytes available in 'buf' for transmission
469  * @param buf where to copy messages for the service
470  * @return number of bytes written to 'buf'
471  */
472 static size_t
473 transmit_pending (void *cls, size_t size, void *buf);
474
475
476 /**
477  * Try to send messages from list of messages to send
478  *
479  * @param handle handle to DHT
480  */
481 static void
482 process_pending_messages (struct GNUNET_DHT_Handle *handle)
483 {
484   struct PendingMessage *head;
485
486   if (NULL == handle->client)
487   {
488     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489                 "process_pending_messages called, but client is NULL, reconnecting\n");
490     do_disconnect (handle);
491     return;
492   }
493   if (NULL != handle->th)
494     return;
495   if (NULL == (head = handle->pending_head))
496     return;
497   handle->th =
498       GNUNET_CLIENT_notify_transmit_ready (handle->client,
499                                            ntohs (head->msg->size),
500                                            GNUNET_TIME_UNIT_FOREVER_REL,
501                                            GNUNET_YES, &transmit_pending,
502                                            handle);
503   if (NULL != handle->th)
504     return;
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506               "notify_transmit_ready returned NULL, reconnecting\n");
507   do_disconnect (handle);
508 }
509
510
511 /**
512  * Transmit the next pending message, called by notify_transmit_ready
513  *
514  * @param cls the DHT handle
515  * @param size number of bytes available in 'buf' for transmission
516  * @param buf where to copy messages for the service
517  * @return number of bytes written to 'buf'
518  */
519 static size_t
520 transmit_pending (void *cls, size_t size, void *buf)
521 {
522   struct GNUNET_DHT_Handle *handle = cls;
523   struct PendingMessage *head;
524   size_t tsize;
525
526   handle->th = NULL;
527   if (NULL == buf)
528   {    
529     LOG (GNUNET_ERROR_TYPE_DEBUG,
530          "Transmission to DHT service failed!  Reconnecting!\n");
531     do_disconnect (handle);
532     return 0;
533   }
534   if (NULL == (head = handle->pending_head))
535     return 0;
536
537   tsize = ntohs (head->msg->size);
538   if (size < tsize)
539   {
540     process_pending_messages (handle);
541     return 0;
542   }
543   memcpy (buf, head->msg, tsize);
544   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
545                                head);
546   head->in_pending_queue = GNUNET_NO;
547   if (NULL != head->cont)
548   {
549     head->cont (head->cont_cls, NULL);
550     head->cont = NULL;
551     head->cont_cls = NULL;
552   }
553   if (GNUNET_YES == head->free_on_send)
554     GNUNET_free (head);
555   process_pending_messages (handle);
556   LOG (GNUNET_ERROR_TYPE_DEBUG,
557        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
558   if (GNUNET_NO == handle->in_receive)
559   {
560     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
561     handle->in_receive = GNUNET_YES;
562     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
563                            GNUNET_TIME_UNIT_FOREVER_REL);
564   }
565   return tsize;
566 }
567
568
569 /**
570  * Process a given reply that might match the given
571  * request.
572  *
573  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
574  * @param key query of the request
575  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
576  * @return GNUNET_YES to continue to iterate over all results,
577  *         GNUNET_NO if the reply is malformed
578  */
579 static int
580 process_reply (void *cls, const GNUNET_HashCode * key, void *value)
581 {
582   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
583   struct GNUNET_DHT_GetHandle *get_handle = value;
584   const struct GNUNET_PeerIdentity *put_path;
585   const struct GNUNET_PeerIdentity *get_path;
586   uint32_t put_path_length;
587   uint32_t get_path_length;
588   size_t data_length;
589   size_t msize;
590   size_t meta_length;
591   const void *data;
592
593   if (dht_msg->unique_id != get_handle->unique_id)
594   {
595     /* UID mismatch */
596     LOG (GNUNET_ERROR_TYPE_DEBUG,
597          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
598          dht_msg->unique_id, get_handle->unique_id);
599     return GNUNET_YES;
600   }
601   msize = ntohs (dht_msg->header.size);
602   put_path_length = ntohl (dht_msg->put_path_length);
603   get_path_length = ntohl (dht_msg->get_path_length);
604   meta_length =
605       sizeof (struct GNUNET_DHT_ClientResultMessage) +
606       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
607   if ((msize < meta_length) ||
608       (get_path_length >
609        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
610       (put_path_length >
611        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
612   {
613     GNUNET_break (0);
614     return GNUNET_NO;
615   }
616   data_length = msize - meta_length;
617   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
618        (unsigned int) data_length, GNUNET_h2s (key));
619   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
620   get_path = &put_path[put_path_length];
621   data = &get_path[get_path_length];
622   get_handle->iter (get_handle->iter_cls,
623                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
624                     get_path, get_path_length, put_path, put_path_length,
625                     ntohl (dht_msg->type), data_length, data);
626   return GNUNET_YES;
627 }
628
629 /**
630  * Process a get monitor message from the service.
631  *
632  * @param handle The DHT handle.
633  * @param msg Monitor get message from the service.
634  * 
635  * @return GNUNET_OK if everything went fine,
636  *         GNUNET_SYSERR if the message is malformed.
637  */
638 static int
639 process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
640                              const struct GNUNET_DHT_MonitorGetMessage *msg)
641 {
642   struct GNUNET_DHT_MonitorHandle *h;
643
644   for (h = handle->monitor_head; NULL != h; h = h->next)
645   {
646     int type_ok;
647     int key_ok;
648
649     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
650     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
651                                                sizeof (GNUNET_HashCode)));
652     if (type_ok && key_ok && (NULL != h->get_cb))
653       h->get_cb (h->cb_cls,
654                  ntohl (msg->options),
655                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
656                  ntohl (msg->hop_count),
657                  ntohl (msg->desired_replication_level),
658                  ntohl (msg->get_path_length),
659                  (struct GNUNET_PeerIdentity *) &msg[1],
660                  &msg->key);    
661   }
662   return GNUNET_OK;
663 }
664
665
666 /**
667  * Process a get response monitor message from the service.
668  *
669  * @param handle The DHT handle.
670  * @param msg monitor get response message from the service
671  * @return GNUNET_OK if everything went fine,
672  *         GNUNET_SYSERR if the message is malformed.
673  */
674 static int
675 process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
676                                   const struct GNUNET_DHT_MonitorGetRespMessage
677                                   *msg)
678 {
679   struct GNUNET_DHT_MonitorHandle *h;
680   struct GNUNET_PeerIdentity *path;
681   uint32_t getl;
682   uint32_t putl;
683   size_t msize;
684
685   msize = ntohs (msg->header.size);
686   path = (struct GNUNET_PeerIdentity *) &msg[1];
687   getl = ntohl (msg->get_path_length);
688   putl = ntohl (msg->put_path_length);
689   if ( (getl + putl < getl) ||
690        ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
691   {
692     GNUNET_break (0);
693     return GNUNET_SYSERR;
694   }
695   for (h = handle->monitor_head; NULL != h; h = h->next)
696   {
697     int type_ok;
698     int key_ok;
699
700     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
701     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
702                                                sizeof (GNUNET_HashCode)));
703     if (type_ok && key_ok && (NULL != h->get_resp_cb))
704       h->get_resp_cb (h->cb_cls,
705                       (enum GNUNET_BLOCK_Type) ntohl(msg->type),
706                       path, getl,
707                       &path[getl], putl,
708                       GNUNET_TIME_absolute_ntoh(msg->expiration_time),
709                       &msg->key,
710                       (void *) &path[getl + putl],
711                       msize -
712                       sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
713                       sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
714   }
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Process a put monitor message from the service.
721  *
722  * @param handle The DHT handle.
723  * @param msg Monitor put message from the service.
724  * 
725  * @return GNUNET_OK if everything went fine,
726  *         GNUNET_SYSERR if the message is malformed.
727  */
728 static int
729 process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
730                              const struct GNUNET_DHT_MonitorPutMessage *msg)
731 {
732   struct GNUNET_DHT_MonitorHandle *h;
733   size_t msize;
734   struct GNUNET_PeerIdentity *path;
735   uint32_t putl;
736
737   msize = ntohs (msg->header.size);
738   path = (struct GNUNET_PeerIdentity *) &msg[1];
739   putl = ntohl (msg->put_path_length);
740   if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
741   {
742     GNUNET_break (0);
743     return GNUNET_SYSERR;
744   }
745   for (h = handle->monitor_head; NULL != h; h = h->next)
746   {
747     int type_ok;
748     int key_ok;
749
750     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
751     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
752                                                sizeof (GNUNET_HashCode)));
753     if (type_ok && key_ok && (NULL != h->put_cb))
754       h->put_cb (h->cb_cls,
755                  ntohl (msg->options),
756                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
757                  ntohl (msg->hop_count),
758                  ntohl (msg->desired_replication_level),
759                  putl, path,
760                  GNUNET_TIME_absolute_ntoh(msg->expiration_time),
761                  &msg->key,
762                  (void *) &path[putl],
763                  msize -
764                  sizeof (struct GNUNET_DHT_MonitorPutMessage) -
765                  sizeof (struct GNUNET_PeerIdentity) * putl);
766   }
767   return GNUNET_OK;
768 }
769
770
771 /**
772  * Process a put confirmation message from the service.
773  *
774  * @param handle The DHT handle.
775  * @param msg confirmation message from the service.
776  * @return GNUNET_OK if everything went fine,
777  *         GNUNET_SYSERR if the message is malformed.
778  */
779 static int
780 process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
781                                   const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
782 {
783   struct GNUNET_DHT_PutHandle *ph;
784   GNUNET_DHT_PutContinuation cont;
785   void *cont_cls;
786
787   for (ph = handle->put_head; NULL != ph; ph = ph->next)
788     if (ph->unique_id == msg->unique_id)
789       break;
790   if (NULL == ph)
791     return GNUNET_OK;
792   cont = ph->cont;
793   cont_cls = ph->cont_cls;
794   GNUNET_DHT_put_cancel (ph);
795   if (NULL != cont) 
796     cont (cont_cls, GNUNET_OK);
797   return GNUNET_OK;
798 }
799
800
801 /**
802  * Handler for messages received from the DHT service
803  * a demultiplexer which handles numerous message types
804  *
805  * @param cls the 'struct GNUNET_DHT_Handle'
806  * @param msg the incoming message
807  */
808 static void
809 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
810 {
811   struct GNUNET_DHT_Handle *handle = cls;
812   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
813   uint16_t msize;
814   int ret;
815
816   if (NULL == msg)
817   {
818     LOG (GNUNET_ERROR_TYPE_DEBUG,
819          "Error receiving data from DHT service, reconnecting\n");
820     do_disconnect (handle);
821     return;
822   }
823   ret = GNUNET_SYSERR;
824   msize = ntohs (msg->size);
825   switch (ntohs (msg->type))
826   {
827   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
828     if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
829     {
830       GNUNET_break (0);
831       break;
832     }
833     ret = process_monitor_get_message(handle,
834                                       (const struct GNUNET_DHT_MonitorGetMessage *) msg);
835     break;
836   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
837     if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
838     {
839       GNUNET_break (0);
840       break;
841     }
842     ret = process_monitor_get_resp_message(handle,
843                                            (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
844     break;
845   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
846     if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
847     {
848       GNUNET_break (0);
849       break;
850     }
851     ret = process_monitor_put_message(handle,
852                                       (const struct GNUNET_DHT_MonitorPutMessage *) msg);
853     break;
854   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
855     /* Not implemented yet */
856     GNUNET_break(0);
857     break;
858   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
859     if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
860     {
861       GNUNET_break (0);
862       break;
863     }
864     ret = GNUNET_OK;
865     dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
866     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
867          GNUNET_h2s (&dht_msg->key), handle);
868     GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
869                                                 &dht_msg->key, &process_reply,
870                                                 (void *) dht_msg);
871     break;
872   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
873     if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
874     {
875       GNUNET_break (0);
876       break;
877     }
878     ret = process_put_confirmation_message (handle,
879                                             (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
880     break;
881   default:
882     GNUNET_break(0);
883     break;
884   }
885   if (GNUNET_OK != ret)
886   {
887     GNUNET_break (0);
888     do_disconnect (handle);
889     return;
890   }
891   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
892                          GNUNET_TIME_UNIT_FOREVER_REL);
893 }
894
895
896 /**
897  * Initialize the connection with the DHT service.
898  *
899  * @param cfg configuration to use
900  * @param ht_len size of the internal hash table to use for
901  *               processing multiple GET/FIND requests in parallel
902  *
903  * @return handle to the DHT service, or NULL on error
904  */
905 struct GNUNET_DHT_Handle *
906 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
907                     unsigned int ht_len)
908 {
909   struct GNUNET_DHT_Handle *handle;
910
911   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
912   handle->cfg = cfg;
913   handle->uid_gen =
914       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
915   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
916   if (GNUNET_NO == try_connect (handle))
917   {
918     GNUNET_DHT_disconnect (handle);
919     return NULL;
920   }
921   return handle;
922 }
923
924
925 /**
926  * Shutdown connection with the DHT service.
927  *
928  * @param handle handle of the DHT connection to stop
929  */
930 void
931 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
932 {
933   struct PendingMessage *pm;
934   struct GNUNET_DHT_PutHandle *ph;
935
936   GNUNET_assert (NULL != handle);
937   GNUNET_assert (0 ==
938                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
939   if (NULL != handle->th)
940   {
941     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
942     handle->th = NULL;
943   }
944   while (NULL != (pm = handle->pending_head))
945   {
946     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
947     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
948                                  pm);
949     pm->in_pending_queue = GNUNET_NO;
950     GNUNET_assert (GNUNET_YES == pm->free_on_send);
951     if (NULL != pm->cont)
952       pm->cont (pm->cont_cls, NULL);
953     GNUNET_free (pm);
954   }
955   while (NULL != (ph = handle->put_head))
956   {
957     GNUNET_break (NULL == ph->pending);
958     if (NULL != ph->cont)
959       ph->cont (ph->cont_cls, GNUNET_SYSERR);
960     GNUNET_DHT_put_cancel (ph);
961   }
962
963   if (NULL != handle->client)
964   {
965     GNUNET_CLIENT_disconnect (handle->client);
966     handle->client = NULL;
967   }
968   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
969     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
970   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
971   GNUNET_free (handle);
972 }
973
974
975 /**
976  * Timeout for the transmission of a fire&forget-request.  Clean it up.
977  *
978  * @param cls the 'struct PendingMessage'
979  * @param tc scheduler context
980  */
981 static void
982 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
983 {
984   struct GNUNET_DHT_PutHandle *ph = cls;
985   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
986
987   ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
988   if (NULL != ph->pending)
989   {
990     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
991                                  ph->pending);
992     ph->pending->in_pending_queue = GNUNET_NO;
993     GNUNET_free (ph->pending);
994   }
995   if (NULL != ph->cont)
996     ph->cont (ph->cont_cls, GNUNET_NO);
997   GNUNET_CONTAINER_DLL_remove (handle->put_head,
998                                handle->put_tail,
999                                ph);
1000   GNUNET_free (ph);
1001 }
1002
1003
1004 /**
1005  * Function called whenever the PUT message leaves the queue.  Sets
1006  * the message pointer in the put handle to NULL.
1007  *
1008  * @param cls the 'struct GNUNET_DHT_PutHandle'
1009  * @param tc unused
1010  */
1011 static void
1012 mark_put_message_gone (void *cls,
1013                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1014 {
1015   struct GNUNET_DHT_PutHandle *ph = cls;
1016
1017   ph->pending = NULL;
1018 }
1019
1020
1021 /**
1022  * Perform a PUT operation storing data in the DHT.  FIXME: we should
1023  * change the protocol to get a confirmation for the PUT from the DHT
1024  * and call 'cont' only after getting the confirmation; otherwise, the
1025  * client has no good way of telling if the 'PUT' message actually got
1026  * to the DHT service!
1027  *
1028  * @param handle handle to DHT service
1029  * @param key the key to store under
1030  * @param desired_replication_level estimate of how many
1031  *                nearest peers this request should reach
1032  * @param options routing options for this message
1033  * @param type type of the value
1034  * @param size number of bytes in data; must be less than 64k
1035  * @param data the data to store
1036  * @param exp desired expiration time for the value
1037  * @param timeout how long to wait for transmission of this request
1038  * @param cont continuation to call when done (transmitting request to service)
1039  *        You must not call GNUNET_DHT_DISCONNECT in this continuation
1040  * @param cont_cls closure for cont
1041  */
1042 struct GNUNET_DHT_PutHandle *
1043 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
1044                 uint32_t desired_replication_level,
1045                 enum GNUNET_DHT_RouteOption options,
1046                 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
1047                 struct GNUNET_TIME_Absolute exp,
1048                 struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
1049                 void *cont_cls)
1050 {
1051   struct GNUNET_DHT_ClientPutMessage *put_msg;
1052   size_t msize;
1053   struct PendingMessage *pending;
1054   struct GNUNET_DHT_PutHandle *ph;
1055
1056   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
1057   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1058       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1059   {
1060     GNUNET_break (0);
1061     return NULL;
1062   }
1063   ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle));
1064   ph->dht_handle = handle;
1065   ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1066   ph->cont = cont;
1067   ph->cont_cls = cont_cls;
1068   ph->unique_id = ++handle->uid_gen;
1069   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1070   ph->pending = pending;
1071   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
1072   pending->msg = &put_msg->header;
1073   pending->handle = handle;
1074   pending->cont = &mark_put_message_gone;
1075   pending->cont_cls = ph;
1076   pending->free_on_send = GNUNET_YES;
1077   put_msg->header.size = htons (msize);
1078   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1079   put_msg->type = htonl (type);
1080   put_msg->options = htonl ((uint32_t) options);
1081   put_msg->desired_replication_level = htonl (desired_replication_level);
1082   put_msg->unique_id = ph->unique_id;
1083   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1084   put_msg->key = *key;
1085   memcpy (&put_msg[1], data, size);
1086   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1087                                pending);
1088   pending->in_pending_queue = GNUNET_YES;
1089   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1090                                     handle->put_tail,
1091                                     ph);
1092   process_pending_messages (handle);
1093   return ph;
1094 }
1095
1096
1097 /**
1098  * Cancels a DHT PUT operation.  Note that the PUT request may still
1099  * go out over the network (we can't stop that); However, if the PUT
1100  * has not yet been sent to the service, cancelling the PUT will stop
1101  * this from happening (but there is no way for the user of this API
1102  * to tell if that is the case).  The only use for this API is to 
1103  * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because
1104  * the system is shutting down).
1105  *
1106  * @param ph put operation to cancel ('cont' will no longer be called)
1107  */
1108 void
1109 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1110 {
1111   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1112
1113   if (NULL != ph->pending)
1114   {
1115     GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1116                                  handle->pending_tail,
1117                                  ph->pending);
1118     GNUNET_free (ph->pending);
1119     ph->pending = NULL;
1120   }
1121   if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1122   {
1123     GNUNET_SCHEDULER_cancel (ph->timeout_task);
1124     ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1125   }
1126   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1127                                handle->put_tail,
1128                                ph);
1129   GNUNET_free (ph);
1130 }
1131
1132
1133 /**
1134  * Perform an asynchronous GET operation on the DHT identified. See
1135  * also "GNUNET_BLOCK_evaluate".
1136  *
1137  * @param handle handle to the DHT service
1138  * @param type expected type of the response object
1139  * @param key the key to look up
1140  * @param desired_replication_level estimate of how many
1141                   nearest peers this request should reach
1142  * @param options routing options for this message
1143  * @param xquery extended query data (can be NULL, depending on type)
1144  * @param xquery_size number of bytes in xquery
1145  * @param iter function to call on each result
1146  * @param iter_cls closure for iter
1147  * @return handle to stop the async get
1148  */
1149 struct GNUNET_DHT_GetHandle *
1150 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1151                       enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
1152                       uint32_t desired_replication_level,
1153                       enum GNUNET_DHT_RouteOption options, const void *xquery,
1154                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
1155                       void *iter_cls)
1156 {
1157   struct GNUNET_DHT_ClientGetMessage *get_msg;
1158   struct GNUNET_DHT_GetHandle *get_handle;
1159   size_t msize;
1160   struct PendingMessage *pending;
1161
1162   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1163   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1164       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1165   {
1166     GNUNET_break (0);
1167     return NULL;
1168   }
1169   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
1170        GNUNET_h2s (key), handle);
1171   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1172   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
1173   pending->msg = &get_msg->header;
1174   pending->handle = handle;
1175   pending->free_on_send = GNUNET_NO;
1176   get_msg->header.size = htons (msize);
1177   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
1178   get_msg->options = htonl ((uint32_t) options);
1179   get_msg->desired_replication_level = htonl (desired_replication_level);
1180   get_msg->type = htonl (type);
1181   get_msg->key = *key;
1182   get_msg->unique_id = ++handle->uid_gen;
1183   memcpy (&get_msg[1], xquery, xquery_size);
1184   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1185                                pending);
1186   pending->in_pending_queue = GNUNET_YES;
1187   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1188   get_handle->iter = iter;
1189   get_handle->iter_cls = iter_cls;
1190   get_handle->message = pending;
1191   get_handle->unique_id = get_msg->unique_id;
1192   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
1193                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1194   process_pending_messages (handle);
1195   return get_handle;
1196 }
1197
1198
1199 /**
1200  * Stop async DHT-get.
1201  *
1202  * @param get_handle handle to the GET operation to stop
1203  */
1204 void
1205 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1206 {
1207   struct GNUNET_DHT_Handle *handle;
1208   const struct GNUNET_DHT_ClientGetMessage *get_msg;
1209   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1210   struct PendingMessage *pending;
1211
1212   handle = get_handle->message->handle;
1213   get_msg =
1214       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
1215   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
1216        GNUNET_h2s (&get_msg->key), handle);
1217   /* generate STOP */
1218   pending =
1219       GNUNET_malloc (sizeof (struct PendingMessage) +
1220                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1221   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
1222   pending->msg = &stop_msg->header;
1223   pending->handle = handle;
1224   pending->free_on_send = GNUNET_YES;
1225   stop_msg->header.size =
1226       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1227   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1228   stop_msg->reserved = htonl (0);
1229   stop_msg->unique_id = get_msg->unique_id;
1230   stop_msg->key = get_msg->key;
1231   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1232                                pending);
1233   pending->in_pending_queue = GNUNET_YES;
1234
1235   /* remove 'GET' from active status */
1236   GNUNET_assert (GNUNET_YES ==
1237                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1238                                                        &get_msg->key,
1239                                                        get_handle));
1240   if (GNUNET_YES == get_handle->message->in_pending_queue)
1241   {
1242     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1243                                  get_handle->message);
1244     get_handle->message->in_pending_queue = GNUNET_NO;
1245   }
1246   GNUNET_free (get_handle->message);
1247   GNUNET_free (get_handle);
1248
1249   process_pending_messages (handle);
1250 }
1251
1252
1253 /**
1254  * Start monitoring the local DHT service.
1255  *
1256  * @param handle Handle to the DHT service.
1257  * @param type Type of blocks that are of interest.
1258  * @param key Key of data of interest, NULL for all.
1259  * @param get_cb Callback to process monitored get messages.
1260  * @param get_resp_cb Callback to process monitored get response messages.
1261  * @param put_cb Callback to process monitored put messages.
1262  * @param cb_cls Closure for cb.
1263  *
1264  * @return Handle to stop monitoring.
1265  */
1266 struct GNUNET_DHT_MonitorHandle *
1267 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1268                           enum GNUNET_BLOCK_Type type,
1269                           const GNUNET_HashCode *key,
1270                           GNUNET_DHT_MonitorGetCB get_cb,
1271                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1272                           GNUNET_DHT_MonitorPutCB put_cb,
1273                           void *cb_cls)
1274 {
1275   struct GNUNET_DHT_MonitorHandle *h;
1276   struct GNUNET_DHT_MonitorStartStopMessage *m;
1277   struct PendingMessage *pending;
1278
1279   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
1280   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
1281
1282   h->get_cb = get_cb;
1283   h->get_resp_cb = get_resp_cb;
1284   h->put_cb = put_cb;
1285   h->cb_cls = cb_cls;
1286   h->type = type;
1287   h->dht_handle = handle;
1288   if (NULL != key)
1289   {
1290     h->key = GNUNET_malloc (sizeof(GNUNET_HashCode));
1291     memcpy (h->key, key, sizeof(GNUNET_HashCode));
1292   }
1293
1294   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1295                            sizeof (struct PendingMessage));
1296   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1297   pending->msg = &m->header;
1298   pending->handle = handle;
1299   pending->free_on_send = GNUNET_YES;
1300   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
1301   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1302   m->type = htonl(type);
1303   m->get = htons(NULL != get_cb);
1304   m->get_resp = htons(NULL != get_resp_cb);
1305   m->put = htons(NULL != put_cb);
1306   if (NULL != key) {
1307     m->filter_key = htons(1);
1308     memcpy (&m->key, key, sizeof(GNUNET_HashCode));
1309   }
1310   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1311                                pending);
1312   pending->in_pending_queue = GNUNET_YES;
1313   process_pending_messages (handle);
1314
1315   return h;
1316 }
1317
1318
1319 /**
1320  * Stop monitoring.
1321  *
1322  * @param handle The handle to the monitor request returned by monitor_start.
1323  *
1324  * On return get_handle will no longer be valid, caller must not use again!!!
1325  */
1326 void
1327 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1328 {
1329   struct GNUNET_DHT_MonitorStartStopMessage *m;
1330   struct PendingMessage *pending;
1331
1332   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1333                                handle->dht_handle->monitor_tail,
1334                                handle);
1335
1336   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1337                            sizeof (struct PendingMessage));
1338   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1339   pending->msg = &m->header;
1340   pending->handle = handle->dht_handle;
1341   pending->free_on_send = GNUNET_YES;
1342   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1343   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1344   m->type = htonl(handle->type);
1345   m->get = htons(NULL != handle->get_cb);
1346   m->get_resp = htons(NULL != handle->get_resp_cb);
1347   m->put = htons(NULL != handle->put_cb);
1348   if (NULL != handle->key) {
1349     m->filter_key = htons(1);
1350     memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode));
1351   }
1352   GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
1353                                handle->dht_handle->pending_tail,
1354                                pending);
1355   pending->in_pending_queue = GNUNET_YES;
1356   process_pending_messages (handle->dht_handle);
1357   
1358   GNUNET_free_non_null (handle->key);
1359   GNUNET_free (handle);
1360 }
1361
1362
1363
1364 /* end of dht_api.c */