More W32 resolver workarounds
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Unique ID for this request
78    */
79   uint64_t unique_id;
80
81   /**
82    * Free the saved message once sent, set to GNUNET_YES for messages
83    * that do not receive responses; GNUNET_NO if this pending message
84    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
85    * from there.
86    */
87   int free_on_send;
88
89   /**
90    * GNUNET_YES if this message is in our pending queue right now.
91    */
92   int in_pending_queue;
93
94 };
95
96
97 /**
98  * Handle to a PUT request.
99  */
100 struct GNUNET_DHT_PutHandle
101 {
102   /**
103    * Kept in a DLL.
104    */
105   struct GNUNET_DHT_PutHandle *next;
106
107   /**
108    * Kept in a DLL.
109    */
110   struct GNUNET_DHT_PutHandle *prev;
111
112   /**
113    * Continuation to call when done.
114    */
115   GNUNET_DHT_PutContinuation cont;
116
117   /**
118    * Pending message associated with this PUT operation,
119    * NULL after the message has been transmitted to the service.
120    */
121   struct PendingMessage *pending;
122
123   /**
124    * Main handle to this DHT api
125    */
126   struct GNUNET_DHT_Handle *dht_handle;
127
128   /**
129    * Closure for 'cont'.
130    */
131   void *cont_cls;
132
133   /**
134    * Timeout task for this operation.
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * Unique ID for the PUT operation.
140    */
141   uint64_t unique_id;
142
143 };
144
145
146
147 /**
148  * Handle to a GET request
149  */
150 struct GNUNET_DHT_GetHandle
151 {
152
153   /**
154    * Iterator to call on data receipt
155    */
156   GNUNET_DHT_GetIterator iter;
157
158   /**
159    * Closure for the iterator callback
160    */
161   void *iter_cls;
162
163   /**
164    * Main handle to this DHT api
165    */
166   struct GNUNET_DHT_Handle *dht_handle;
167
168   /**
169    * The actual message sent for this request,
170    * used for retransmitting requests on service
171    * failure/reconnect.  Freed on route_stop.
172    */
173   struct PendingMessage *message;
174
175   /**
176    * Array of hash codes over the results that we have already
177    * seen.
178    */
179   struct GNUNET_HashCode *seen_results;
180
181   /**
182    * Key that this get request is for
183    */
184   struct GNUNET_HashCode key;
185
186   /**
187    * Unique identifier for this request (for key collisions).
188    */
189   uint64_t unique_id;
190
191   /**
192    * Size of the 'seen_results' array.  Note that not
193    * all positions might be used (as we over-allocate).
194    */
195   unsigned int seen_results_size;
196
197   /**
198    * Offset into the 'seen_results' array marking the
199    * end of the positions that are actually used.
200    */
201   unsigned int seen_results_end;
202
203   /**
204    * Offset into the 'seen_results' array marking the
205    * position up to where we've send the hash codes to
206    * the DHT for blocking (needed as we might not be
207    * able to send all hash codes at once).
208    */
209   unsigned int seen_results_transmission_offset;
210
211
212 };
213
214
215 /**
216  * Handle to a monitoring request.
217  */
218 struct GNUNET_DHT_MonitorHandle
219 {
220   /**
221    * DLL.
222    */
223   struct GNUNET_DHT_MonitorHandle *next;
224
225   /**
226    * DLL.
227    */
228   struct GNUNET_DHT_MonitorHandle *prev;
229
230   /**
231    * Main handle to this DHT api.
232    */
233   struct GNUNET_DHT_Handle *dht_handle;
234
235   /**
236    * Type of block looked for.
237    */
238   enum GNUNET_BLOCK_Type type;
239
240   /**
241    * Key being looked for, NULL == all.
242    */
243   struct GNUNET_HashCode *key;
244
245   /**
246    * Callback for each received message of type get.
247    */
248   GNUNET_DHT_MonitorGetCB get_cb;
249
250   /**
251    * Callback for each received message of type get response.
252    */
253   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
254
255   /**
256    * Callback for each received message of type put.
257    */
258   GNUNET_DHT_MonitorPutCB put_cb;
259
260   /**
261    * Closure for cb.
262    */
263   void *cb_cls;
264
265 };
266
267
268 /**
269  * Connection to the DHT service.
270  */
271 struct GNUNET_DHT_Handle
272 {
273
274   /**
275    * Configuration to use.
276    */
277   const struct GNUNET_CONFIGURATION_Handle *cfg;
278
279   /**
280    * Socket (if available).
281    */
282   struct GNUNET_CLIENT_Connection *client;
283
284   /**
285    * Currently pending transmission request (or NULL).
286    */
287   struct GNUNET_CLIENT_TransmitHandle *th;
288
289   /**
290    * Head of linked list of messages we would like to transmit.
291    */
292   struct PendingMessage *pending_head;
293
294   /**
295    * Tail of linked list of messages we would like to transmit.
296    */
297   struct PendingMessage *pending_tail;
298
299   /**
300    * Head of linked list of messages we would like to monitor.
301    */
302   struct GNUNET_DHT_MonitorHandle *monitor_head;
303
304   /**
305    * Tail of linked list of messages we would like to monitor.
306    */
307   struct GNUNET_DHT_MonitorHandle *monitor_tail;
308
309   /**
310    * Head of active PUT requests.
311    */
312   struct GNUNET_DHT_PutHandle *put_head;
313
314   /**
315    * Tail of active PUT requests.
316    */
317   struct GNUNET_DHT_PutHandle *put_tail;
318
319   /**
320    * Hash map containing the current outstanding unique GET requests
321    * (values are of type `struct GNUNET_DHT_GetHandle`).
322    */
323   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
324
325   /**
326    * Task for trying to reconnect.
327    */
328   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
329
330   /**
331    * How quickly should we retry?  Used for exponential back-off on
332    * connect-errors.
333    */
334   struct GNUNET_TIME_Relative retry_time;
335
336   /**
337    * Generator for unique ids.
338    */
339   uint64_t uid_gen;
340
341   /**
342    * Did we start our receive loop yet?
343    */
344   int in_receive;
345 };
346
347
348 /**
349  * Handler for messages received from the DHT service
350  * a demultiplexer which handles numerous message types
351  *
352  * @param cls the `struct GNUNET_DHT_Handle`
353  * @param msg the incoming message
354  */
355 static void
356 service_message_handler (void *cls,
357                          const struct GNUNET_MessageHeader *msg);
358
359
360 /**
361  * Try to (re)connect to the DHT service.
362  *
363  * @param handle DHT handle to reconnect
364  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
365  */
366 static int
367 try_connect (struct GNUNET_DHT_Handle *handle)
368 {
369   if (NULL != handle->client)
370     return GNUNET_OK;
371   handle->in_receive = GNUNET_NO;
372   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
373   if (NULL == handle->client)
374   {
375     LOG (GNUNET_ERROR_TYPE_WARNING,
376          _("Failed to connect to the DHT service!\n"));
377     return GNUNET_NO;
378   }
379   return GNUNET_YES;
380 }
381
382
383 /**
384  * Queue messages to DHT to block certain results from the result set.
385  *
386  * @param get_handle GET to generate messages for.
387  */
388 static void
389 queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
390 {
391   struct PendingMessage *pm;
392   struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
393   uint16_t msize;
394   unsigned int delta;
395   unsigned int max;
396
397   while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end)
398   {
399     delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset;
400     max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
401     if (delta > max)
402       delta = max;
403     msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode);
404
405     pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
406     msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
407     pm->msg = &msg->header;
408     pm->handle = get_handle->dht_handle;
409     pm->unique_id = get_handle->unique_id;
410     pm->free_on_send = GNUNET_YES;
411     pm->in_pending_queue = GNUNET_YES;
412     msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
413     msg->header.size = htons (msize);
414     msg->key = get_handle->key;
415     msg->unique_id = get_handle->unique_id;
416     memcpy (&msg[1],
417             &get_handle->seen_results[get_handle->seen_results_transmission_offset],
418             sizeof (struct GNUNET_HashCode) * delta);
419     get_handle->seen_results_transmission_offset += delta;
420     GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
421                                       get_handle->dht_handle->pending_tail,
422                                       pm);
423   }
424 }
425
426
427 /**
428  * Add the request corresponding to the given route handle
429  * to the pending queue (if it is not already in there).
430  *
431  * @param cls the `struct GNUNET_DHT_Handle *`
432  * @param key key for the request (not used)
433  * @param value the `struct GNUNET_DHT_GetHandle *`
434  * @return #GNUNET_YES (always)
435  */
436 static int
437 add_request_to_pending (void *cls,
438                         const struct GNUNET_HashCode *key,
439                         void *value)
440 {
441   struct GNUNET_DHT_Handle *handle = cls;
442   struct GNUNET_DHT_GetHandle *get_handle = value;
443
444   if (GNUNET_NO == get_handle->message->in_pending_queue)
445   {
446     LOG (GNUNET_ERROR_TYPE_DEBUG,
447          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
448          handle);
449     get_handle->seen_results_transmission_offset = 0;
450     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
451                                  get_handle->message);
452     queue_filter_messages (get_handle);
453     get_handle->message->in_pending_queue = GNUNET_YES;
454   }
455   return GNUNET_YES;
456 }
457
458
459 /**
460  * Try to send messages from list of messages to send
461  *
462  * @param handle DHT_Handle
463  */
464 static void
465 process_pending_messages (struct GNUNET_DHT_Handle *handle);
466
467
468 /**
469  * Try reconnecting to the dht service.
470  *
471  * @param cls a `struct GNUNET_DHT_Handle`
472  * @param tc scheduler context
473  */
474 static void
475 try_reconnect (void *cls,
476                const struct GNUNET_SCHEDULER_TaskContext *tc)
477 {
478   struct GNUNET_DHT_Handle *handle = cls;
479
480   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
481   handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time);
482   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
483   if (GNUNET_YES != try_connect (handle))
484   {
485     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
486     return;
487   }
488   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
489                                          &add_request_to_pending, handle);
490   process_pending_messages (handle);
491 }
492
493
494 /**
495  * Try reconnecting to the DHT service.
496  *
497  * @param handle handle to dht to (possibly) disconnect and reconnect
498  */
499 static void
500 do_disconnect (struct GNUNET_DHT_Handle *handle)
501 {
502   struct GNUNET_DHT_PutHandle *ph;
503   struct GNUNET_DHT_PutHandle *next;
504
505   if (NULL == handle->client)
506     return;
507   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
508   if (NULL != handle->th)
509     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
510   handle->th = NULL;
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512               "Disconnecting from DHT service, will try to reconnect in %s\n",
513               GNUNET_STRINGS_relative_time_to_string (handle->retry_time,
514                                                       GNUNET_YES));
515   GNUNET_CLIENT_disconnect (handle->client);
516   handle->client = NULL;
517
518   /* signal disconnect to all PUT requests that were transmitted but waiting
519      for the put confirmation */
520   next = handle->put_head;
521   while (NULL != (ph = next))
522   {
523     next = ph->next;
524     if (NULL == ph->pending)
525     {
526       if (NULL != ph->cont)
527         ph->cont (ph->cont_cls, GNUNET_SYSERR);
528       GNUNET_DHT_put_cancel (ph);
529     }
530   }
531   handle->reconnect_task =
532       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
533 }
534
535
536 /**
537  * Transmit the next pending message, called by notify_transmit_ready
538  *
539  * @param cls the DHT handle
540  * @param size number of bytes available in @a buf for transmission
541  * @param buf where to copy messages for the service
542  * @return number of bytes written to @a buf
543  */
544 static size_t
545 transmit_pending (void *cls,
546                   size_t size,
547                   void *buf);
548
549
550 /**
551  * Try to send messages from list of messages to send
552  *
553  * @param handle handle to DHT
554  */
555 static void
556 process_pending_messages (struct GNUNET_DHT_Handle *handle)
557 {
558   struct PendingMessage *head;
559
560   if (NULL == handle->client)
561   {
562     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563                 "process_pending_messages called, but client is NULL, reconnecting\n");
564     do_disconnect (handle);
565     return;
566   }
567   if (NULL != handle->th)
568     return;
569   if (NULL == (head = handle->pending_head))
570     return;
571   handle->th =
572       GNUNET_CLIENT_notify_transmit_ready (handle->client,
573                                            ntohs (head->msg->size),
574                                            GNUNET_TIME_UNIT_FOREVER_REL,
575                                            GNUNET_YES, &transmit_pending,
576                                            handle);
577   if (NULL != handle->th)
578     return;
579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
580               "notify_transmit_ready returned NULL, reconnecting\n");
581   do_disconnect (handle);
582 }
583
584
585 /**
586  * Transmit the next pending message, called by notify_transmit_ready
587  *
588  * @param cls the DHT handle
589  * @param size number of bytes available in @a buf for transmission
590  * @param buf where to copy messages for the service
591  * @return number of bytes written to @a buf
592  */
593 static size_t
594 transmit_pending (void *cls,
595                   size_t size,
596                   void *buf)
597 {
598   struct GNUNET_DHT_Handle *handle = cls;
599   struct PendingMessage *head;
600   size_t tsize;
601
602   handle->th = NULL;
603   if (NULL == buf)
604   {
605     LOG (GNUNET_ERROR_TYPE_DEBUG,
606          "Transmission to DHT service failed!  Reconnecting!\n");
607     do_disconnect (handle);
608     return 0;
609   }
610   if (NULL == (head = handle->pending_head))
611     return 0;
612
613   tsize = ntohs (head->msg->size);
614   if (size < tsize)
615   {
616     process_pending_messages (handle);
617     return 0;
618   }
619   memcpy (buf, head->msg, tsize);
620   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
621                                head);
622   head->in_pending_queue = GNUNET_NO;
623   if (NULL != head->cont)
624   {
625     head->cont (head->cont_cls, NULL);
626     head->cont = NULL;
627     head->cont_cls = NULL;
628   }
629   if (GNUNET_YES == head->free_on_send)
630     GNUNET_free (head);
631   process_pending_messages (handle);
632   LOG (GNUNET_ERROR_TYPE_DEBUG,
633        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
634   if (GNUNET_NO == handle->in_receive)
635   {
636     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
637     handle->in_receive = GNUNET_YES;
638     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
639                            GNUNET_TIME_UNIT_FOREVER_REL);
640   }
641   return tsize;
642 }
643
644
645 /**
646  * Process a given reply that might match the given
647  * request.
648  *
649  * @param cls the `struct GNUNET_DHT_ClientResultMessage`
650  * @param key query of the request
651  * @param value the `struct GNUNET_DHT_RouteHandle` of a request matching the same key
652  * @return #GNUNET_YES to continue to iterate over all results,
653  *         #GNUNET_NO if the reply is malformed or we found a matching request
654  */
655 static int
656 process_reply (void *cls,
657                const struct GNUNET_HashCode *key,
658                void *value)
659 {
660   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
661   struct GNUNET_DHT_GetHandle *get_handle = value;
662   const struct GNUNET_PeerIdentity *put_path;
663   const struct GNUNET_PeerIdentity *get_path;
664   struct GNUNET_HashCode hc;
665   uint32_t put_path_length;
666   uint32_t get_path_length;
667   size_t data_length;
668   size_t msize;
669   size_t meta_length;
670   const void *data;
671
672   if (dht_msg->unique_id != get_handle->unique_id)
673   {
674     /* UID mismatch */
675     LOG (GNUNET_ERROR_TYPE_DEBUG,
676          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
677          dht_msg->unique_id, get_handle->unique_id);
678     return GNUNET_YES;
679   }
680   msize = ntohs (dht_msg->header.size);
681   put_path_length = ntohl (dht_msg->put_path_length);
682   get_path_length = ntohl (dht_msg->get_path_length);
683   meta_length =
684       sizeof (struct GNUNET_DHT_ClientResultMessage) +
685       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
686   if ((msize < meta_length) ||
687       (get_path_length >
688        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
689       (put_path_length >
690        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
691   {
692     GNUNET_break (0);
693     return GNUNET_NO;
694   }
695   data_length = msize - meta_length;
696   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
697        (unsigned int) data_length, GNUNET_h2s (key));
698   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
699   get_path = &put_path[put_path_length];
700   data = &get_path[get_path_length];
701   /* remember that we've seen this result */
702   GNUNET_CRYPTO_hash (data, data_length, &hc);
703   if (get_handle->seen_results_size == get_handle->seen_results_end)
704     GNUNET_array_grow (get_handle->seen_results,
705                        get_handle->seen_results_size,
706                        get_handle->seen_results_size * 2 + 1);
707   GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset);
708   get_handle->seen_results[get_handle->seen_results_end++] = hc;
709   /* no need to block it explicitly, service already knows about it! */
710   get_handle->seen_results_transmission_offset++;
711   get_handle->iter (get_handle->iter_cls,
712                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
713                     get_path, get_path_length, put_path, put_path_length,
714                     ntohl (dht_msg->type), data_length, data);
715   return GNUNET_NO;
716 }
717
718
719 /**
720  * Process a get monitor message from the service.
721  *
722  * @param handle The DHT handle.
723  * @param msg Monitor get message from the service.
724  * @return #GNUNET_OK if everything went fine,
725  *         #GNUNET_SYSERR if the message is malformed.
726  */
727 static int
728 process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
729                              const struct GNUNET_DHT_MonitorGetMessage *msg)
730 {
731   struct GNUNET_DHT_MonitorHandle *h;
732
733   for (h = handle->monitor_head; NULL != h; h = h->next)
734   {
735     int type_ok;
736     int key_ok;
737
738     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
739     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
740                                                sizeof (struct GNUNET_HashCode)));
741     if (type_ok && key_ok && (NULL != h->get_cb))
742       h->get_cb (h->cb_cls,
743                  ntohl (msg->options),
744                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
745                  ntohl (msg->hop_count),
746                  ntohl (msg->desired_replication_level),
747                  ntohl (msg->get_path_length),
748                  (struct GNUNET_PeerIdentity *) &msg[1],
749                  &msg->key);
750   }
751   return GNUNET_OK;
752 }
753
754
755 /**
756  * Process a get response monitor message from the service.
757  *
758  * @param handle The DHT handle.
759  * @param msg monitor get response message from the service
760  * @return #GNUNET_OK if everything went fine,
761  *         #GNUNET_SYSERR if the message is malformed.
762  */
763 static int
764 process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
765                                   const struct GNUNET_DHT_MonitorGetRespMessage *msg)
766 {
767   struct GNUNET_DHT_MonitorHandle *h;
768   struct GNUNET_PeerIdentity *path;
769   uint32_t getl;
770   uint32_t putl;
771   size_t msize;
772
773   msize = ntohs (msg->header.size);
774   path = (struct GNUNET_PeerIdentity *) &msg[1];
775   getl = ntohl (msg->get_path_length);
776   putl = ntohl (msg->put_path_length);
777   if ( (getl + putl < getl) ||
778        ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
779   {
780     GNUNET_break (0);
781     return GNUNET_SYSERR;
782   }
783   for (h = handle->monitor_head; NULL != h; h = h->next)
784   {
785     int type_ok;
786     int key_ok;
787
788     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
789     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
790                                                sizeof (struct GNUNET_HashCode)));
791     if (type_ok && key_ok && (NULL != h->get_resp_cb))
792       h->get_resp_cb (h->cb_cls,
793                       (enum GNUNET_BLOCK_Type) ntohl(msg->type),
794                       path, getl,
795                       &path[getl], putl,
796                       GNUNET_TIME_absolute_ntoh(msg->expiration_time),
797                       &msg->key,
798                       (void *) &path[getl + putl],
799                       msize -
800                       sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
801                       sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
802   }
803   return GNUNET_OK;
804 }
805
806
807 /**
808  * Process a put monitor message from the service.
809  *
810  * @param handle The DHT handle.
811  * @param msg Monitor put message from the service.
812  * @return #GNUNET_OK if everything went fine,
813  *         #GNUNET_SYSERR if the message is malformed.
814  */
815 static int
816 process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
817                              const struct GNUNET_DHT_MonitorPutMessage *msg)
818 {
819   struct GNUNET_DHT_MonitorHandle *h;
820   size_t msize;
821   struct GNUNET_PeerIdentity *path;
822   uint32_t putl;
823
824   msize = ntohs (msg->header.size);
825   path = (struct GNUNET_PeerIdentity *) &msg[1];
826   putl = ntohl (msg->put_path_length);
827   if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
828   {
829     GNUNET_break (0);
830     return GNUNET_SYSERR;
831   }
832   for (h = handle->monitor_head; NULL != h; h = h->next)
833   {
834     int type_ok;
835     int key_ok;
836
837     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
838     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
839                                                sizeof (struct GNUNET_HashCode)));
840     if (type_ok && key_ok && (NULL != h->put_cb))
841       h->put_cb (h->cb_cls,
842                  ntohl (msg->options),
843                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
844                  ntohl (msg->hop_count),
845                  ntohl (msg->desired_replication_level),
846                  putl, path,
847                  GNUNET_TIME_absolute_ntoh(msg->expiration_time),
848                  &msg->key,
849                  (void *) &path[putl],
850                  msize -
851                  sizeof (struct GNUNET_DHT_MonitorPutMessage) -
852                  sizeof (struct GNUNET_PeerIdentity) * putl);
853   }
854   return GNUNET_OK;
855 }
856
857
858 /**
859  * Process a put confirmation message from the service.
860  *
861  * @param handle The DHT handle.
862  * @param msg confirmation message from the service.
863  * @return #GNUNET_OK if everything went fine,
864  *         #GNUNET_SYSERR if the message is malformed.
865  */
866 static int
867 process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
868                                   const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
869 {
870   struct GNUNET_DHT_PutHandle *ph;
871   GNUNET_DHT_PutContinuation cont;
872   void *cont_cls;
873
874   for (ph = handle->put_head; NULL != ph; ph = ph->next)
875     if (ph->unique_id == msg->unique_id)
876       break;
877   if (NULL == ph)
878     return GNUNET_OK;
879   cont = ph->cont;
880   cont_cls = ph->cont_cls;
881   GNUNET_DHT_put_cancel (ph);
882   if (NULL != cont)
883     cont (cont_cls, GNUNET_OK);
884   return GNUNET_OK;
885 }
886
887
888 /**
889  * Handler for messages received from the DHT service
890  * a demultiplexer which handles numerous message types
891  *
892  * @param cls the `struct GNUNET_DHT_Handle`
893  * @param msg the incoming message
894  */
895 static void
896 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
897 {
898   struct GNUNET_DHT_Handle *handle = cls;
899   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
900   uint16_t msize;
901   int ret;
902
903   if (NULL == msg)
904   {
905     LOG (GNUNET_ERROR_TYPE_DEBUG,
906          "Error receiving data from DHT service, reconnecting\n");
907     do_disconnect (handle);
908     return;
909   }
910   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
911                          GNUNET_TIME_UNIT_FOREVER_REL);
912   ret = GNUNET_SYSERR;
913   msize = ntohs (msg->size);
914   switch (ntohs (msg->type))
915   {
916   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
917     if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
918     {
919       GNUNET_break (0);
920       break;
921     }
922     ret = process_monitor_get_message(handle,
923                                       (const struct GNUNET_DHT_MonitorGetMessage *) msg);
924     break;
925   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
926     if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
927     {
928       GNUNET_break (0);
929       break;
930     }
931     ret = process_monitor_get_resp_message(handle,
932                                            (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
933     break;
934   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
935     if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
936     {
937       GNUNET_break (0);
938       break;
939     }
940     ret = process_monitor_put_message(handle,
941                                       (const struct GNUNET_DHT_MonitorPutMessage *) msg);
942     break;
943   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
944     /* Not implemented yet */
945     GNUNET_break(0);
946     break;
947   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
948     if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
949     {
950       GNUNET_break (0);
951       break;
952     }
953     dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
954     LOG (GNUNET_ERROR_TYPE_DEBUG,
955          "Received reply for `%s' from DHT service %p\n",
956          GNUNET_h2s (&dht_msg->key), handle);
957     GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
958                                                 &dht_msg->key,
959                                                 &process_reply,
960                                                 (void *) dht_msg);
961     ret = GNUNET_OK;
962     break;
963   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
964     if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
965     {
966       GNUNET_break (0);
967       break;
968     }
969     ret = process_put_confirmation_message (handle,
970                                             (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
971     break;
972   default:
973     GNUNET_break(0);
974     LOG (GNUNET_ERROR_TYPE_WARNING,
975          "Unknown DHT message type: %hu (%hu) size: %hu\n",
976          ntohs (msg->type), msg->type, msize);
977     break;
978   }
979   if (GNUNET_OK != ret)
980   {
981     GNUNET_break (0);
982     do_disconnect (handle);
983     return;
984   }
985 }
986
987
988 /**
989  * Initialize the connection with the DHT service.
990  *
991  * @param cfg configuration to use
992  * @param ht_len size of the internal hash table to use for
993  *               processing multiple GET/FIND requests in parallel
994  * @return handle to the DHT service, or NULL on error
995  */
996 struct GNUNET_DHT_Handle *
997 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
998                     unsigned int ht_len)
999 {
1000   struct GNUNET_DHT_Handle *handle;
1001
1002   handle = GNUNET_new (struct GNUNET_DHT_Handle);
1003   handle->cfg = cfg;
1004   handle->uid_gen =
1005       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
1006   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_YES);
1007   if (GNUNET_NO == try_connect (handle))
1008   {
1009     GNUNET_DHT_disconnect (handle);
1010     return NULL;
1011   }
1012   return handle;
1013 }
1014
1015
1016 /**
1017  * Shutdown connection with the DHT service.
1018  *
1019  * @param handle handle of the DHT connection to stop
1020  */
1021 void
1022 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
1023 {
1024   struct PendingMessage *pm;
1025   struct GNUNET_DHT_PutHandle *ph;
1026
1027   GNUNET_assert (NULL != handle);
1028   GNUNET_assert (0 ==
1029                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
1030   if (NULL != handle->th)
1031   {
1032     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1033     handle->th = NULL;
1034   }
1035   while (NULL != (pm = handle->pending_head))
1036   {
1037     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
1038     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1039                                  pm);
1040     pm->in_pending_queue = GNUNET_NO;
1041     GNUNET_assert (GNUNET_YES == pm->free_on_send);
1042     if (NULL != pm->cont)
1043       pm->cont (pm->cont_cls, NULL);
1044     GNUNET_free (pm);
1045   }
1046   while (NULL != (ph = handle->put_head))
1047   {
1048     GNUNET_break (NULL == ph->pending);
1049     if (NULL != ph->cont)
1050       ph->cont (ph->cont_cls, GNUNET_SYSERR);
1051     GNUNET_DHT_put_cancel (ph);
1052   }
1053
1054   if (NULL != handle->client)
1055   {
1056     GNUNET_CLIENT_disconnect (handle->client);
1057     handle->client = NULL;
1058   }
1059   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1060     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1061   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
1062   GNUNET_free (handle);
1063 }
1064
1065
1066 /**
1067  * Timeout for the transmission of a fire&forget-request.  Clean it up.
1068  *
1069  * @param cls the `struct GNUNET_DHT_PutHandle *`
1070  * @param tc scheduler context
1071  */
1072 static void
1073 timeout_put_request (void *cls,
1074                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1075 {
1076   struct GNUNET_DHT_PutHandle *ph = cls;
1077   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1078
1079   ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1080   if (NULL != ph->pending)
1081   {
1082     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1083                                  ph->pending);
1084     ph->pending->in_pending_queue = GNUNET_NO;
1085     GNUNET_free (ph->pending);
1086   }
1087   if (NULL != ph->cont)
1088     ph->cont (ph->cont_cls, GNUNET_NO);
1089   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1090                                handle->put_tail,
1091                                ph);
1092   GNUNET_free (ph);
1093 }
1094
1095
1096 /**
1097  * Function called whenever the PUT message leaves the queue.  Sets
1098  * the message pointer in the put handle to NULL.
1099  *
1100  * @param cls the `struct GNUNET_DHT_PutHandle`
1101  * @param tc unused
1102  */
1103 static void
1104 mark_put_message_gone (void *cls,
1105                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1106 {
1107   struct GNUNET_DHT_PutHandle *ph = cls;
1108
1109   ph->pending = NULL;
1110 }
1111
1112
1113 /**
1114  * Perform a PUT operation storing data in the DHT.  FIXME: we should
1115  * change the protocol to get a confirmation for the PUT from the DHT
1116  * and call 'cont' only after getting the confirmation; otherwise, the
1117  * client has no good way of telling if the 'PUT' message actually got
1118  * to the DHT service!
1119  *
1120  * @param handle handle to DHT service
1121  * @param key the key to store under
1122  * @param desired_replication_level estimate of how many
1123  *                nearest peers this request should reach
1124  * @param options routing options for this message
1125  * @param type type of the value
1126  * @param size number of bytes in data; must be less than 64k
1127  * @param data the data to store
1128  * @param exp desired expiration time for the value
1129  * @param timeout how long to wait for transmission of this request
1130  * @param cont continuation to call when done (transmitting request to service)
1131  *        You must not call #GNUNET_DHT_disconnect in this continuation
1132  * @param cont_cls closure for @a cont
1133  */
1134 struct GNUNET_DHT_PutHandle *
1135 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1136                 const struct GNUNET_HashCode * key,
1137                 uint32_t desired_replication_level,
1138                 enum GNUNET_DHT_RouteOption options,
1139                 enum GNUNET_BLOCK_Type type, size_t size,
1140                 const void *data,
1141                 struct GNUNET_TIME_Absolute exp,
1142                 struct GNUNET_TIME_Relative timeout,
1143                 GNUNET_DHT_PutContinuation cont,
1144                 void *cont_cls)
1145 {
1146   struct GNUNET_DHT_ClientPutMessage *put_msg;
1147   size_t msize;
1148   struct PendingMessage *pending;
1149   struct GNUNET_DHT_PutHandle *ph;
1150
1151   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
1152   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1153       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1154   {
1155     GNUNET_break (0);
1156     return NULL;
1157   }
1158   ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
1159   ph->dht_handle = handle;
1160   ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1161   ph->cont = cont;
1162   ph->cont_cls = cont_cls;
1163   ph->unique_id = ++handle->uid_gen;
1164   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1165   ph->pending = pending;
1166   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
1167   pending->msg = &put_msg->header;
1168   pending->handle = handle;
1169   pending->cont = &mark_put_message_gone;
1170   pending->cont_cls = ph;
1171   pending->free_on_send = GNUNET_YES;
1172   put_msg->header.size = htons (msize);
1173   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1174   put_msg->type = htonl (type);
1175   put_msg->options = htonl ((uint32_t) options);
1176   put_msg->desired_replication_level = htonl (desired_replication_level);
1177   put_msg->unique_id = ph->unique_id;
1178   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1179   put_msg->key = *key;
1180   memcpy (&put_msg[1], data, size);
1181   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1182                                pending);
1183   pending->in_pending_queue = GNUNET_YES;
1184   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1185                                     handle->put_tail,
1186                                     ph);
1187   process_pending_messages (handle);
1188   return ph;
1189 }
1190
1191
1192 /**
1193  * Cancels a DHT PUT operation.  Note that the PUT request may still
1194  * go out over the network (we can't stop that); However, if the PUT
1195  * has not yet been sent to the service, cancelling the PUT will stop
1196  * this from happening (but there is no way for the user of this API
1197  * to tell if that is the case).  The only use for this API is to
1198  * prevent a later call to 'cont' from #GNUNET_DHT_put (i.e. because
1199  * the system is shutting down).
1200  *
1201  * @param ph put operation to cancel ('cont' will no longer be called)
1202  */
1203 void
1204 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1205 {
1206   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1207
1208   if (NULL != ph->pending)
1209   {
1210     GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1211                                  handle->pending_tail,
1212                                  ph->pending);
1213     GNUNET_free (ph->pending);
1214     ph->pending = NULL;
1215   }
1216   if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1217   {
1218     GNUNET_SCHEDULER_cancel (ph->timeout_task);
1219     ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1220   }
1221   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1222                                handle->put_tail,
1223                                ph);
1224   GNUNET_free (ph);
1225 }
1226
1227
1228 /**
1229  * Perform an asynchronous GET operation on the DHT identified. See
1230  * also #GNUNET_BLOCK_evaluate.
1231  *
1232  * @param handle handle to the DHT service
1233  * @param type expected type of the response object
1234  * @param key the key to look up
1235  * @param desired_replication_level estimate of how many
1236                   nearest peers this request should reach
1237  * @param options routing options for this message
1238  * @param xquery extended query data (can be NULL, depending on type)
1239  * @param xquery_size number of bytes in @a xquery
1240  * @param iter function to call on each result
1241  * @param iter_cls closure for iter
1242  * @return handle to stop the async get
1243  */
1244 struct GNUNET_DHT_GetHandle *
1245 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1246                       enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key,
1247                       uint32_t desired_replication_level,
1248                       enum GNUNET_DHT_RouteOption options, const void *xquery,
1249                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
1250                       void *iter_cls)
1251 {
1252   struct GNUNET_DHT_ClientGetMessage *get_msg;
1253   struct GNUNET_DHT_GetHandle *get_handle;
1254   size_t msize;
1255   struct PendingMessage *pending;
1256
1257   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1258   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1259       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1260   {
1261     GNUNET_break (0);
1262     return NULL;
1263   }
1264   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
1265        GNUNET_h2s (key), handle);
1266   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1267   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
1268   pending->msg = &get_msg->header;
1269   pending->handle = handle;
1270   pending->free_on_send = GNUNET_NO;
1271   get_msg->header.size = htons (msize);
1272   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
1273   get_msg->options = htonl ((uint32_t) options);
1274   get_msg->desired_replication_level = htonl (desired_replication_level);
1275   get_msg->type = htonl (type);
1276   get_msg->key = *key;
1277   get_msg->unique_id = ++handle->uid_gen;
1278   memcpy (&get_msg[1], xquery, xquery_size);
1279   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1280                                pending);
1281   pending->in_pending_queue = GNUNET_YES;
1282   get_handle = GNUNET_new (struct GNUNET_DHT_GetHandle);
1283   get_handle->key = *key;
1284   get_handle->dht_handle = handle;
1285   get_handle->iter = iter;
1286   get_handle->iter_cls = iter_cls;
1287   get_handle->message = pending;
1288   get_handle->unique_id = get_msg->unique_id;
1289   GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1290                                      &get_handle->key,
1291                                      get_handle,
1292                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1293   process_pending_messages (handle);
1294   return get_handle;
1295 }
1296
1297
1298 /**
1299  * Tell the DHT not to return any of the following known results
1300  * to this client.
1301  *
1302  * @param get_handle get operation for which results should be filtered
1303  * @param num_results number of results to be blocked that are
1304  *        provided in this call (size of the @a results array)
1305  * @param results array of hash codes over the 'data' of the results
1306  *        to be blocked
1307  */
1308 void
1309 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1310                                      unsigned int num_results,
1311                                      const struct GNUNET_HashCode *results)
1312 {
1313   unsigned int needed;
1314
1315   needed = get_handle->seen_results_end + num_results;
1316   if (needed > get_handle->seen_results_size)
1317     GNUNET_array_grow (get_handle->seen_results,
1318                        get_handle->seen_results_size,
1319                        needed);
1320   memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1321           results,
1322           num_results * sizeof (struct GNUNET_HashCode));
1323   get_handle->seen_results_end += num_results;
1324   queue_filter_messages (get_handle);
1325   process_pending_messages (get_handle->dht_handle);
1326 }
1327
1328
1329 /**
1330  * Stop async DHT-get.
1331  *
1332  * @param get_handle handle to the GET operation to stop
1333  */
1334 void
1335 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1336 {
1337   struct GNUNET_DHT_Handle *handle;
1338   const struct GNUNET_DHT_ClientGetMessage *get_msg;
1339   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1340   struct PendingMessage *pending;
1341
1342   handle = get_handle->message->handle;
1343   get_msg =
1344       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
1345   LOG (GNUNET_ERROR_TYPE_DEBUG,
1346        "Sending STOP for %s to DHT via %p\n",
1347        GNUNET_h2s (&get_msg->key), handle);
1348   /* generate STOP */
1349   pending =
1350       GNUNET_malloc (sizeof (struct PendingMessage) +
1351                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1352   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
1353   pending->msg = &stop_msg->header;
1354   pending->handle = handle;
1355   pending->free_on_send = GNUNET_YES;
1356   stop_msg->header.size =
1357       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1358   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1359   stop_msg->reserved = htonl (0);
1360   stop_msg->unique_id = get_msg->unique_id;
1361   stop_msg->key = get_msg->key;
1362   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1363                                pending);
1364   pending->in_pending_queue = GNUNET_YES;
1365
1366   /* remove 'GET' from active status */
1367   GNUNET_assert (GNUNET_YES ==
1368                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1369                                                        &get_handle->key,
1370                                                        get_handle));
1371   if (GNUNET_YES == get_handle->message->in_pending_queue)
1372   {
1373     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1374                                  get_handle->message);
1375     get_handle->message->in_pending_queue = GNUNET_NO;
1376   }
1377   GNUNET_free (get_handle->message);
1378   GNUNET_array_grow (get_handle->seen_results,
1379                      get_handle->seen_results_end,
1380                      0);
1381   GNUNET_free (get_handle);
1382   process_pending_messages (handle);
1383 }
1384
1385
1386 /**
1387  * Start monitoring the local DHT service.
1388  *
1389  * @param handle Handle to the DHT service.
1390  * @param type Type of blocks that are of interest.
1391  * @param key Key of data of interest, NULL for all.
1392  * @param get_cb Callback to process monitored get messages.
1393  * @param get_resp_cb Callback to process monitored get response messages.
1394  * @param put_cb Callback to process monitored put messages.
1395  * @param cb_cls Closure for callbacks.
1396  * @return Handle to stop monitoring.
1397  */
1398 struct GNUNET_DHT_MonitorHandle *
1399 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1400                           enum GNUNET_BLOCK_Type type,
1401                           const struct GNUNET_HashCode *key,
1402                           GNUNET_DHT_MonitorGetCB get_cb,
1403                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1404                           GNUNET_DHT_MonitorPutCB put_cb,
1405                           void *cb_cls)
1406 {
1407   struct GNUNET_DHT_MonitorHandle *h;
1408   struct GNUNET_DHT_MonitorStartStopMessage *m;
1409   struct PendingMessage *pending;
1410
1411   h = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1412   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
1413
1414   h->get_cb = get_cb;
1415   h->get_resp_cb = get_resp_cb;
1416   h->put_cb = put_cb;
1417   h->cb_cls = cb_cls;
1418   h->type = type;
1419   h->dht_handle = handle;
1420   if (NULL != key)
1421   {
1422     h->key = GNUNET_new (struct GNUNET_HashCode);
1423     *h->key = *key;
1424   }
1425
1426   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1427                            sizeof (struct PendingMessage));
1428   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1429   pending->msg = &m->header;
1430   pending->handle = handle;
1431   pending->free_on_send = GNUNET_YES;
1432   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
1433   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1434   m->type = htonl(type);
1435   m->get = htons(NULL != get_cb);
1436   m->get_resp = htons(NULL != get_resp_cb);
1437   m->put = htons(NULL != put_cb);
1438   if (NULL != key) {
1439     m->filter_key = htons(1);
1440     memcpy (&m->key, key, sizeof(struct GNUNET_HashCode));
1441   }
1442   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1443                                pending);
1444   pending->in_pending_queue = GNUNET_YES;
1445   process_pending_messages (handle);
1446
1447   return h;
1448 }
1449
1450
1451 /**
1452  * Stop monitoring.
1453  *
1454  * @param handle The handle to the monitor request returned by monitor_start.
1455  *
1456  * On return get_handle will no longer be valid, caller must not use again!!!
1457  */
1458 void
1459 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1460 {
1461   struct GNUNET_DHT_MonitorStartStopMessage *m;
1462   struct PendingMessage *pending;
1463
1464   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1465                                handle->dht_handle->monitor_tail,
1466                                handle);
1467
1468   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1469                            sizeof (struct PendingMessage));
1470   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1471   pending->msg = &m->header;
1472   pending->handle = handle->dht_handle;
1473   pending->free_on_send = GNUNET_YES;
1474   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1475   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1476   m->type = htonl(handle->type);
1477   m->get = htons (NULL != handle->get_cb);
1478   m->get_resp = htons(NULL != handle->get_resp_cb);
1479   m->put = htons (NULL != handle->put_cb);
1480   if (NULL != handle->key)
1481   {
1482     m->filter_key = htons (1);
1483     m->key = *handle->key;
1484   }
1485   GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
1486                                handle->dht_handle->pending_tail,
1487                                pending);
1488   pending->in_pending_queue = GNUNET_YES;
1489   process_pending_messages (handle->dht_handle);
1490
1491   GNUNET_free_non_null (handle->key);
1492   GNUNET_free (handle);
1493 }
1494
1495 /* end of dht_api.c */