Fix invocation of just-built tools
[oweals/gnunet.git] / src / dht / dht_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/dht_api.c
23  * @brief library to access the DHT service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_dht_service.h"
35 #include "dht.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__)
38
39 /**
40  * Entry in our list of messages to be (re-)transmitted.
41  */
42 struct PendingMessage
43 {
44   /**
45    * This is a doubly-linked list.
46    */
47   struct PendingMessage *prev;
48
49   /**
50    * This is a doubly-linked list.
51    */
52   struct PendingMessage *next;
53
54   /**
55    * Message that is pending, allocated at the end
56    * of this struct.
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Handle to the DHT API context.
62    */
63   struct GNUNET_DHT_Handle *handle;
64
65   /**
66    * Continuation to call when the request has been
67    * transmitted (for the first time) to the service; can be NULL.
68    */
69   GNUNET_SCHEDULER_Task cont;
70
71   /**
72    * Closure for 'cont'.
73    */
74   void *cont_cls;
75
76   /**
77    * Unique ID for this request
78    */
79   uint64_t unique_id;
80
81   /**
82    * Free the saved message once sent, set to GNUNET_YES for messages
83    * that do not receive responses; GNUNET_NO if this pending message
84    * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
85    * from there.
86    */
87   int free_on_send;
88
89   /**
90    * GNUNET_YES if this message is in our pending queue right now.
91    */
92   int in_pending_queue;
93
94 };
95
96
97 /**
98  * Handle to a PUT request.
99  */
100 struct GNUNET_DHT_PutHandle
101 {
102   /**
103    * Kept in a DLL.
104    */
105   struct GNUNET_DHT_PutHandle *next;
106
107   /**
108    * Kept in a DLL.
109    */
110   struct GNUNET_DHT_PutHandle *prev;
111
112   /**
113    * Continuation to call when done.
114    */
115   GNUNET_DHT_PutContinuation cont;
116
117   /**
118    * Pending message associated with this PUT operation, 
119    * NULL after the message has been transmitted to the service.
120    */
121   struct PendingMessage *pending;
122
123   /**
124    * Main handle to this DHT api
125    */
126   struct GNUNET_DHT_Handle *dht_handle;
127
128   /**
129    * Closure for 'cont'.
130    */
131   void *cont_cls;
132
133   /**
134    * Timeout task for this operation.
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * Unique ID for the PUT operation.
140    */
141   uint64_t unique_id;
142
143 };
144
145
146
147 /**
148  * Handle to a GET request
149  */
150 struct GNUNET_DHT_GetHandle
151 {
152
153   /**
154    * Iterator to call on data receipt
155    */
156   GNUNET_DHT_GetIterator iter;
157
158   /**
159    * Closure for the iterator callback
160    */
161   void *iter_cls;
162
163   /**
164    * Main handle to this DHT api
165    */
166   struct GNUNET_DHT_Handle *dht_handle;
167
168   /**
169    * The actual message sent for this request,
170    * used for retransmitting requests on service
171    * failure/reconnect.  Freed on route_stop.
172    */
173   struct PendingMessage *message;
174
175   /**
176    * Array of hash codes over the results that we have already
177    * seen.
178    */
179   struct GNUNET_HashCode *seen_results;
180
181   /**
182    * Key that this get request is for
183    */
184   struct GNUNET_HashCode key;  
185
186   /**
187    * Unique identifier for this request (for key collisions).
188    */
189   uint64_t unique_id;
190
191   /**
192    * Size of the 'seen_results' array.  Note that not
193    * all positions might be used (as we over-allocate).
194    */
195   unsigned int seen_results_size;
196
197   /**
198    * Offset into the 'seen_results' array marking the
199    * end of the positions that are actually used.
200    */
201   unsigned int seen_results_end;
202
203   /**
204    * Offset into the 'seen_results' array marking the 
205    * position up to where we've send the hash codes to
206    * the DHT for blocking (needed as we might not be
207    * able to send all hash codes at once).
208    */
209   unsigned int seen_results_transmission_offset;
210
211
212 };
213
214
215 /**
216  * Handle to a monitoring request.
217  */
218 struct GNUNET_DHT_MonitorHandle
219 {
220   /**
221    * DLL.
222    */
223   struct GNUNET_DHT_MonitorHandle *next;
224
225   /**
226    * DLL.
227    */
228   struct GNUNET_DHT_MonitorHandle *prev;
229   
230   /**
231    * Main handle to this DHT api.
232    */
233   struct GNUNET_DHT_Handle *dht_handle;
234
235   /**
236    * Type of block looked for.
237    */
238   enum GNUNET_BLOCK_Type type;
239
240   /**
241    * Key being looked for, NULL == all.
242    */
243   struct GNUNET_HashCode *key;
244
245   /**
246    * Callback for each received message of type get.
247    */
248   GNUNET_DHT_MonitorGetCB get_cb;
249
250   /**
251    * Callback for each received message of type get response.
252    */
253   GNUNET_DHT_MonitorGetRespCB get_resp_cb;
254
255   /**
256    * Callback for each received message of type put.
257    */
258   GNUNET_DHT_MonitorPutCB put_cb;
259
260   /**
261    * Closure for cb.
262    */
263   void *cb_cls;
264   
265 };
266
267
268 /**
269  * Connection to the DHT service.
270  */
271 struct GNUNET_DHT_Handle
272 {
273
274   /**
275    * Configuration to use.
276    */
277   const struct GNUNET_CONFIGURATION_Handle *cfg;
278
279   /**
280    * Socket (if available).
281    */
282   struct GNUNET_CLIENT_Connection *client;
283
284   /**
285    * Currently pending transmission request (or NULL).
286    */
287   struct GNUNET_CLIENT_TransmitHandle *th;
288
289   /**
290    * Head of linked list of messages we would like to transmit.
291    */
292   struct PendingMessage *pending_head;
293
294   /**
295    * Tail of linked list of messages we would like to transmit.
296    */
297   struct PendingMessage *pending_tail;
298
299   /**
300    * Head of linked list of messages we would like to monitor. 
301    */
302   struct GNUNET_DHT_MonitorHandle *monitor_head;
303
304   /**
305    * Tail of linked list of messages we would like to monitor.
306    */
307   struct GNUNET_DHT_MonitorHandle *monitor_tail;
308
309   /**
310    * Head of active PUT requests.
311    */
312   struct GNUNET_DHT_PutHandle *put_head;
313
314   /**
315    * Tail of active PUT requests.
316    */
317   struct GNUNET_DHT_PutHandle *put_tail;
318
319   /**
320    * Hash map containing the current outstanding unique GET requests
321    * (values are of type 'struct GNUNET_DHT_GetHandle').
322    */
323   struct GNUNET_CONTAINER_MultiHashMap *active_requests;
324
325   /**
326    * Task for trying to reconnect.
327    */
328   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
329
330   /**
331    * How quickly should we retry?  Used for exponential back-off on
332    * connect-errors.
333    */
334   struct GNUNET_TIME_Relative retry_time;
335
336   /**
337    * Generator for unique ids.
338    */
339   uint64_t uid_gen;
340
341   /**
342    * Did we start our receive loop yet?
343    */
344   int in_receive;
345 };
346
347
348 /**
349  * Handler for messages received from the DHT service
350  * a demultiplexer which handles numerous message types
351  *
352  * @param cls the 'struct GNUNET_DHT_Handle'
353  * @param msg the incoming message
354  */
355 static void
356 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
357
358
359 /**
360  * Try to (re)connect to the DHT service.
361  *
362  * @param handle DHT handle to reconnect
363  * @return GNUNET_YES on success, GNUNET_NO on failure.
364  */
365 static int
366 try_connect (struct GNUNET_DHT_Handle *handle)
367 {
368   if (NULL != handle->client)
369     return GNUNET_OK;
370   handle->in_receive = GNUNET_NO;
371   handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
372   if (NULL == handle->client)
373   {
374     LOG (GNUNET_ERROR_TYPE_WARNING,
375          _("Failed to connect to the DHT service!\n"));
376     return GNUNET_NO;
377   }
378   return GNUNET_YES;
379 }
380
381
382 /**
383  * Queue messages to DHT to block certain results from the result set.
384  *
385  * @param get_handle GET to generate messages for.
386  */
387 static void
388 queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
389 {
390   struct PendingMessage *pm;
391   struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
392   uint16_t msize;
393   unsigned int delta;
394   unsigned int max;
395
396   while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end)
397   {
398     delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset;
399     max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
400     if (delta > max)
401       delta = max;
402     msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode);
403     
404     pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
405     msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
406     pm->msg = &msg->header;
407     pm->handle = get_handle->dht_handle;
408     pm->unique_id = get_handle->unique_id;
409     pm->free_on_send = GNUNET_YES;
410     pm->in_pending_queue = GNUNET_YES;
411     msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
412     msg->header.size = htons (msize);
413     msg->key = get_handle->key;
414     msg->unique_id = get_handle->unique_id;
415     memcpy (&msg[1],
416             &get_handle->seen_results[get_handle->seen_results_transmission_offset],
417             sizeof (struct GNUNET_HashCode) * delta);
418     get_handle->seen_results_transmission_offset += delta;
419     GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
420                                       get_handle->dht_handle->pending_tail,
421                                       pm);  
422   }
423 }
424
425
426 /**
427  * Add the request corresponding to the given route handle
428  * to the pending queue (if it is not already in there).
429  *
430  * @param cls the 'struct GNUNET_DHT_Handle*'
431  * @param key key for the request (not used)
432  * @param value the 'struct GNUNET_DHT_GetHandle*'
433  * @return GNUNET_YES (always)
434  */
435 static int
436 add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value)
437 {
438   struct GNUNET_DHT_Handle *handle = cls;
439   struct GNUNET_DHT_GetHandle *get_handle = value;
440
441   if (GNUNET_NO == get_handle->message->in_pending_queue)
442   {
443     LOG (GNUNET_ERROR_TYPE_DEBUG,
444          "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
445          handle);
446     get_handle->seen_results_transmission_offset = 0;
447     GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
448                                  get_handle->message);
449     queue_filter_messages (get_handle);
450     get_handle->message->in_pending_queue = GNUNET_YES;
451   }
452   return GNUNET_YES;
453 }
454
455
456 /**
457  * Try to send messages from list of messages to send
458  *
459  * @param handle DHT_Handle
460  */
461 static void
462 process_pending_messages (struct GNUNET_DHT_Handle *handle);
463
464
465 /**
466  * Try reconnecting to the dht service.
467  *
468  * @param cls GNUNET_DHT_Handle
469  * @param tc scheduler context
470  */
471 static void
472 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
473 {
474   struct GNUNET_DHT_Handle *handle = cls;
475
476   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
477   handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time);
478   handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
479   if (GNUNET_YES != try_connect (handle))
480   {
481     LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
482     return;
483   }
484   GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
485                                          &add_request_to_pending, handle);
486   process_pending_messages (handle);
487 }
488
489
490 /**
491  * Try reconnecting to the DHT service.
492  *
493  * @param handle handle to dht to (possibly) disconnect and reconnect
494  */
495 static void
496 do_disconnect (struct GNUNET_DHT_Handle *handle)
497 {
498   struct GNUNET_DHT_PutHandle *ph;
499   struct GNUNET_DHT_PutHandle *next;
500
501   if (NULL == handle->client)
502     return;
503   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
504   if (NULL != handle->th)
505     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
506   handle->th = NULL;
507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508               "Disconnecting from DHT service, will try to reconnect in %s\n",
509               GNUNET_STRINGS_relative_time_to_string (handle->retry_time,
510                                                       GNUNET_YES));
511   GNUNET_CLIENT_disconnect (handle->client);
512   handle->client = NULL;
513
514   /* signal disconnect to all PUT requests that were transmitted but waiting
515      for the put confirmation */
516   next = handle->put_head;
517   while (NULL != (ph = next))
518   {
519     next = ph->next;
520     if (NULL == ph->pending)
521     {
522       if (NULL != ph->cont)
523         ph->cont (ph->cont_cls, GNUNET_SYSERR);
524       GNUNET_DHT_put_cancel (ph);
525     }
526   }
527   handle->reconnect_task =
528       GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
529 }
530
531
532 /**
533  * Transmit the next pending message, called by notify_transmit_ready
534  *
535  * @param cls the DHT handle
536  * @param size number of bytes available in 'buf' for transmission
537  * @param buf where to copy messages for the service
538  * @return number of bytes written to 'buf'
539  */
540 static size_t
541 transmit_pending (void *cls, size_t size, void *buf);
542
543
544 /**
545  * Try to send messages from list of messages to send
546  *
547  * @param handle handle to DHT
548  */
549 static void
550 process_pending_messages (struct GNUNET_DHT_Handle *handle)
551 {
552   struct PendingMessage *head;
553
554   if (NULL == handle->client)
555   {
556     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557                 "process_pending_messages called, but client is NULL, reconnecting\n");
558     do_disconnect (handle);
559     return;
560   }
561   if (NULL != handle->th)
562     return;
563   if (NULL == (head = handle->pending_head))
564     return;
565   handle->th =
566       GNUNET_CLIENT_notify_transmit_ready (handle->client,
567                                            ntohs (head->msg->size),
568                                            GNUNET_TIME_UNIT_FOREVER_REL,
569                                            GNUNET_YES, &transmit_pending,
570                                            handle);
571   if (NULL != handle->th)
572     return;
573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574               "notify_transmit_ready returned NULL, reconnecting\n");
575   do_disconnect (handle);
576 }
577
578
579 /**
580  * Transmit the next pending message, called by notify_transmit_ready
581  *
582  * @param cls the DHT handle
583  * @param size number of bytes available in 'buf' for transmission
584  * @param buf where to copy messages for the service
585  * @return number of bytes written to 'buf'
586  */
587 static size_t
588 transmit_pending (void *cls, size_t size, void *buf)
589 {
590   struct GNUNET_DHT_Handle *handle = cls;
591   struct PendingMessage *head;
592   size_t tsize;
593
594   handle->th = NULL;
595   if (NULL == buf)
596   {    
597     LOG (GNUNET_ERROR_TYPE_DEBUG,
598          "Transmission to DHT service failed!  Reconnecting!\n");
599     do_disconnect (handle);
600     return 0;
601   }
602   if (NULL == (head = handle->pending_head))
603     return 0;
604
605   tsize = ntohs (head->msg->size);
606   if (size < tsize)
607   {
608     process_pending_messages (handle);
609     return 0;
610   }
611   memcpy (buf, head->msg, tsize);
612   GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
613                                head);
614   head->in_pending_queue = GNUNET_NO;
615   if (NULL != head->cont)
616   {
617     head->cont (head->cont_cls, NULL);
618     head->cont = NULL;
619     head->cont_cls = NULL;
620   }
621   if (GNUNET_YES == head->free_on_send)
622     GNUNET_free (head);
623   process_pending_messages (handle);
624   LOG (GNUNET_ERROR_TYPE_DEBUG,
625        "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
626   if (GNUNET_NO == handle->in_receive)
627   {
628     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
629     handle->in_receive = GNUNET_YES;
630     GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
631                            GNUNET_TIME_UNIT_FOREVER_REL);
632   }
633   return tsize;
634 }
635
636
637 /**
638  * Process a given reply that might match the given
639  * request.
640  *
641  * @param cls the 'struct GNUNET_DHT_ClientResultMessage'
642  * @param key query of the request
643  * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
644  * @return GNUNET_YES to continue to iterate over all results,
645  *         GNUNET_NO if the reply is malformed or we found a matching request
646  */
647 static int
648 process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
649 {
650   const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
651   struct GNUNET_DHT_GetHandle *get_handle = value;
652   const struct GNUNET_PeerIdentity *put_path;
653   const struct GNUNET_PeerIdentity *get_path;
654   struct GNUNET_HashCode hc;
655   uint32_t put_path_length;
656   uint32_t get_path_length;
657   size_t data_length;
658   size_t msize;
659   size_t meta_length;
660   const void *data;
661
662   if (dht_msg->unique_id != get_handle->unique_id)
663   {
664     /* UID mismatch */
665     LOG (GNUNET_ERROR_TYPE_DEBUG,
666          "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key),
667          dht_msg->unique_id, get_handle->unique_id);
668     return GNUNET_YES;
669   }
670   msize = ntohs (dht_msg->header.size);
671   put_path_length = ntohl (dht_msg->put_path_length);
672   get_path_length = ntohl (dht_msg->get_path_length);
673   meta_length =
674       sizeof (struct GNUNET_DHT_ClientResultMessage) +
675       sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
676   if ((msize < meta_length) ||
677       (get_path_length >
678        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
679       (put_path_length >
680        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
681   {
682     GNUNET_break (0);
683     return GNUNET_NO;
684   }
685   data_length = msize - meta_length;
686   LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n",
687        (unsigned int) data_length, GNUNET_h2s (key));
688   put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
689   get_path = &put_path[put_path_length];
690   data = &get_path[get_path_length];
691   /* remember that we've seen this result */
692   GNUNET_CRYPTO_hash (data, data_length, &hc);
693   if (get_handle->seen_results_size == get_handle->seen_results_end)  
694     GNUNET_array_grow (get_handle->seen_results,
695                        get_handle->seen_results_size,
696                        get_handle->seen_results_size * 2 + 1);
697   GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset);
698   get_handle->seen_results[get_handle->seen_results_end++] = hc;
699   /* no need to block it explicitly, service already knows about it! */
700   get_handle->seen_results_transmission_offset++;
701   
702   get_handle->iter (get_handle->iter_cls,
703                     GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
704                     get_path, get_path_length, put_path, put_path_length,
705                     ntohl (dht_msg->type), data_length, data);
706   return GNUNET_NO;
707 }
708
709 /**
710  * Process a get monitor message from the service.
711  *
712  * @param handle The DHT handle.
713  * @param msg Monitor get message from the service.
714  * 
715  * @return GNUNET_OK if everything went fine,
716  *         GNUNET_SYSERR if the message is malformed.
717  */
718 static int
719 process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
720                              const struct GNUNET_DHT_MonitorGetMessage *msg)
721 {
722   struct GNUNET_DHT_MonitorHandle *h;
723
724   for (h = handle->monitor_head; NULL != h; h = h->next)
725   {
726     int type_ok;
727     int key_ok;
728
729     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
730     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
731                                                sizeof (struct GNUNET_HashCode)));
732     if (type_ok && key_ok && (NULL != h->get_cb))
733       h->get_cb (h->cb_cls,
734                  ntohl (msg->options),
735                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
736                  ntohl (msg->hop_count),
737                  ntohl (msg->desired_replication_level),
738                  ntohl (msg->get_path_length),
739                  (struct GNUNET_PeerIdentity *) &msg[1],
740                  &msg->key);    
741   }
742   return GNUNET_OK;
743 }
744
745
746 /**
747  * Process a get response monitor message from the service.
748  *
749  * @param handle The DHT handle.
750  * @param msg monitor get response message from the service
751  * @return GNUNET_OK if everything went fine,
752  *         GNUNET_SYSERR if the message is malformed.
753  */
754 static int
755 process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
756                                   const struct GNUNET_DHT_MonitorGetRespMessage
757                                   *msg)
758 {
759   struct GNUNET_DHT_MonitorHandle *h;
760   struct GNUNET_PeerIdentity *path;
761   uint32_t getl;
762   uint32_t putl;
763   size_t msize;
764
765   msize = ntohs (msg->header.size);
766   path = (struct GNUNET_PeerIdentity *) &msg[1];
767   getl = ntohl (msg->get_path_length);
768   putl = ntohl (msg->put_path_length);
769   if ( (getl + putl < getl) ||
770        ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
771   {
772     GNUNET_break (0);
773     return GNUNET_SYSERR;
774   }
775   for (h = handle->monitor_head; NULL != h; h = h->next)
776   {
777     int type_ok;
778     int key_ok;
779
780     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
781     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
782                                                sizeof (struct GNUNET_HashCode)));
783     if (type_ok && key_ok && (NULL != h->get_resp_cb))
784       h->get_resp_cb (h->cb_cls,
785                       (enum GNUNET_BLOCK_Type) ntohl(msg->type),
786                       path, getl,
787                       &path[getl], putl,
788                       GNUNET_TIME_absolute_ntoh(msg->expiration_time),
789                       &msg->key,
790                       (void *) &path[getl + putl],
791                       msize -
792                       sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
793                       sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
794   }
795   return GNUNET_OK;
796 }
797
798
799 /**
800  * Process a put monitor message from the service.
801  *
802  * @param handle The DHT handle.
803  * @param msg Monitor put message from the service.
804  * 
805  * @return GNUNET_OK if everything went fine,
806  *         GNUNET_SYSERR if the message is malformed.
807  */
808 static int
809 process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
810                              const struct GNUNET_DHT_MonitorPutMessage *msg)
811 {
812   struct GNUNET_DHT_MonitorHandle *h;
813   size_t msize;
814   struct GNUNET_PeerIdentity *path;
815   uint32_t putl;
816
817   msize = ntohs (msg->header.size);
818   path = (struct GNUNET_PeerIdentity *) &msg[1];
819   putl = ntohl (msg->put_path_length);
820   if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
821   {
822     GNUNET_break (0);
823     return GNUNET_SYSERR;
824   }
825   for (h = handle->monitor_head; NULL != h; h = h->next)
826   {
827     int type_ok;
828     int key_ok;
829
830     type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
831     key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
832                                                sizeof (struct GNUNET_HashCode)));
833     if (type_ok && key_ok && (NULL != h->put_cb))
834       h->put_cb (h->cb_cls,
835                  ntohl (msg->options),
836                  (enum GNUNET_BLOCK_Type) ntohl(msg->type),
837                  ntohl (msg->hop_count),
838                  ntohl (msg->desired_replication_level),
839                  putl, path,
840                  GNUNET_TIME_absolute_ntoh(msg->expiration_time),
841                  &msg->key,
842                  (void *) &path[putl],
843                  msize -
844                  sizeof (struct GNUNET_DHT_MonitorPutMessage) -
845                  sizeof (struct GNUNET_PeerIdentity) * putl);
846   }
847   return GNUNET_OK;
848 }
849
850
851 /**
852  * Process a put confirmation message from the service.
853  *
854  * @param handle The DHT handle.
855  * @param msg confirmation message from the service.
856  * @return GNUNET_OK if everything went fine,
857  *         GNUNET_SYSERR if the message is malformed.
858  */
859 static int
860 process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
861                                   const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
862 {
863   struct GNUNET_DHT_PutHandle *ph;
864   GNUNET_DHT_PutContinuation cont;
865   void *cont_cls;
866
867   for (ph = handle->put_head; NULL != ph; ph = ph->next)
868     if (ph->unique_id == msg->unique_id)
869       break;
870   if (NULL == ph)
871     return GNUNET_OK;
872   cont = ph->cont;
873   cont_cls = ph->cont_cls;
874   GNUNET_DHT_put_cancel (ph);
875   if (NULL != cont) 
876     cont (cont_cls, GNUNET_OK);
877   return GNUNET_OK;
878 }
879
880
881 /**
882  * Handler for messages received from the DHT service
883  * a demultiplexer which handles numerous message types
884  *
885  * @param cls the 'struct GNUNET_DHT_Handle'
886  * @param msg the incoming message
887  */
888 static void
889 service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
890 {
891   struct GNUNET_DHT_Handle *handle = cls;
892   const struct GNUNET_DHT_ClientResultMessage *dht_msg;
893   uint16_t msize;
894   int ret;
895
896   if (NULL == msg)
897   {
898     LOG (GNUNET_ERROR_TYPE_DEBUG,
899          "Error receiving data from DHT service, reconnecting\n");
900     do_disconnect (handle);
901     return;
902   }
903   GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
904                          GNUNET_TIME_UNIT_FOREVER_REL);
905   ret = GNUNET_SYSERR;
906   msize = ntohs (msg->size);
907   switch (ntohs (msg->type))
908   {
909   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
910     if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
911     {
912       GNUNET_break (0);
913       break;
914     }
915     ret = process_monitor_get_message(handle,
916                                       (const struct GNUNET_DHT_MonitorGetMessage *) msg);
917     break;
918   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
919     if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
920     {
921       GNUNET_break (0);
922       break;
923     }
924     ret = process_monitor_get_resp_message(handle,
925                                            (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
926     break;
927   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
928     if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
929     {
930       GNUNET_break (0);
931       break;
932     }
933     ret = process_monitor_put_message(handle,
934                                       (const struct GNUNET_DHT_MonitorPutMessage *) msg);
935     break;
936   case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
937     /* Not implemented yet */
938     GNUNET_break(0);
939     break;
940   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
941     if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
942     {
943       GNUNET_break (0);
944       break;
945     }
946     dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
947     LOG (GNUNET_ERROR_TYPE_DEBUG,
948          "Received reply for `%s' from DHT service %p\n",
949          GNUNET_h2s (&dht_msg->key), handle);
950     GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
951                                                       &dht_msg->key,
952                                                       &process_reply,
953                                                       (void *) dht_msg);
954     ret = GNUNET_OK;
955     break;
956   case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
957     if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
958     {
959       GNUNET_break (0);
960       break;
961     }
962     ret = process_put_confirmation_message (handle,
963                                             (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
964     break;
965   default:
966     GNUNET_break(0);
967     LOG (GNUNET_ERROR_TYPE_WARNING,
968          "Unknown DHT message type: %hu (%hu) size: %hu\n",
969          ntohs (msg->type), msg->type, msize);
970     break;
971   }
972   if (GNUNET_OK != ret)
973   {
974     GNUNET_break (0);
975     do_disconnect (handle);
976     return;
977   }
978 }
979
980
981 /**
982  * Initialize the connection with the DHT service.
983  *
984  * @param cfg configuration to use
985  * @param ht_len size of the internal hash table to use for
986  *               processing multiple GET/FIND requests in parallel
987  *
988  * @return handle to the DHT service, or NULL on error
989  */
990 struct GNUNET_DHT_Handle *
991 GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
992                     unsigned int ht_len)
993 {
994   struct GNUNET_DHT_Handle *handle;
995
996   handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
997   handle->cfg = cfg;
998   handle->uid_gen =
999       GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
1000   handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_NO);
1001   if (GNUNET_NO == try_connect (handle))
1002   {
1003     GNUNET_DHT_disconnect (handle);
1004     return NULL;
1005   }
1006   return handle;
1007 }
1008
1009
1010 /**
1011  * Shutdown connection with the DHT service.
1012  *
1013  * @param handle handle of the DHT connection to stop
1014  */
1015 void
1016 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
1017 {
1018   struct PendingMessage *pm;
1019   struct GNUNET_DHT_PutHandle *ph;
1020
1021   GNUNET_assert (NULL != handle);
1022   GNUNET_assert (0 ==
1023                  GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
1024   if (NULL != handle->th)
1025   {
1026     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1027     handle->th = NULL;
1028   }
1029   while (NULL != (pm = handle->pending_head))
1030   {
1031     GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
1032     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1033                                  pm);
1034     pm->in_pending_queue = GNUNET_NO;
1035     GNUNET_assert (GNUNET_YES == pm->free_on_send);
1036     if (NULL != pm->cont)
1037       pm->cont (pm->cont_cls, NULL);
1038     GNUNET_free (pm);
1039   }
1040   while (NULL != (ph = handle->put_head))
1041   {
1042     GNUNET_break (NULL == ph->pending);
1043     if (NULL != ph->cont)
1044       ph->cont (ph->cont_cls, GNUNET_SYSERR);
1045     GNUNET_DHT_put_cancel (ph);
1046   }
1047
1048   if (NULL != handle->client)
1049   {
1050     GNUNET_CLIENT_disconnect (handle->client);
1051     handle->client = NULL;
1052   }
1053   if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
1054     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1055   GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
1056   GNUNET_free (handle);
1057 }
1058
1059
1060 /**
1061  * Timeout for the transmission of a fire&forget-request.  Clean it up.
1062  *
1063  * @param cls the 'struct PendingMessage'
1064  * @param tc scheduler context
1065  */
1066 static void
1067 timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1068 {
1069   struct GNUNET_DHT_PutHandle *ph = cls;
1070   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1071
1072   ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1073   if (NULL != ph->pending)
1074   {
1075     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1076                                  ph->pending);
1077     ph->pending->in_pending_queue = GNUNET_NO;
1078     GNUNET_free (ph->pending);
1079   }
1080   if (NULL != ph->cont)
1081     ph->cont (ph->cont_cls, GNUNET_NO);
1082   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1083                                handle->put_tail,
1084                                ph);
1085   GNUNET_free (ph);
1086 }
1087
1088
1089 /**
1090  * Function called whenever the PUT message leaves the queue.  Sets
1091  * the message pointer in the put handle to NULL.
1092  *
1093  * @param cls the 'struct GNUNET_DHT_PutHandle'
1094  * @param tc unused
1095  */
1096 static void
1097 mark_put_message_gone (void *cls,
1098                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1099 {
1100   struct GNUNET_DHT_PutHandle *ph = cls;
1101
1102   ph->pending = NULL;
1103 }
1104
1105
1106 /**
1107  * Perform a PUT operation storing data in the DHT.  FIXME: we should
1108  * change the protocol to get a confirmation for the PUT from the DHT
1109  * and call 'cont' only after getting the confirmation; otherwise, the
1110  * client has no good way of telling if the 'PUT' message actually got
1111  * to the DHT service!
1112  *
1113  * @param handle handle to DHT service
1114  * @param key the key to store under
1115  * @param desired_replication_level estimate of how many
1116  *                nearest peers this request should reach
1117  * @param options routing options for this message
1118  * @param type type of the value
1119  * @param size number of bytes in data; must be less than 64k
1120  * @param data the data to store
1121  * @param exp desired expiration time for the value
1122  * @param timeout how long to wait for transmission of this request
1123  * @param cont continuation to call when done (transmitting request to service)
1124  *        You must not call GNUNET_DHT_DISCONNECT in this continuation
1125  * @param cont_cls closure for cont
1126  */
1127 struct GNUNET_DHT_PutHandle *
1128 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const struct GNUNET_HashCode * key,
1129                 uint32_t desired_replication_level,
1130                 enum GNUNET_DHT_RouteOption options,
1131                 enum GNUNET_BLOCK_Type type, size_t size, const void *data,
1132                 struct GNUNET_TIME_Absolute exp,
1133                 struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
1134                 void *cont_cls)
1135 {
1136   struct GNUNET_DHT_ClientPutMessage *put_msg;
1137   size_t msize;
1138   struct PendingMessage *pending;
1139   struct GNUNET_DHT_PutHandle *ph;
1140
1141   msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
1142   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1143       (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1144   {
1145     GNUNET_break (0);
1146     return NULL;
1147   }
1148   ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle));
1149   ph->dht_handle = handle;
1150   ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1151   ph->cont = cont;
1152   ph->cont_cls = cont_cls;
1153   ph->unique_id = ++handle->uid_gen;
1154   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1155   ph->pending = pending;
1156   put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
1157   pending->msg = &put_msg->header;
1158   pending->handle = handle;
1159   pending->cont = &mark_put_message_gone;
1160   pending->cont_cls = ph;
1161   pending->free_on_send = GNUNET_YES;
1162   put_msg->header.size = htons (msize);
1163   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1164   put_msg->type = htonl (type);
1165   put_msg->options = htonl ((uint32_t) options);
1166   put_msg->desired_replication_level = htonl (desired_replication_level);
1167   put_msg->unique_id = ph->unique_id;
1168   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1169   put_msg->key = *key;
1170   memcpy (&put_msg[1], data, size);
1171   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1172                                pending);
1173   pending->in_pending_queue = GNUNET_YES;
1174   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1175                                     handle->put_tail,
1176                                     ph);
1177   process_pending_messages (handle);
1178   return ph;
1179 }
1180
1181
1182 /**
1183  * Cancels a DHT PUT operation.  Note that the PUT request may still
1184  * go out over the network (we can't stop that); However, if the PUT
1185  * has not yet been sent to the service, cancelling the PUT will stop
1186  * this from happening (but there is no way for the user of this API
1187  * to tell if that is the case).  The only use for this API is to 
1188  * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because
1189  * the system is shutting down).
1190  *
1191  * @param ph put operation to cancel ('cont' will no longer be called)
1192  */
1193 void
1194 GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1195 {
1196   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1197
1198   if (NULL != ph->pending)
1199   {
1200     GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1201                                  handle->pending_tail,
1202                                  ph->pending);
1203     GNUNET_free (ph->pending);
1204     ph->pending = NULL;
1205   }
1206   if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1207   {
1208     GNUNET_SCHEDULER_cancel (ph->timeout_task);
1209     ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1210   }
1211   GNUNET_CONTAINER_DLL_remove (handle->put_head,
1212                                handle->put_tail,
1213                                ph);
1214   GNUNET_free (ph);
1215 }
1216
1217
1218 /**
1219  * Perform an asynchronous GET operation on the DHT identified. See
1220  * also "GNUNET_BLOCK_evaluate".
1221  *
1222  * @param handle handle to the DHT service
1223  * @param type expected type of the response object
1224  * @param key the key to look up
1225  * @param desired_replication_level estimate of how many
1226                   nearest peers this request should reach
1227  * @param options routing options for this message
1228  * @param xquery extended query data (can be NULL, depending on type)
1229  * @param xquery_size number of bytes in xquery
1230  * @param iter function to call on each result
1231  * @param iter_cls closure for iter
1232  * @return handle to stop the async get
1233  */
1234 struct GNUNET_DHT_GetHandle *
1235 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1236                       enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key,
1237                       uint32_t desired_replication_level,
1238                       enum GNUNET_DHT_RouteOption options, const void *xquery,
1239                       size_t xquery_size, GNUNET_DHT_GetIterator iter,
1240                       void *iter_cls)
1241 {
1242   struct GNUNET_DHT_ClientGetMessage *get_msg;
1243   struct GNUNET_DHT_GetHandle *get_handle;
1244   size_t msize;
1245   struct PendingMessage *pending;
1246
1247   msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1248   if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1249       (xquery_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
1250   {
1251     GNUNET_break (0);
1252     return NULL;
1253   }
1254   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n",
1255        GNUNET_h2s (key), handle);
1256   pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1257   get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1];
1258   pending->msg = &get_msg->header;
1259   pending->handle = handle;
1260   pending->free_on_send = GNUNET_NO;
1261   get_msg->header.size = htons (msize);
1262   get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
1263   get_msg->options = htonl ((uint32_t) options);
1264   get_msg->desired_replication_level = htonl (desired_replication_level);
1265   get_msg->type = htonl (type);
1266   get_msg->key = *key;
1267   get_msg->unique_id = ++handle->uid_gen;
1268   memcpy (&get_msg[1], xquery, xquery_size);
1269   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1270                                pending);
1271   pending->in_pending_queue = GNUNET_YES;
1272   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1273   get_handle->dht_handle = handle;
1274   get_handle->iter = iter;
1275   get_handle->iter_cls = iter_cls;
1276   get_handle->message = pending;
1277   get_handle->unique_id = get_msg->unique_id;
1278   GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key, get_handle,
1279                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1280   process_pending_messages (handle);
1281   return get_handle;
1282 }
1283
1284
1285
1286 /**
1287  * Tell the DHT not to return any of the following known results
1288  * to this client.
1289  *
1290  * @param get_handle get operation for which results should be filtered
1291  * @param num_results number of results to be blocked that are
1292  *        provided in this call (size of the 'results' array)
1293  * @param results array of hash codes over the 'data' of the results
1294  *        to be blocked
1295  */
1296 void
1297 GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1298                                      unsigned int num_results,
1299                                      const struct GNUNET_HashCode *results)
1300 {
1301   unsigned int needed;
1302
1303   needed = get_handle->seen_results_end + num_results;
1304   if (needed > get_handle->seen_results_size)  
1305     GNUNET_array_grow (get_handle->seen_results,
1306                        get_handle->seen_results_size,
1307                        needed);
1308   memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1309           results,
1310           num_results * sizeof (struct GNUNET_HashCode));
1311   get_handle->seen_results_end += num_results;
1312   queue_filter_messages (get_handle);
1313   process_pending_messages (get_handle->dht_handle);
1314 }
1315
1316
1317 /**
1318  * Stop async DHT-get.
1319  *
1320  * @param get_handle handle to the GET operation to stop
1321  */
1322 void
1323 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1324 {
1325   struct GNUNET_DHT_Handle *handle;
1326   const struct GNUNET_DHT_ClientGetMessage *get_msg;
1327   struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1328   struct PendingMessage *pending;
1329
1330   handle = get_handle->message->handle;
1331   get_msg =
1332       (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg;
1333   LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending STOP for %s to DHT via %p\n",
1334        GNUNET_h2s (&get_msg->key), handle);
1335   /* generate STOP */
1336   pending =
1337       GNUNET_malloc (sizeof (struct PendingMessage) +
1338                      sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1339   stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1];
1340   pending->msg = &stop_msg->header;
1341   pending->handle = handle;
1342   pending->free_on_send = GNUNET_YES;
1343   stop_msg->header.size =
1344       htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage));
1345   stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1346   stop_msg->reserved = htonl (0);
1347   stop_msg->unique_id = get_msg->unique_id;
1348   stop_msg->key = get_msg->key;
1349   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1350                                pending);
1351   pending->in_pending_queue = GNUNET_YES;
1352
1353   /* remove 'GET' from active status */
1354   GNUNET_assert (GNUNET_YES ==
1355                  GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1356                                                        &get_msg->key,
1357                                                        get_handle));
1358   if (GNUNET_YES == get_handle->message->in_pending_queue)
1359   {
1360     GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
1361                                  get_handle->message);
1362     get_handle->message->in_pending_queue = GNUNET_NO;
1363   }
1364   GNUNET_free (get_handle->message);
1365   GNUNET_array_grow (get_handle->seen_results,
1366                      get_handle->seen_results_end,
1367                      0);
1368   GNUNET_free (get_handle);
1369   process_pending_messages (handle);
1370 }
1371
1372
1373 /**
1374  * Start monitoring the local DHT service.
1375  *
1376  * @param handle Handle to the DHT service.
1377  * @param type Type of blocks that are of interest.
1378  * @param key Key of data of interest, NULL for all.
1379  * @param get_cb Callback to process monitored get messages.
1380  * @param get_resp_cb Callback to process monitored get response messages.
1381  * @param put_cb Callback to process monitored put messages.
1382  * @param cb_cls Closure for cb.
1383  *
1384  * @return Handle to stop monitoring.
1385  */
1386 struct GNUNET_DHT_MonitorHandle *
1387 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1388                           enum GNUNET_BLOCK_Type type,
1389                           const struct GNUNET_HashCode *key,
1390                           GNUNET_DHT_MonitorGetCB get_cb,
1391                           GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1392                           GNUNET_DHT_MonitorPutCB put_cb,
1393                           void *cb_cls)
1394 {
1395   struct GNUNET_DHT_MonitorHandle *h;
1396   struct GNUNET_DHT_MonitorStartStopMessage *m;
1397   struct PendingMessage *pending;
1398
1399   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
1400   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
1401
1402   h->get_cb = get_cb;
1403   h->get_resp_cb = get_resp_cb;
1404   h->put_cb = put_cb;
1405   h->cb_cls = cb_cls;
1406   h->type = type;
1407   h->dht_handle = handle;
1408   if (NULL != key)
1409   {
1410     h->key = GNUNET_malloc (sizeof(struct GNUNET_HashCode));
1411     memcpy (h->key, key, sizeof(struct GNUNET_HashCode));
1412   }
1413
1414   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1415                            sizeof (struct PendingMessage));
1416   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1417   pending->msg = &m->header;
1418   pending->handle = handle;
1419   pending->free_on_send = GNUNET_YES;
1420   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
1421   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1422   m->type = htonl(type);
1423   m->get = htons(NULL != get_cb);
1424   m->get_resp = htons(NULL != get_resp_cb);
1425   m->put = htons(NULL != put_cb);
1426   if (NULL != key) {
1427     m->filter_key = htons(1);
1428     memcpy (&m->key, key, sizeof(struct GNUNET_HashCode));
1429   }
1430   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1431                                pending);
1432   pending->in_pending_queue = GNUNET_YES;
1433   process_pending_messages (handle);
1434
1435   return h;
1436 }
1437
1438
1439 /**
1440  * Stop monitoring.
1441  *
1442  * @param handle The handle to the monitor request returned by monitor_start.
1443  *
1444  * On return get_handle will no longer be valid, caller must not use again!!!
1445  */
1446 void
1447 GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
1448 {
1449   struct GNUNET_DHT_MonitorStartStopMessage *m;
1450   struct PendingMessage *pending;
1451
1452   GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
1453                                handle->dht_handle->monitor_tail,
1454                                handle);
1455
1456   pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
1457                            sizeof (struct PendingMessage));
1458   m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
1459   pending->msg = &m->header;
1460   pending->handle = handle->dht_handle;
1461   pending->free_on_send = GNUNET_YES;
1462   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1463   m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
1464   m->type = htonl(handle->type);
1465   m->get = htons(NULL != handle->get_cb);
1466   m->get_resp = htons(NULL != handle->get_resp_cb);
1467   m->put = htons(NULL != handle->put_cb);
1468   if (NULL != handle->key) {
1469     m->filter_key = htons(1);
1470     memcpy (&m->key, handle->key, sizeof(struct GNUNET_HashCode));
1471   }
1472   GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
1473                                handle->dht_handle->pending_tail,
1474                                pending);
1475   pending->in_pending_queue = GNUNET_YES;
1476   process_pending_messages (handle->dht_handle);
1477   
1478   GNUNET_free_non_null (handle->key);
1479   GNUNET_free (handle);
1480 }
1481
1482
1483
1484 /* end of dht_api.c */