-allow caller ID to differ from zone used for resolution
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Unique ID for this request
78    */
79   uint64_t unique_id;
80
81   /**
82    * Free the saved message once sent, set to GNUNET_YES for messages
83    * that do not receive responses; GNUNET_NO if this pending message
84    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
85    * from there.
86    */
87   int free_on_send;
88
89   /**
90    * GNUNET_YES if this message is in our pending queue right now.
91    */
92   int in_pending_queue;
93
94 };
95
96
97 /**
98  * Handle to a PUT request.
99  */
100 struct GNUNET_DHT_PutHandle
101 {
102   /**
103    * Kept in a DLL.
104    */
105   struct GNUNET_DHT_PutHandle *next;
106
107   /**
108    * Kept in a DLL.
109    */
110   struct GNUNET_DHT_PutHandle *prev;
111
112   /**
113    * Continuation to call when done.
114    */
115   GNUNET_DHT_PutContinuation cont;
116
117   /**
118    * Pending message associated with this PUT operation,
119    * NULL after the message has been transmitted to the service.
120    */
121   struct PendingMessage *pending;
122
123   /**
124    * Main handle to this DHT api
125    */
126   struct GNUNET_DHT_Handle *dht_handle;
127
128   /**
129    * Closure for 'cont'.
130    */
131   void *cont_cls;
132
133   /**
134    * Timeout task for this operation.
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * Unique ID for the PUT operation.
140    */
141   uint64_t unique_id;
142
143 };
144
145
146
147 /**
148  * Handle to a GET request
149  */
150 struct GNUNET_DHT_GetHandle
151 {
152
153   /**
154    * Iterator to call on data receipt
155    */
156   GNUNET_DHT_GetIterator iter;
157
158   /**
159    * Closure for the iterator callback
160    */
161   void *iter_cls;
162
163   /**
164    * Main handle to this DHT api
165    */
166   struct GNUNET_DHT_Handle *dht_handle;
167
168   /**
169    * The actual message sent for this request,
170    * used for retransmitting requests on service
171    * failure/reconnect.  Freed on route_stop.
172    */
173   struct PendingMessage *message;
174
175   /**
176    * Array of hash codes over the results that we have already
177    * seen.
178    */
179   struct GNUNET_HashCode *seen_results;
180
181   /**
182    * Key that this get request is for
183    */
184   struct GNUNET_HashCode key;
185
186   /**
187    * Unique identifier for this request (for key collisions).
188    */
189   uint64_t unique_id;
190
191   /**
192    * Size of the 'seen_results' array.  Note that not
193    * all positions might be used (as we over-allocate).
194    */
195   unsigned int seen_results_size;
196
197   /**
198    * Offset into the 'seen_results' array marking the
199    * end of the positions that are actually used.
200    */
201   unsigned int seen_results_end;
202
203   /**
204    * Offset into the 'seen_results' array marking the
205    * position up to where we've send the hash codes to
206    * the DHT for blocking (needed as we might not be
207    * able to send all hash codes at once).
208    */
209   unsigned int seen_results_transmission_offset;
210
211
212 };
213
214
215 /**
216  * Handle to a monitoring request.
217  */
218 struct GNUNET_DHT_MonitorHandle
219 {
220   /**
221    * DLL.
222    */
223   struct GNUNET_DHT_MonitorHandle *next;
224
225   /**
226    * DLL.
227    */
228   struct GNUNET_DHT_MonitorHandle *prev;
229
230   /**
231    * Main handle to this DHT api.
232    */
233   struct GNUNET_DHT_Handle *dht_handle;
234
235   /**
236    * Type of block looked for.
237    */
238   enum GNUNET_BLOCK_Type type;
239
240   /**
241    * Key being looked for, NULL == all.
242    */
243   struct GNUNET_HashCode *key;
244
245   /**
246    * Callback for each received message of type get.
247    */
248   GNUNET_DHT_MonitorGetCB get_cb;
249
250   /**
251    * Callback for each received message of type get response.
252    */
253   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
254
255   /**
256    * Callback for each received message of type put.
257    */
258   GNUNET_DHT_MonitorPutCB put_cb;
259
260   /**
261    * Closure for cb.
262    */
263   void *cb_cls;
264
265 };
266
267
268 /**
269  * Connection to the DHT service.
270  */
271 struct GNUNET_DHT_Handle
272 {
273
274   /**
275    * Configuration to use.
276    */
277   const struct GNUNET_CONFIGURATION_Handle *cfg;
278
279   /**
280    * Socket (if available).
281    */
282   struct GNUNET_CLIENT_Connection *client;
283
284   /**
285    * Currently pending transmission request (or NULL).
286    */
287   struct GNUNET_CLIENT_TransmitHandle *th;
288
289   /**
290    * Head of linked list of messages we would like to transmit.
291    */
292   struct PendingMessage *pending_head;
293
294   /**
295    * Tail of linked list of messages we would like to transmit.
296    */
297   struct PendingMessage *pending_tail;
298
299   /**
300    * Head of linked list of messages we would like to monitor.
301    */
302   struct GNUNET_DHT_MonitorHandle *monitor_head;
303
304   /**
305    * Tail of linked list of messages we would like to monitor.
306    */
307   struct GNUNET_DHT_MonitorHandle *monitor_tail;
308
309   /**
310    * Head of active PUT requests.
311    */
312   struct GNUNET_DHT_PutHandle *put_head;
313
314   /**
315    * Tail of active PUT requests.
316    */
317   struct GNUNET_DHT_PutHandle *put_tail;
318
319   /**
320    * Hash map containing the current outstanding unique GET requests
321    * (values are of type `struct GNUNET_DHT_GetHandle`).
322    */
323   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
324
325   /**
326    * Task for trying to reconnect.
327    */
328   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
329
330   /**
331    * How quickly should we retry?  Used for exponential back-off on
332    * connect-errors.
333    */
334   struct GNUNET_TIME_Relative retry_time;
335
336   /**
337    * Generator for unique ids.
338    */
339   uint64_t uid_gen;
340
341   /**
342    * Did we start our receive loop yet?
343    */
344   int in_receive;
345 };
346
347
348 /**
349  * Handler for messages received from the DHT service
350  * a demultiplexer which handles numerous message types
351  *
352  * @param cls the `struct GNUNET_DHT_Handle`
353  * @param msg the incoming message
354  */
355 static void
356 service_message_handler (void *cls,
357                          const struct GNUNET_MessageHeader *msg);
358
359
360 /**
361  * Try to (re)connect to the DHT service.
362  *
363  * @param handle DHT handle to reconnect
364  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
365  */
366 static int
367 try_connect (struct GNUNET_DHT_Handle *handle)
368 {
369   if (NULL != handle->client)
370     return GNUNET_OK;
371   handle->in_receive = GNUNET_NO;
372   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
373   if (NULL == handle->client)
374   {
375     LOG (GNUNET_ERROR_TYPE_WARNING,
376          _("Failed to connect to the DHT service!\n"));
377     return GNUNET_NO;
378   }
379   return GNUNET_YES;
380 }
381
382
383 /**
384  * Queue messages to DHT to block certain results from the result set.
385  *
386  * @param get_handle GET to generate messages for.
387  */
388 static void
389 queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
390 {
391   struct PendingMessage *pm;
392   struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
393   uint16_t msize;
394   unsigned int delta;
395   unsigned int max;
396
397   while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end)
398   {
399     delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset;
400     max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
401     if (delta > max)
402       delta = max;
403     msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode);
404
405     pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
406     msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
407     pm->msg = &msg->header;
408     pm->handle = get_handle->dht_handle;
409     pm->unique_id = get_handle->unique_id;
410     pm->free_on_send = GNUNET_YES;
411     pm->in_pending_queue = GNUNET_YES;
412     msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
413     msg->header.size = htons (msize);
414     msg->key = get_handle->key;
415     msg->unique_id = get_handle->unique_id;
416     memcpy (&msg[1],
417             &get_handle->seen_results[get_handle->seen_results_transmission_offset],
418             sizeof (struct GNUNET_HashCode) * delta);
419     get_handle->seen_results_transmission_offset += delta;
420     GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
421                                       get_handle->dht_handle->pending_tail,
422                                       pm);
423   }
424 }
425
426
427 /**
428  * Add the request corresponding to the given route handle
429  * to the pending queue (if it is not already in there).
430  *
431  * @param cls the `struct GNUNET_DHT_Handle *`
432  * @param key key for the request (not used)
433  * @param value the `struct GNUNET_DHT_GetHandle *`
434  * @return #GNUNET_YES (always)
435  */
436 static int
437 add_request_to_pending (void *cls,
438                         const struct GNUNET_HashCode *key,
439                         void *value)
440 {
441   struct GNUNET_DHT_Handle *handle = cls;
442   struct GNUNET_DHT_GetHandle *get_handle = value;
443
444   if (GNUNET_NO == get_handle->message->in_pending_queue)
445   {
446     LOG (GNUNET_ERROR_TYPE_DEBUG,
447          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
448          handle);
449     get_handle->seen_results_transmission_offset = 0;
450     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
451                                  get_handle->message);
452     queue_filter_messages (get_handle);
453     get_handle->message->in_pending_queue = GNUNET_YES;
454   }
455   return GNUNET_YES;
456 }
457
458
459 /**
460  * Try to send messages from list of messages to send
461  *
462  * @param handle DHT_Handle
463  */
464 static void
465 process_pending_messages (struct GNUNET_DHT_Handle *handle);
466
467
468 /**
469  * Try reconnecting to the dht service.
470  *
471  * @param cls a `struct GNUNET_DHT_Handle`
472  * @param tc scheduler context
473  */
474 static void
475 try_reconnect (void *cls,
476                const struct GNUNET_SCHEDULER_TaskContext *tc)
477 {
478   struct GNUNET_DHT_Handle *handle = cls;
479
480   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
481   handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time);
482   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
483   if (GNUNET_YES != try_connect (handle))
484   {
485     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
486     return;
487   }
488   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
489                                          &add_request_to_pending, handle);
490   process_pending_messages (handle);
491 }
492
493
494 /**
495  * Try reconnecting to the DHT service.
496  *
497  * @param handle handle to dht to (possibly) disconnect and reconnect
498  */
499 static void
500 do_disconnect (struct GNUNET_DHT_Handle *handle)
501 {
502   struct GNUNET_DHT_PutHandle *ph;
503   struct GNUNET_DHT_PutHandle *next;
504
505   if (NULL == handle->client)
506     return;
507   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
508   if (NULL != handle->th)
509     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
510   handle->th = NULL;
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512               "Disconnecting from DHT service, will try to reconnect in %s\n",
513               GNUNET_STRINGS_relative_time_to_string (handle->retry_time,
514                                                       GNUNET_YES));
515   GNUNET_CLIENT_disconnect (handle->client);
516   handle->client = NULL;
517
518   /* signal disconnect to all PUT requests that were transmitted but waiting
519      for the put confirmation */
520   next = handle->put_head;
521   while (NULL != (ph = next))
522   {
523     next = ph->next;
524     if (NULL == ph->pending)
525     {
526       if (NULL != ph->cont)
527         ph->cont (ph->cont_cls, GNUNET_SYSERR);
528       GNUNET_DHT_put_cancel (ph);
529     }
530   }
531   handle->reconnect_task =
532       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
533 }
534
535
536 /**
537  * Transmit the next pending message, called by notify_transmit_ready
538  *
539  * @param cls the DHT handle
540  * @param size number of bytes available in @a buf for transmission
541  * @param buf where to copy messages for the service
542  * @return number of bytes written to @a buf
543  */
544 static size_t
545 transmit_pending (void *cls,
546                   size_t size,
547                   void *buf);
548
549
550 /**
551  * Try to send messages from list of messages to send
552  *
553  * @param handle handle to DHT
554  */
555 static void
556 process_pending_messages (struct GNUNET_DHT_Handle *handle)
557 {
558   struct PendingMessage *head;
559
560   if (NULL == handle->client)
561   {
562     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563                 "process_pending_messages called, but client is NULL, reconnecting\n");
564     do_disconnect (handle);
565     return;
566   }
567   if (NULL != handle->th)
568     return;
569   if (NULL == (head = handle->pending_head))
570     return;
571   handle->th =
572       GNUNET_CLIENT_notify_transmit_ready (handle->client,
573                                            ntohs (head->msg->size),
574                                            GNUNET_TIME_UNIT_FOREVER_REL,
575                                            GNUNET_YES, &transmit_pending,
576                                            handle);
577   if (NULL != handle->th)
578     return;
579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
580               "notify_transmit_ready returned NULL, reconnecting\n");
581   do_disconnect (handle);
582 }
583
584
585 /**
586  * Transmit the next pending message, called by notify_transmit_ready
587  *
588  * @param cls the DHT handle
589  * @param size number of bytes available in @a buf for transmission
590  * @param buf where to copy messages for the service
591  * @return number of bytes written to @a buf
592  */
593 static size_t
594 transmit_pending (void *cls,
595                   size_t size,
596                   void *buf)
597 {
598   struct GNUNET_DHT_Handle *handle = cls;
599   struct PendingMessage *head;
600   size_t tsize;
601
602
603   handle->th = NULL;
604   if (NULL == buf)
605   {
606     LOG (GNUNET_ERROR_TYPE_DEBUG,
607          "Transmission to DHT service failed!  Reconnecting!\n");
608     do_disconnect (handle);
609     return 0;
610   }
611   if (NULL == (head = handle->pending_head))
612     return 0;
613
614   tsize = ntohs (head->msg->size);
615   if (size < tsize)
616   {
617     process_pending_messages (handle);
618     return 0;
619   }
620   memcpy (buf, head->msg, tsize);
621   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
622                                head);
623   head->in_pending_queue = GNUNET_NO;
624   if (NULL != head->cont)
625   {
626     head->cont (head->cont_cls, NULL);
627     head->cont = NULL;
628     head->cont_cls = NULL;
629   }
630   if (GNUNET_YES == head->free_on_send)
631     GNUNET_free (head);
632   process_pending_messages (handle);
633   LOG (GNUNET_ERROR_TYPE_DEBUG,
634        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
635   if (GNUNET_NO == handle->in_receive)
636   {
637     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
638     handle->in_receive = GNUNET_YES;
639     
640     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
641                            GNUNET_TIME_UNIT_FOREVER_REL);
642   }
643   return tsize;
644 }
645
646
647 /**
648  * Process a given reply that might match the given
649  * request.
650  *
651  * @param cls the `struct GNUNET_DHT_ClientResultMessage`
652  * @param key query of the request
653  * @param value the `struct GNUNET_DHT_RouteHandle` of a request matching the same key
654  * @return #GNUNET_YES to continue to iterate over all results,
655  *         #GNUNET_NO if the reply is malformed or we found a matching request
656  */
657 static int
658 process_reply (void *cls,
659                const struct GNUNET_HashCode *key,
660                void *value)
661 {
662   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
663   struct GNUNET_DHT_GetHandle *get_handle = value;
664   const struct GNUNET_PeerIdentity *put_path;
665   const struct GNUNET_PeerIdentity *get_path;
666   struct GNUNET_HashCode hc;
667   uint32_t put_path_length;
668   uint32_t get_path_length;
669   size_t data_length;
670   size_t msize;
671   size_t meta_length;
672   const void *data;
673
674   if (dht_msg->unique_id != get_handle->unique_id)
675   {
676     /* UID mismatch */
677     LOG (GNUNET_ERROR_TYPE_DEBUG,
678          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
679          dht_msg->unique_id, get_handle->unique_id);
680     return GNUNET_YES;
681   }
682   msize = ntohs (dht_msg->header.size);
683   put_path_length = ntohl (dht_msg->put_path_length);
684   get_path_length = ntohl (dht_msg->get_path_length);
685   meta_length =
686       sizeof (struct GNUNET_DHT_ClientResultMessage) +
687       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
688   if ((msize < meta_length) ||
689       (get_path_length >
690        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
691       (put_path_length >
692        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
693   {
694     GNUNET_break (0);
695     return GNUNET_NO;
696   }
697   data_length = msize - meta_length;
698   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
699        (unsigned int) data_length, GNUNET_h2s (key));
700   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
701   get_path = &put_path[put_path_length];
702   data = &get_path[get_path_length];
703   /* remember that we've seen this result */
704   GNUNET_CRYPTO_hash (data, data_length, &hc);
705   if (get_handle->seen_results_size == get_handle->seen_results_end)
706     GNUNET_array_grow (get_handle->seen_results,
707                        get_handle->seen_results_size,
708                        get_handle->seen_results_size * 2 + 1);
709   GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset);
710   get_handle->seen_results[get_handle->seen_results_end++] = hc;
711   /* no need to block it explicitly, service already knows about it! */
712   get_handle->seen_results_transmission_offset++;
713   get_handle->iter (get_handle->iter_cls,
714                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
715                     get_path, get_path_length, put_path, put_path_length,
716                     ntohl (dht_msg->type), data_length, data);
717   return GNUNET_NO;
718 }
719
720
721 /**
722  * Process a get monitor message from the service.
723  *
724  * @param handle The DHT handle.
725  * @param msg Monitor get message from the service.
726  * @return #GNUNET_OK if everything went fine,
727  *         #GNUNET_SYSERR if the message is malformed.
728  */
729 static int
730 process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
731                              const struct GNUNET_DHT_MonitorGetMessage *msg)
732 {
733   struct GNUNET_DHT_MonitorHandle *h;
734
735   for (h = handle->monitor_head; NULL != h; h = h->next)
736   {
737     int type_ok;
738     int key_ok;
739
740     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
741     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
742                                                sizeof (struct GNUNET_HashCode)));
743     if (type_ok && key_ok && (NULL != h->get_cb))
744       h->get_cb (h->cb_cls,
745                  ntohl (msg->options),
746                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
747                  ntohl (msg->hop_count),
748                  ntohl (msg->desired_replication_level),
749                  ntohl (msg->get_path_length),
750                  (struct GNUNET_PeerIdentity *) &msg[1],
751                  &msg->key);
752   }
753   return GNUNET_OK;
754 }
755
756
757 /**
758  * Process a get response monitor message from the service.
759  *
760  * @param handle The DHT handle.
761  * @param msg monitor get response message from the service
762  * @return #GNUNET_OK if everything went fine,
763  *         #GNUNET_SYSERR if the message is malformed.
764  */
765 static int
766 process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
767                                   const struct GNUNET_DHT_MonitorGetRespMessage *msg)
768 {
769   struct GNUNET_DHT_MonitorHandle *h;
770   struct GNUNET_PeerIdentity *path;
771   uint32_t getl;
772   uint32_t putl;
773   size_t msize;
774
775   msize = ntohs (msg->header.size);
776   path = (struct GNUNET_PeerIdentity *) &msg[1];
777   getl = ntohl (msg->get_path_length);
778   putl = ntohl (msg->put_path_length);
779   if ( (getl + putl < getl) ||
780        ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
781   {
782     GNUNET_break (0);
783     return GNUNET_SYSERR;
784   }
785   for (h = handle->monitor_head; NULL != h; h = h->next)
786   {
787     int type_ok;
788     int key_ok;
789
790     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
791     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
792                                                sizeof (struct GNUNET_HashCode)));
793     if (type_ok && key_ok && (NULL != h->get_resp_cb))
794       h->get_resp_cb (h->cb_cls,
795                       (enum GNUNET_BLOCK_Type) ntohl(msg->type),
796                       path, getl,
797                       &path[getl], putl,
798                       GNUNET_TIME_absolute_ntoh(msg->expiration_time),
799                       &msg->key,
800                       (void *) &path[getl + putl],
801                       msize -
802                       sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
803                       sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
804   }
805   return GNUNET_OK;
806 }
807
808
809 /**
810  * Process a put monitor message from the service.
811  *
812  * @param handle The DHT handle.
813  * @param msg Monitor put message from the service.
814  * @return #GNUNET_OK if everything went fine,
815  *         #GNUNET_SYSERR if the message is malformed.
816  */
817 static int
818 process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
819                              const struct GNUNET_DHT_MonitorPutMessage *msg)
820 {
821   struct GNUNET_DHT_MonitorHandle *h;
822   size_t msize;
823   struct GNUNET_PeerIdentity *path;
824   uint32_t putl;
825
826   msize = ntohs (msg->header.size);
827   path = (struct GNUNET_PeerIdentity *) &msg[1];
828   putl = ntohl (msg->put_path_length);
829   if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
830   {
831     GNUNET_break (0);
832     return GNUNET_SYSERR;
833   }
834   for (h = handle->monitor_head; NULL != h; h = h->next)
835   {
836     int type_ok;
837     int key_ok;
838
839     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
840     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
841                                                sizeof (struct GNUNET_HashCode)));
842     if (type_ok && key_ok && (NULL != h->put_cb))
843       h->put_cb (h->cb_cls,
844                  ntohl (msg->options),
845                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
846                  ntohl (msg->hop_count),
847                  ntohl (msg->desired_replication_level),
848                  putl, path,
849                  GNUNET_TIME_absolute_ntoh(msg->expiration_time),
850                  &msg->key,
851                  (void *) &path[putl],
852                  msize -
853                  sizeof (struct GNUNET_DHT_MonitorPutMessage) -
854                  sizeof (struct GNUNET_PeerIdentity) * putl);
855   }
856   return GNUNET_OK;
857 }
858
859
860 /**
861  * Process a put confirmation message from the service.
862  *
863  * @param handle The DHT handle.
864  * @param msg confirmation message from the service.
865  * @return #GNUNET_OK if everything went fine,
866  *         #GNUNET_SYSERR if the message is malformed.
867  */
868 static int
869 process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
870                                   const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
871 {
872   struct GNUNET_DHT_PutHandle *ph;
873   GNUNET_DHT_PutContinuation cont;
874   void *cont_cls;
875
876   for (ph = handle->put_head; NULL != ph; ph = ph->next)
877     if (ph->unique_id == msg->unique_id)
878       break;
879   if (NULL == ph)
880     return GNUNET_OK;
881   cont = ph->cont;
882   cont_cls = ph->cont_cls;
883   GNUNET_DHT_put_cancel (ph);
884   if (NULL != cont)
885     cont (cont_cls, GNUNET_OK);
886   return GNUNET_OK;
887 }
888
889
890 /**
891  * Handler for messages received from the DHT service
892  * a demultiplexer which handles numerous message types
893  *
894  * @param cls the `struct GNUNET_DHT_Handle`
895  * @param msg the incoming message
896  */
897 static void
898 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
899 {
900   struct GNUNET_DHT_Handle *handle = cls;
901   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
902   uint16_t msize;
903   int ret;
904
905
906   if (NULL == msg)
907   {
908     LOG (GNUNET_ERROR_TYPE_DEBUG,
909          "Error receiving data from DHT service, reconnecting\n");
910     do_disconnect (handle);
911     return;
912   }
913   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
914                          GNUNET_TIME_UNIT_FOREVER_REL);
915   ret = GNUNET_SYSERR;
916   msize = ntohs (msg->size);
917   switch (ntohs (msg->type))
918   {
919   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
920     if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
921     {
922       GNUNET_break (0);
923       break;
924     }
925     ret = process_monitor_get_message(handle,
926                                       (const struct GNUNET_DHT_MonitorGetMessage *) msg);
927     break;
928   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
929     if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
930     {
931       GNUNET_break (0);
932       break;
933     }
934     ret = process_monitor_get_resp_message(handle,
935                                            (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
936     break;
937   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
938     if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
939     {
940       GNUNET_break (0);
941       break;
942     }
943     ret = process_monitor_put_message(handle,
944                                       (const struct GNUNET_DHT_MonitorPutMessage *) msg);
945     break;
946   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
947     /* Not implemented yet */
948     GNUNET_break(0);
949     break;
950   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
951     if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
952     {
953       GNUNET_break (0);
954       break;
955     }
956     dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
957     LOG (GNUNET_ERROR_TYPE_DEBUG,
958          "Received reply for `%s' from DHT service %p\n",
959          GNUNET_h2s (&dht_msg->key), handle);
960     GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
961                                                 &dht_msg->key,
962                                                 &process_reply,
963                                                 (void *) dht_msg);
964     ret = GNUNET_OK;
965     break;
966   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
967     if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
968     {
969       GNUNET_break (0);
970       break;
971     }
972     ret = process_put_confirmation_message (handle,
973                                             (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
974     break;
975   default:
976     GNUNET_break(0);
977     LOG (GNUNET_ERROR_TYPE_WARNING,
978          "Unknown DHT message type: %hu (%hu) size: %hu\n",
979          ntohs (msg->type), msg->type, msize);
980     break;
981   }
982   if (GNUNET_OK != ret)
983   {
984     GNUNET_break (0);
985     do_disconnect (handle);
986     return;
987   }
988 }
989
990
991 /**
992  * Initialize the connection with the DHT service.
993  *
994  * @param cfg configuration to use
995  * @param ht_len size of the internal hash table to use for
996  *               processing multiple GET/FIND requests in parallel
997  * @return handle to the DHT service, or NULL on error
998  */
999 struct GNUNET_DHT_Handle *
1000 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1001                     unsigned int ht_len)
1002 {
1003   struct GNUNET_DHT_Handle *handle;
1004
1005   handle = GNUNET_new (struct GNUNET_DHT_Handle);
1006   handle->cfg = cfg;
1007   handle->uid_gen =
1008       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
1009   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_YES);
1010   if (GNUNET_NO == try_connect (handle))
1011   {
1012     GNUNET_DHT_disconnect (handle);
1013     return NULL;
1014   }
1015   return handle;
1016 }
1017
1018
1019 /**
1020  * Shutdown connection with the DHT service.
1021  *
1022  * @param handle handle of the DHT connection to stop
1023  */
1024 void
1025 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
1026 {
1027   struct PendingMessage *pm;
1028   struct GNUNET_DHT_PutHandle *ph;
1029
1030   GNUNET_assert (NULL != handle);
1031   GNUNET_assert (0 ==
1032                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
1033   if (NULL != handle->th)
1034   {
1035     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1036     handle->th = NULL;
1037   }
1038   while (NULL != (pm = handle->pending_head))
1039   {
1040     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
1041     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1042                                  pm);
1043     pm->in_pending_queue = GNUNET_NO;
1044     GNUNET_assert (GNUNET_YES == pm->free_on_send);
1045     if (NULL != pm->cont)
1046       pm->cont (pm->cont_cls, NULL);
1047     GNUNET_free (pm);
1048   }
1049   while (NULL != (ph = handle->put_head))
1050   {
1051     GNUNET_break (NULL == ph->pending);
1052     if (NULL != ph->cont)
1053       ph->cont (ph->cont_cls, GNUNET_SYSERR);
1054     GNUNET_DHT_put_cancel (ph);
1055   }
1056
1057   if (NULL != handle->client)
1058   {
1059     GNUNET_CLIENT_disconnect (handle->client);
1060     handle->client = NULL;
1061   }
1062   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1063     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1064   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
1065   GNUNET_free (handle);
1066 }
1067
1068
1069 /**
1070  * Timeout for the transmission of a fire&forget-request.  Clean it up.
1071  *
1072  * @param cls the `struct GNUNET_DHT_PutHandle *`
1073  * @param tc scheduler context
1074  */
1075 static void
1076 timeout_put_request (void *cls,
1077                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1078 {
1079   struct GNUNET_DHT_PutHandle *ph = cls;
1080   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1081
1082   ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1083   if (NULL != ph->pending)
1084   {
1085     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1086                                  ph->pending);
1087     ph->pending->in_pending_queue = GNUNET_NO;
1088     GNUNET_free (ph->pending);
1089   }
1090   if (NULL != ph->cont)
1091     ph->cont (ph->cont_cls, GNUNET_NO);
1092   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1093                                handle->put_tail,
1094                                ph);
1095   GNUNET_free (ph);
1096 }
1097
1098
1099 /**
1100  * Function called whenever the PUT message leaves the queue.  Sets
1101  * the message pointer in the put handle to NULL.
1102  *
1103  * @param cls the `struct GNUNET_DHT_PutHandle`
1104  * @param tc unused
1105  */
1106 static void
1107 mark_put_message_gone (void *cls,
1108                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1109 {
1110   struct GNUNET_DHT_PutHandle *ph = cls;
1111
1112   ph->pending = NULL;
1113 }
1114
1115
1116 /**
1117  * Perform a PUT operation storing data in the DHT.  FIXME: we should
1118  * change the protocol to get a confirmation for the PUT from the DHT
1119  * and call 'cont' only after getting the confirmation; otherwise, the
1120  * client has no good way of telling if the 'PUT' message actually got
1121  * to the DHT service!
1122  *
1123  * @param handle handle to DHT service
1124  * @param key the key to store under
1125  * @param desired_replication_level estimate of how many
1126  *                nearest peers this request should reach
1127  * @param options routing options for this message
1128  * @param type type of the value
1129  * @param size number of bytes in data; must be less than 64k
1130  * @param data the data to store
1131  * @param exp desired expiration time for the value
1132  * @param timeout how long to wait for transmission of this request
1133  * @param cont continuation to call when done (transmitting request to service)
1134  *        You must not call #GNUNET_DHT_disconnect in this continuation
1135  * @param cont_cls closure for @a cont
1136  */
1137 struct GNUNET_DHT_PutHandle *
1138 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1139                         const struct GNUNET_HashCode * key,
1140                 uint32_t desired_replication_level,
1141                 enum GNUNET_DHT_RouteOption options,
1142                 enum GNUNET_BLOCK_Type type, size_t size,
1143                             const void *data,
1144                 struct GNUNET_TIME_Absolute exp,
1145                 struct GNUNET_TIME_Relative timeout,
1146                             GNUNET_DHT_PutContinuation cont,
1147                 void *cont_cls)
1148 {
1149   struct GNUNET_DHT_ClientPutMessage *put_msg;
1150   size_t msize;
1151   struct PendingMessage *pending;
1152   struct GNUNET_DHT_PutHandle *ph;
1153
1154
1155   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
1156   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1157       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1158   {
1159     GNUNET_break (0);
1160     return NULL;
1161   }
1162   ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
1163   ph->dht_handle = handle;
1164   ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1165   ph->cont = cont;
1166   ph->cont_cls = cont_cls;
1167   ph->unique_id = ++handle->uid_gen;
1168   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1169   ph->pending = pending;
1170   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
1171   pending->msg = &put_msg->header;
1172   pending->handle = handle;
1173   pending->cont = &mark_put_message_gone;
1174   pending->cont_cls = ph;
1175   pending->free_on_send = GNUNET_YES;
1176   put_msg->header.size = htons (msize);
1177   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1178   put_msg->type = htonl (type);
1179   put_msg->options = htonl ((uint32_t) options);
1180   put_msg->desired_replication_level = htonl (desired_replication_level);
1181   put_msg->unique_id = ph->unique_id;
1182   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1183   put_msg->key = *key;
1184   memcpy (&put_msg[1], data, size);
1185   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1186                                pending);
1187   pending->in_pending_queue = GNUNET_YES;
1188   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1189                                     handle->put_tail,
1190                                     ph);
1191   process_pending_messages (handle);
1192   return ph;
1193 }
1194
1195
1196 /**
1197  * Cancels a DHT PUT operation.  Note that the PUT request may still
1198  * go out over the network (we can't stop that); However, if the PUT
1199  * has not yet been sent to the service, cancelling the PUT will stop
1200  * this from happening (but there is no way for the user of this API
1201  * to tell if that is the case).  The only use for this API is to
1202  * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1203  * the system is shutting down).
1204  *
1205  * @param ph put operation to cancel ('cont' will no longer be called)
1206  */
1207 void
1208 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1209 {
1210   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1211
1212   if (NULL != ph->pending)
1213   {
1214     GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1215                                  handle->pending_tail,
1216                                  ph->pending);
1217     GNUNET_free (ph->pending);
1218     ph->pending = NULL;
1219   }
1220   if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1221   {
1222     GNUNET_SCHEDULER_cancel (ph->timeout_task);
1223     ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1224   }
1225   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1226                                handle->put_tail,
1227                                ph);
1228   GNUNET_free (ph);
1229 }
1230
1231
1232 /**
1233  * Perform an asynchronous GET operation on the DHT identified. See
1234  * also #GNUNET_BLOCK_evaluate.
1235  *
1236  * @param handle handle to the DHT service
1237  * @param type expected type of the response object
1238  * @param key the key to look up
1239  * @param desired_replication_level estimate of how many
1240                   nearest peers this request should reach
1241  * @param options routing options for this message
1242  * @param xquery extended query data (can be NULL, depending on type)
1243  * @param xquery_size number of bytes in @a xquery
1244  * @param iter function to call on each result
1245  * @param iter_cls closure for iter
1246  * @return handle to stop the async get
1247  */
1248 struct GNUNET_DHT_GetHandle *
1249 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1250                       enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key,
1251                       uint32_t desired_replication_level,
1252                       enum GNUNET_DHT_RouteOption options, const void *xquery,
1253                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
1254                       void *iter_cls)
1255 {
1256   struct GNUNET_DHT_ClientGetMessage *get_msg;
1257   struct GNUNET_DHT_GetHandle *get_handle;
1258   size_t msize;
1259   struct PendingMessage *pending;
1260
1261   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1262   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1263       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1264   {
1265     GNUNET_break (0);
1266     return NULL;
1267   }
1268   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
1269        GNUNET_h2s (key), handle);
1270   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1271   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
1272   pending->msg = &get_msg->header;
1273   pending->handle = handle;
1274   pending->free_on_send = GNUNET_NO;
1275   get_msg->header.size = htons (msize);
1276   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
1277   get_msg->options = htonl ((uint32_t) options);
1278   get_msg->desired_replication_level = htonl (desired_replication_level);
1279   get_msg->type = htonl (type);
1280   get_msg->key = *key;
1281   get_msg->unique_id = ++handle->uid_gen;
1282   memcpy (&get_msg[1], xquery, xquery_size);
1283   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1284                                pending);
1285   pending->in_pending_queue = GNUNET_YES;
1286   get_handle = GNUNET_new (struct GNUNET_DHT_GetHandle);
1287   get_handle->key = *key;
1288   get_handle->dht_handle = handle;
1289   get_handle->iter = iter;
1290   get_handle->iter_cls = iter_cls;
1291   get_handle->message = pending;
1292   get_handle->unique_id = get_msg->unique_id;
1293   GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1294                                      &get_handle->key,
1295                                      get_handle,
1296                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1297   process_pending_messages (handle);
1298   return get_handle;
1299 }
1300
1301
1302 /**
1303  * Tell the DHT not to return any of the following known results
1304  * to this client.
1305  *
1306  * @param get_handle get operation for which results should be filtered
1307  * @param num_results number of results to be blocked that are
1308  *        provided in this call (size of the @a results array)
1309  * @param results array of hash codes over the 'data' of the results
1310  *        to be blocked
1311  */
1312 void
1313 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1314                                      unsigned int num_results,
1315                                      const struct GNUNET_HashCode *results)
1316 {
1317   unsigned int needed;
1318
1319   needed = get_handle->seen_results_end + num_results;
1320   if (needed > get_handle->seen_results_size)
1321     GNUNET_array_grow (get_handle->seen_results,
1322                        get_handle->seen_results_size,
1323                        needed);
1324   memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1325           results,
1326           num_results * sizeof (struct GNUNET_HashCode));
1327   get_handle->seen_results_end += num_results;
1328   queue_filter_messages (get_handle);
1329   process_pending_messages (get_handle->dht_handle);
1330 }
1331
1332
1333 /**
1334  * Stop async DHT-get.
1335  *
1336  * @param get_handle handle to the GET operation to stop
1337  */
1338 void
1339 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1340 {
1341   struct GNUNET_DHT_Handle *handle;
1342   const struct GNUNET_DHT_ClientGetMessage *get_msg;
1343   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1344   struct PendingMessage *pending;
1345
1346   handle = get_handle->message->handle;
1347   get_msg =
1348       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
1349   LOG (GNUNET_ERROR_TYPE_DEBUG,
1350        "Sending STOP for %s to DHT via %p\n",
1351        GNUNET_h2s (&get_msg->key), handle);
1352   /* generate STOP */
1353   pending =
1354       GNUNET_malloc (sizeof (struct PendingMessage) +
1355                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1356   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
1357   pending->msg = &stop_msg->header;
1358   pending->handle = handle;
1359   pending->free_on_send = GNUNET_YES;
1360   stop_msg->header.size =
1361       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1362   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1363   stop_msg->reserved = htonl (0);
1364   stop_msg->unique_id = get_msg->unique_id;
1365   stop_msg->key = get_msg->key;
1366   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1367                                pending);
1368   pending->in_pending_queue = GNUNET_YES;
1369
1370   /* remove 'GET' from active status */
1371   GNUNET_assert (GNUNET_YES ==
1372                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1373                                                        &get_handle->key,
1374                                                        get_handle));
1375   if (GNUNET_YES == get_handle->message->in_pending_queue)
1376   {
1377     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1378                                  get_handle->message);
1379     get_handle->message->in_pending_queue = GNUNET_NO;
1380   }
1381   GNUNET_free (get_handle->message);
1382   GNUNET_array_grow (get_handle->seen_results,
1383                      get_handle->seen_results_end,
1384                      0);
1385   GNUNET_free (get_handle);
1386   process_pending_messages (handle);
1387 }
1388
1389
1390 /**
1391  * Start monitoring the local DHT service.
1392  *
1393  * @param handle Handle to the DHT service.
1394  * @param type Type of blocks that are of interest.
1395  * @param key Key of data of interest, NULL for all.
1396  * @param get_cb Callback to process monitored get messages.
1397  * @param get_resp_cb Callback to process monitored get response messages.
1398  * @param put_cb Callback to process monitored put messages.
1399  * @param cb_cls Closure for callbacks.
1400  * @return Handle to stop monitoring.
1401  */
1402 struct GNUNET_DHT_MonitorHandle *
1403 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1404                           enum GNUNET_BLOCK_Type type,
1405                           const struct GNUNET_HashCode *key,
1406                           GNUNET_DHT_MonitorGetCB get_cb,
1407                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1408                           GNUNET_DHT_MonitorPutCB put_cb,
1409                           void *cb_cls)
1410 {
1411   struct GNUNET_DHT_MonitorHandle *h;
1412   struct GNUNET_DHT_MonitorStartStopMessage *m;
1413   struct PendingMessage *pending;
1414
1415   h = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1416   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
1417
1418   h->get_cb = get_cb;
1419   h->get_resp_cb = get_resp_cb;
1420   h->put_cb = put_cb;
1421   h->cb_cls = cb_cls;
1422   h->type = type;
1423   h->dht_handle = handle;
1424   if (NULL != key)
1425   {
1426     h->key = GNUNET_new (struct GNUNET_HashCode);
1427     *h->key = *key;
1428   }
1429
1430   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1431                            sizeof (struct PendingMessage));
1432   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1433   pending->msg = &m->header;
1434   pending->handle = handle;
1435   pending->free_on_send = GNUNET_YES;
1436   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
1437   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1438   m->type = htonl(type);
1439   m->get = htons(NULL != get_cb);
1440   m->get_resp = htons(NULL != get_resp_cb);
1441   m->put = htons(NULL != put_cb);
1442   if (NULL != key) {
1443     m->filter_key = htons(1);
1444     memcpy (&m->key, key, sizeof(struct GNUNET_HashCode));
1445   }
1446   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1447                                pending);
1448   pending->in_pending_queue = GNUNET_YES;
1449   process_pending_messages (handle);
1450
1451   return h;
1452 }
1453
1454
1455 /**
1456  * Stop monitoring.
1457  *
1458  * @param handle The handle to the monitor request returned by monitor_start.
1459  *
1460  * On return get_handle will no longer be valid, caller must not use again!!!
1461  */
1462 void
1463 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1464 {
1465   struct GNUNET_DHT_MonitorStartStopMessage *m;
1466   struct PendingMessage *pending;
1467
1468   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1469                                handle->dht_handle->monitor_tail,
1470                                handle);
1471
1472   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1473                            sizeof (struct PendingMessage));
1474   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1475   pending->msg = &m->header;
1476   pending->handle = handle->dht_handle;
1477   pending->free_on_send = GNUNET_YES;
1478   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1479   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1480   m->type = htonl(handle->type);
1481   m->get = htons (NULL != handle->get_cb);
1482   m->get_resp = htons(NULL != handle->get_resp_cb);
1483   m->put = htons (NULL != handle->put_cb);
1484   if (NULL != handle->key)
1485   {
1486     m->filter_key = htons (1);
1487     m->key = *handle->key;
1488   }
1489   GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
1490                                handle->dht_handle->pending_tail,
1491                                pending);
1492   pending->in_pending_queue = GNUNET_YES;
1493   process_pending_messages (handle->dht_handle);
1494
1495   GNUNET_free_non_null (handle->key);
1496   GNUNET_free (handle);
1497 }
1498
1499 /* end of dht_api.c */