332bc1eda03b4512a015783857e32a9d3a78479e
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file dht/gnunet-service-dht_clients.c
21  * @brief GNUnet DHT service's client management code
22  * @author Christian Grothoff
23  * @author Nathan Evans
24  */
25
26 #include "platform.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_statistics_service.h"
30 #include "gnunet-service-dht.h"
31 #include "gnunet-service-dht_datacache.h"
32 #include "gnunet-service-dht_neighbours.h"
33 #include "dht.h"
34
35
36 /**
37  * Should routing details be logged to stderr (for debugging)?
38  */
39 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
40
41 #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
42
43
44 /**
45  * Struct containing information about a client,
46  * handle to connect to it, and any pending messages
47  * that need to be sent to it.
48  */
49 struct ClientHandle;
50
51
52 /**
53  * Entry in the local forwarding map for a client's GET request.
54  */
55 struct ClientQueryRecord
56 {
57
58   /**
59    * The key this request was about
60    */
61   struct GNUNET_HashCode key;
62
63   /**
64    * Kept in a DLL with @e client.
65    */
66   struct ClientQueryRecord *next;
67
68   /**
69    * Kept in a DLL with @e client.
70    */
71   struct ClientQueryRecord *prev;
72
73   /**
74    * Client responsible for the request.
75    */
76   struct ClientHandle *ch;
77
78   /**
79    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
80    */
81   const void *xquery;
82
83   /**
84    * Replies we have already seen for this request.
85    */
86   struct GNUNET_HashCode *seen_replies;
87
88   /**
89    * Pointer to this nodes heap location in the retry-heap (for fast removal)
90    */
91   struct GNUNET_CONTAINER_HeapNode *hnode;
92
93   /**
94    * What's the delay between re-try operations that we currently use for this
95    * request?
96    */
97   struct GNUNET_TIME_Relative retry_frequency;
98
99   /**
100    * What's the next time we should re-try this request?
101    */
102   struct GNUNET_TIME_Absolute retry_time;
103
104   /**
105    * The unique identifier of this request
106    */
107   uint64_t unique_id;
108
109   /**
110    * Number of bytes in xquery.
111    */
112   size_t xquery_size;
113
114   /**
115    * Number of entries in 'seen_replies'.
116    */
117   unsigned int seen_replies_count;
118
119   /**
120    * Desired replication level
121    */
122   uint32_t replication;
123
124   /**
125    * Any message options for this request
126    */
127   uint32_t msg_options;
128
129   /**
130    * The type for the data for the GET request.
131    */
132   enum GNUNET_BLOCK_Type type;
133
134 };
135
136
137 /**
138  * Struct containing paremeters of monitoring requests.
139  */
140 struct ClientMonitorRecord
141 {
142
143   /**
144    * Next element in DLL.
145    */
146   struct ClientMonitorRecord *next;
147
148   /**
149    * Previous element in DLL.
150    */
151   struct ClientMonitorRecord *prev;
152
153   /**
154    * Type of blocks that are of interest
155    */
156   enum GNUNET_BLOCK_Type type;
157
158   /**
159    * Key of data of interest, NULL for all.
160    */
161   struct GNUNET_HashCode *key;
162
163   /**
164    * Flag whether to notify about GET messages.
165    */
166   int16_t get;
167
168   /**
169    * Flag whether to notify about GET_REPONSE messages.
170    */
171   int16_t get_resp;
172
173   /**
174    * Flag whether to notify about PUT messages.
175    */
176   uint16_t put;
177
178   /**
179    * Client to notify of these requests.
180    */
181   struct ClientHandle *ch;
182 };
183
184
185 /**
186  * Struct containing information about a client,
187  * handle to connect to it, and any pending messages
188  * that need to be sent to it.
189  */
190 struct ClientHandle
191 {
192   /**
193    * Linked list of active queries of this client.
194    */
195   struct ClientQueryRecord *cqr_head;
196
197   /**
198    * Linked list of active queries of this client.
199    */
200   struct ClientQueryRecord *cqr_tail;
201
202   /**
203    * The handle to this client
204    */
205   struct GNUNET_SERVICE_Client *client;
206
207   /**
208    * The message queue to this client
209    */
210   struct GNUNET_MQ_Handle *mq;
211
212 };
213
214 /**
215  * Our handle to the BLOCK library.
216  */
217 struct GNUNET_BLOCK_Context *GDS_block_context;
218
219 /**
220  * Handle for the statistics service.
221  */
222 struct GNUNET_STATISTICS_Handle *GDS_stats;
223
224 /**
225  * Handle for the service.
226  */
227 struct GNUNET_SERVICE_Handle *GDS_service;
228
229 /**
230  * The configuration the DHT service is running with
231  */
232 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
233
234 /**
235  * List of active monitoring requests.
236  */
237 static struct ClientMonitorRecord *monitor_head;
238
239 /**
240  * List of active monitoring requests.
241  */
242 static struct ClientMonitorRecord *monitor_tail;
243
244 /**
245  * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
246  */
247 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
248
249 /**
250  * Heap with all of our client's request, sorted by retry time (earliest on top).
251  */
252 static struct GNUNET_CONTAINER_Heap *retry_heap;
253
254 /**
255  * Task that re-transmits requests (using retry_heap).
256  */
257 static struct GNUNET_SCHEDULER_Task *retry_task;
258
259
260 /**
261  * Free data structures associated with the given query.
262  *
263  * @param record record to remove
264  */
265 static void
266 remove_client_record (struct ClientQueryRecord *record)
267 {
268   struct ClientHandle *ch = record->ch;
269
270   GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
271                                ch->cqr_tail,
272                                record);
273   GNUNET_assert (GNUNET_YES ==
274                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
275                                                        &record->key,
276                                                        record));
277   if (NULL != record->hnode)
278     GNUNET_CONTAINER_heap_remove_node (record->hnode);
279   GNUNET_array_grow (record->seen_replies,
280                      record->seen_replies_count,
281                      0);
282   GNUNET_free (record);
283 }
284
285
286 /**
287  * Functions with this signature are called whenever a local client is
288  * connects to us.
289  *
290  * @param cls closure (NULL for dht)
291  * @param client identification of the client
292  * @param mq message queue for talking to @a client
293  * @return our `struct ClientHandle` for @a client
294  */
295 static void *
296 client_connect_cb (void *cls,
297                    struct GNUNET_SERVICE_Client *client,
298                    struct GNUNET_MQ_Handle *mq)
299 {
300   struct ClientHandle *ch;
301
302   ch = GNUNET_new (struct ClientHandle);
303   ch->client = client;
304   ch->mq = mq;
305   return ch;
306 }
307
308
309 /**
310  * Functions with this signature are called whenever a client
311  * is disconnected on the network level.
312  *
313  * @param cls closure (NULL for dht)
314  * @param client identification of the client
315  * @param app_ctx our `struct ClientHandle` for @a client
316  */
317 static void
318 client_disconnect_cb (void *cls,
319                       struct GNUNET_SERVICE_Client *client,
320                       void *app_ctx)
321 {
322   struct ClientHandle *ch = app_ctx;
323   struct ClientQueryRecord *cqr;
324   struct ClientMonitorRecord *monitor;
325
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Local client %p disconnects\n",
328               ch);
329   monitor = monitor_head;
330   while (NULL != monitor)
331   {
332     if (monitor->ch == ch)
333     {
334       struct ClientMonitorRecord *next;
335
336       next = monitor->next;
337       GNUNET_free_non_null (monitor->key);
338       GNUNET_CONTAINER_DLL_remove (monitor_head,
339                                    monitor_tail,
340                                    monitor);
341       GNUNET_free (monitor);
342       monitor = next;
343     }
344     else
345     {
346       monitor = monitor->next;
347     }
348   }
349   while (NULL != (cqr = ch->cqr_head))
350     remove_client_record (cqr);
351   GNUNET_free (ch);
352 }
353
354
355 /**
356  * Route the given request via the DHT.  This includes updating
357  * the bloom filter and retransmission times, building the P2P
358  * message and initiating the routing operation.
359  */
360 static void
361 transmit_request (struct ClientQueryRecord *cqr)
362 {
363   struct GNUNET_BLOCK_Group *bg;
364   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
365
366   GNUNET_STATISTICS_update (GDS_stats,
367                             gettext_noop ("# GET requests from clients injected"),
368                             1,
369                             GNUNET_NO);
370   bg = GNUNET_BLOCK_group_create (GDS_block_context,
371                                   cqr->type,
372                                   GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
373                                                             UINT32_MAX),
374                                   NULL,
375                                   0,
376                                   "seen-set-size",
377                                   cqr->seen_replies_count,
378                                   NULL);
379   GNUNET_BLOCK_group_set_seen (bg,
380                                cqr->seen_replies,
381                                cqr->seen_replies_count);
382   peer_bf
383     = GNUNET_CONTAINER_bloomfilter_init (NULL,
384                                          DHT_BLOOM_SIZE,
385                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
386   LOG (GNUNET_ERROR_TYPE_DEBUG,
387        "Initiating GET for %s, replication %u, already have %u replies\n",
388        GNUNET_h2s (&cqr->key),
389        cqr->replication,
390        cqr->seen_replies_count);
391   GDS_NEIGHBOURS_handle_get (cqr->type,
392                              cqr->msg_options,
393                              cqr->replication,
394                              0 /* hop count */ ,
395                              &cqr->key,
396                              cqr->xquery,
397                              cqr->xquery_size,
398                              bg,
399                              peer_bf);
400   GNUNET_BLOCK_group_destroy (bg);
401   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
402
403   /* exponential back-off for retries.
404    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
405   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
406   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
407 }
408
409
410 /**
411  * Task that looks at the #retry_heap and transmits all of the requests
412  * on the heap that are ready for transmission.  Then re-schedules
413  * itself (unless the heap is empty).
414  *
415  * @param cls unused
416  */
417 static void
418 transmit_next_request_task (void *cls)
419 {
420   struct ClientQueryRecord *cqr;
421   struct GNUNET_TIME_Relative delay;
422
423   retry_task = NULL;
424   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
425   {
426     cqr->hnode = NULL;
427     delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
428     if (delay.rel_value_us > 0)
429     {
430       cqr->hnode
431         = GNUNET_CONTAINER_heap_insert (retry_heap,
432                                         cqr,
433                                         cqr->retry_time.abs_value_us);
434       retry_task
435         = GNUNET_SCHEDULER_add_at (cqr->retry_time,
436                                    &transmit_next_request_task,
437                                    NULL);
438       return;
439     }
440     transmit_request (cqr);
441     cqr->hnode
442       = GNUNET_CONTAINER_heap_insert (retry_heap,
443                                       cqr,
444                                       cqr->retry_time.abs_value_us);
445   }
446 }
447
448
449 /**
450  * Check DHT PUT messages from the client.
451  *
452  * @param cls the client we received this message from
453  * @param dht_msg the actual message received
454  * @return #GNUNET_OK (always)
455  */
456 static int
457 check_dht_local_put (void *cls,
458                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
459 {
460   /* always well-formed */
461   return GNUNET_OK;
462 }
463
464
465 /**
466  * Handler for PUT messages.
467  *
468  * @param cls the client we received this message from
469  * @param dht_msg the actual message received
470  */
471 static void
472 handle_dht_local_put (void *cls,
473                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
474 {
475   struct ClientHandle *ch = cls;
476   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
477   uint16_t size;
478
479   size = ntohs (dht_msg->header.size);
480   GNUNET_STATISTICS_update (GDS_stats,
481                             gettext_noop ("# PUT requests received from clients"),
482                             1,
483                             GNUNET_NO);
484   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
485                "CLIENT-PUT %s\n",
486                GNUNET_h2s_full (&dht_msg->key));
487   /* give to local clients */
488   LOG (GNUNET_ERROR_TYPE_DEBUG,
489        "Handling local PUT of %u-bytes for query %s\n",
490        size - sizeof (struct GNUNET_DHT_ClientPutMessage),
491        GNUNET_h2s (&dht_msg->key));
492   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
493                             &dht_msg->key,
494                             0,
495                             NULL,
496                             0,
497                             NULL,
498                             ntohl (dht_msg->type),
499                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
500                             &dht_msg[1]);
501   /* store locally */
502   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
503                             &dht_msg->key,
504                             0,
505                             NULL,
506                             ntohl (dht_msg->type),
507                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
508                             &dht_msg[1]);
509   /* route to other peers */
510   peer_bf
511     = GNUNET_CONTAINER_bloomfilter_init (NULL,
512                                          DHT_BLOOM_SIZE,
513                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
514   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
515                              ntohl (dht_msg->options),
516                              ntohl (dht_msg->desired_replication_level),
517                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
518                              0 /* hop count */,
519                              peer_bf,
520                              &dht_msg->key,
521                              0,
522                              NULL,
523                              &dht_msg[1],
524                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
525   GDS_CLIENTS_process_put (ntohl (dht_msg->options),
526                            ntohl (dht_msg->type),
527                            0,
528                            ntohl (dht_msg->desired_replication_level),
529                            1,
530                            GDS_NEIGHBOURS_get_id(),
531                            GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
532                            &dht_msg->key,
533                            &dht_msg[1],
534                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
535   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
536   GNUNET_SERVICE_client_continue (ch->client);
537 }
538
539
540 /**
541  * Check DHT GET messages from the client.
542  *
543  * @param cls the client we received this message from
544  * @param message the actual message received
545  * @return #GNUNET_OK (always)
546  */
547 static int
548 check_dht_local_get (void *cls,
549                       const struct GNUNET_DHT_ClientGetMessage *get)
550 {
551   /* always well-formed */
552   return GNUNET_OK;
553 }
554
555
556 /**
557  * Handle a result from local datacache for a GET operation.
558  *
559  * @param cls the `struct ClientHandle` of the client doing the query
560  * @param type type of the block
561  * @param expiration_time when does the content expire
562  * @param key key for the content
563  * @param put_path_length number of entries in @a put_path
564  * @param put_path peers the original PUT traversed (if tracked)
565  * @param get_path_length number of entries in @a get_path
566  * @param get_path peers this reply has traversed so far (if tracked)
567  * @param data payload of the reply
568  * @param data_size number of bytes in @a data
569  */
570 static void
571 handle_local_result (void *cls,
572                      enum GNUNET_BLOCK_Type type,
573                      struct GNUNET_TIME_Absolute expiration_time,
574                      const struct GNUNET_HashCode *key,
575                      unsigned int put_path_length,
576                      const struct GNUNET_PeerIdentity *put_path,
577                      unsigned int get_path_length,
578                      const struct GNUNET_PeerIdentity *get_path,
579                      const void *data,
580                      size_t data_size)
581 {
582   // FIXME: this needs some clean up: inline the function,
583   // possibly avoid even looking up the client!
584   GDS_CLIENTS_handle_reply (expiration_time,
585                             key,
586                             0, NULL,
587                             put_path_length, put_path,
588                             type,
589                             data_size, data);
590 }
591
592
593 /**
594  * Handler for DHT GET messages from the client.
595  *
596  * @param cls the client we received this message from
597  * @param message the actual message received
598  */
599 static void
600 handle_dht_local_get (void *cls,
601                       const struct GNUNET_DHT_ClientGetMessage *get)
602 {
603   struct ClientHandle *ch = cls;
604   struct ClientQueryRecord *cqr;
605   size_t xquery_size;
606   const char *xquery;
607   uint16_t size;
608
609   size = ntohs (get->header.size);
610   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
611   xquery = (const char *) &get[1];
612   GNUNET_STATISTICS_update (GDS_stats,
613                             gettext_noop
614                             ("# GET requests received from clients"), 1,
615                             GNUNET_NO);
616   LOG (GNUNET_ERROR_TYPE_DEBUG,
617        "Received GET request for %s from local client %p, xq: %.*s\n",
618        GNUNET_h2s (&get->key),
619        ch->client,
620        xquery_size,
621        xquery);
622   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
623                "CLIENT-GET %s\n",
624                GNUNET_h2s_full (&get->key));
625
626   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
627   cqr->key = get->key;
628   cqr->ch = ch;
629   cqr->xquery = (void *) &cqr[1];
630   GNUNET_memcpy (&cqr[1], xquery, xquery_size);
631   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
632   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
633   cqr->retry_time = GNUNET_TIME_absolute_get ();
634   cqr->unique_id = get->unique_id;
635   cqr->xquery_size = xquery_size;
636   cqr->replication = ntohl (get->desired_replication_level);
637   cqr->msg_options = ntohl (get->options);
638   cqr->type = ntohl (get->type);
639   GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
640                                ch->cqr_tail,
641                                cqr);
642   GNUNET_CONTAINER_multihashmap_put (forward_map,
643                                      &cqr->key,
644                                      cqr,
645                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
646   GDS_CLIENTS_process_get (ntohl (get->options),
647                            ntohl (get->type),
648                            0,
649                            ntohl (get->desired_replication_level),
650                            1,
651                            GDS_NEIGHBOURS_get_id(),
652                            &get->key);
653   /* start remote requests */
654   if (NULL != retry_task)
655     GNUNET_SCHEDULER_cancel (retry_task);
656   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
657                                          NULL);
658   /* perform local lookup */
659   GDS_DATACACHE_handle_get (&get->key,
660                             cqr->type,
661                             cqr->xquery,
662                             xquery_size,
663                             NULL,
664                             &handle_local_result,
665                             ch);
666   GNUNET_SERVICE_client_continue (ch->client);
667 }
668
669
670 /**
671  * Closure for #find_by_unique_id().
672  */
673 struct FindByUniqueIdContext
674 {
675   /**
676    * Where to store the result, if found.
677    */
678   struct ClientQueryRecord *cqr;
679
680   uint64_t unique_id;
681 };
682
683
684 /**
685  * Function called for each existing DHT record for the given
686  * query.  Checks if it matches the UID given in the closure
687  * and if so returns the entry as a result.
688  *
689  * @param cls the search context
690  * @param key query for the lookup (not used)
691  * @param value the `struct ClientQueryRecord`
692  * @return #GNUNET_YES to continue iteration (result not yet found)
693  */
694 static int
695 find_by_unique_id (void *cls,
696                    const struct GNUNET_HashCode *key,
697                    void *value)
698 {
699   struct FindByUniqueIdContext *fui_ctx = cls;
700   struct ClientQueryRecord *cqr = value;
701
702   if (cqr->unique_id != fui_ctx->unique_id)
703     return GNUNET_YES;
704   fui_ctx->cqr = cqr;
705   return GNUNET_NO;
706 }
707
708
709 /**
710  * Check "GET result seen" messages from the client.
711  *
712  * @param cls the client we received this message from
713  * @param message the actual message received
714  * @return #GNUNET_OK if @a seen is well-formed
715  */
716 static int
717 check_dht_local_get_result_seen (void *cls,
718                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
719 {
720   uint16_t size;
721   unsigned int hash_count;
722
723   size = ntohs (seen->header.size);
724   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
725   if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
726   {
727     GNUNET_break (0);
728     return GNUNET_SYSERR;
729   }
730   return GNUNET_OK;
731 }
732
733
734 /**
735  * Handler for "GET result seen" messages from the client.
736  *
737  * @param cls the client we received this message from
738  * @param message the actual message received
739  */
740 static void
741 handle_dht_local_get_result_seen (void *cls,
742                                   const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
743 {
744   struct ClientHandle *ch = cls;
745   uint16_t size;
746   unsigned int hash_count;
747   unsigned int old_count;
748   const struct GNUNET_HashCode *hc;
749   struct FindByUniqueIdContext fui_ctx;
750   struct ClientQueryRecord *cqr;
751
752   size = ntohs (seen->header.size);
753   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
754   hc = (const struct GNUNET_HashCode*) &seen[1];
755   fui_ctx.unique_id = seen->unique_id;
756   fui_ctx.cqr = NULL;
757   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
758                                               &seen->key,
759                                               &find_by_unique_id,
760                                               &fui_ctx);
761   if (NULL == (cqr = fui_ctx.cqr))
762   {
763     GNUNET_break (0);
764     GNUNET_SERVICE_client_drop (ch->client);
765     return;
766   }
767   /* finally, update 'seen' list */
768   old_count = cqr->seen_replies_count;
769   GNUNET_array_grow (cqr->seen_replies,
770                      cqr->seen_replies_count,
771                      cqr->seen_replies_count + hash_count);
772   GNUNET_memcpy (&cqr->seen_replies[old_count],
773                  hc,
774                  sizeof (struct GNUNET_HashCode) * hash_count);
775 }
776
777
778 /**
779  * Closure for #remove_by_unique_id().
780  */
781 struct RemoveByUniqueIdContext
782 {
783   /**
784    * Client that issued the removal request.
785    */
786   struct ClientHandle *ch;
787
788   /**
789    * Unique ID of the request.
790    */
791   uint64_t unique_id;
792 };
793
794
795 /**
796  * Iterator over hash map entries that frees all entries
797  * that match the given client and unique ID.
798  *
799  * @param cls unique ID and client to search for in source routes
800  * @param key current key code
801  * @param value value in the hash map, a ClientQueryRecord
802  * @return #GNUNET_YES (we should continue to iterate)
803  */
804 static int
805 remove_by_unique_id (void *cls,
806                      const struct GNUNET_HashCode *key,
807                      void *value)
808 {
809   const struct RemoveByUniqueIdContext *ctx = cls;
810   struct ClientQueryRecord *cqr = value;
811
812   if (cqr->unique_id != ctx->unique_id)
813     return GNUNET_YES;
814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815               "Removing client %p's record for key %s (by unique id)\n",
816               ctx->ch->client,
817               GNUNET_h2s (key));
818   remove_client_record (cqr);
819   return GNUNET_YES;
820 }
821
822
823 /**
824  * Handler for any generic DHT stop messages, calls the appropriate handler
825  * depending on message type (if processed locally)
826  *
827  * @param cls client we received this message from
828  * @param message the actual message received
829  *
830  */
831 static void
832 handle_dht_local_get_stop (void *cls,
833                            const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
834 {
835   struct ClientHandle *ch = cls;
836   struct RemoveByUniqueIdContext ctx;
837
838   GNUNET_STATISTICS_update (GDS_stats,
839                             gettext_noop
840                             ("# GET STOP requests received from clients"), 1,
841                             GNUNET_NO);
842   LOG (GNUNET_ERROR_TYPE_DEBUG,
843        "Received GET STOP request for %s from local client %p\n",
844        GNUNET_h2s (&dht_stop_msg->key),
845        ch->client);
846   ctx.ch = ch;
847   ctx.unique_id = dht_stop_msg->unique_id;
848   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
849                                               &dht_stop_msg->key,
850                                               &remove_by_unique_id,
851                                               &ctx);
852   GNUNET_SERVICE_client_continue (ch->client);
853 }
854
855
856 /**
857  * Handler for monitor start messages
858  *
859  * @param cls the client we received this message from
860  * @param msg the actual message received
861  *
862  */
863 static void
864 handle_dht_local_monitor (void *cls,
865                           const struct GNUNET_DHT_MonitorStartStopMessage *msg)
866 {
867   struct ClientHandle *ch = cls;
868   struct ClientMonitorRecord *r;
869
870   r = GNUNET_new (struct ClientMonitorRecord);
871   r->ch = ch;
872   r->type = ntohl (msg->type);
873   r->get = ntohs (msg->get);
874   r->get_resp = ntohs (msg->get_resp);
875   r->put = ntohs (msg->put);
876   if (0 == ntohs (msg->filter_key))
877   {
878     r->key = NULL;
879   }
880   else
881   {
882     r->key = GNUNET_new (struct GNUNET_HashCode);
883     GNUNET_memcpy (r->key,
884                    &msg->key,
885                    sizeof (struct GNUNET_HashCode));
886   }
887   GNUNET_CONTAINER_DLL_insert (monitor_head,
888                                monitor_tail,
889                                r);
890   GNUNET_SERVICE_client_continue (ch->client);
891 }
892
893
894 /**
895  * Handler for monitor stop messages
896  *
897  * @param cls the client we received this message from
898  * @param msg the actual message received
899  */
900 static void
901 handle_dht_local_monitor_stop (void *cls,
902                                const struct GNUNET_DHT_MonitorStartStopMessage *msg)
903 {
904   struct ClientHandle *ch = cls;
905   struct ClientMonitorRecord *r;
906   int keys_match;
907
908   GNUNET_SERVICE_client_continue (ch->client);
909   for (r = monitor_head; NULL != r; r = r->next)
910   {
911     if (NULL == r->key)
912     {
913       keys_match = (0 == ntohs(msg->filter_key));
914     }
915     else
916     {
917       keys_match = ( (0 != ntohs(msg->filter_key)) &&
918                      (! memcmp (r->key,
919                                 &msg->key,
920                                 sizeof(struct GNUNET_HashCode))) );
921     }
922     if ( (ch == r->ch) &&
923          (ntohl(msg->type) == r->type) &&
924          (r->get == msg->get) &&
925          (r->get_resp == msg->get_resp) &&
926          (r->put == msg->put) &&
927          keys_match )
928     {
929       GNUNET_CONTAINER_DLL_remove (monitor_head,
930                                    monitor_tail,
931                                    r);
932       GNUNET_free_non_null (r->key);
933       GNUNET_free (r);
934       return; /* Delete only ONE entry */
935     }
936   }
937 }
938
939
940 /**
941  * Closure for #forward_reply()
942  */
943 struct ForwardReplyContext
944 {
945
946   /**
947    * Expiration time of the reply.
948    */
949   struct GNUNET_TIME_Absolute expiration;
950
951   /**
952    * GET path taken.
953    */
954   const struct GNUNET_PeerIdentity *get_path;
955
956   /**
957    * PUT path taken.
958    */
959   const struct GNUNET_PeerIdentity *put_path;
960
961   /**
962    * Embedded payload.
963    */
964   const void *data;
965
966   /**
967    * Number of bytes in data.
968    */
969   size_t data_size;
970
971   /**
972    * Number of entries in @e get_path.
973    */
974   unsigned int get_path_length;
975
976   /**
977    * Number of entries in @e put_path.
978    */
979   unsigned int put_path_length;
980
981   /**
982    * Type of the data.
983    */
984   enum GNUNET_BLOCK_Type type;
985
986 };
987
988
989 /**
990  * Iterator over hash map entries that send a given reply to
991  * each of the matching clients.  With some tricky recycling
992  * of the buffer.
993  *
994  * @param cls the 'struct ForwardReplyContext'
995  * @param key current key
996  * @param value value in the hash map, a ClientQueryRecord
997  * @return #GNUNET_YES (we should continue to iterate),
998  *         if the result is mal-formed, #GNUNET_NO
999  */
1000 static int
1001 forward_reply (void *cls,
1002                const struct GNUNET_HashCode *key,
1003                void *value)
1004 {
1005   struct ForwardReplyContext *frc = cls;
1006   struct ClientQueryRecord *record = value;
1007   struct GNUNET_MQ_Envelope *env;
1008   struct GNUNET_DHT_ClientResultMessage *reply;
1009   enum GNUNET_BLOCK_EvaluationResult eval;
1010   int do_free;
1011   struct GNUNET_HashCode ch;
1012   struct GNUNET_PeerIdentity *paths;
1013
1014   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1015                "CLIENT-RESULT %s\n",
1016                GNUNET_h2s_full (key));
1017   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1018        (record->type != frc->type))
1019   {
1020     LOG (GNUNET_ERROR_TYPE_DEBUG,
1021          "Record type mismatch, not passing request for key %s to local client\n",
1022          GNUNET_h2s (key));
1023     GNUNET_STATISTICS_update (GDS_stats,
1024                               gettext_noop
1025                               ("# Key match, type mismatches in REPLY to CLIENT"),
1026                               1, GNUNET_NO);
1027     return GNUNET_YES;          /* type mismatch */
1028   }
1029   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1030   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1031     if (0 == memcmp (&record->seen_replies[i],
1032                      &ch,
1033                      sizeof (struct GNUNET_HashCode)))
1034     {
1035       LOG (GNUNET_ERROR_TYPE_DEBUG,
1036            "Duplicate reply, not passing request for key %s to local client\n",
1037            GNUNET_h2s (key));
1038       GNUNET_STATISTICS_update (GDS_stats,
1039                                 gettext_noop
1040                                 ("# Duplicate REPLIES to CLIENT request dropped"),
1041                                 1, GNUNET_NO);
1042       return GNUNET_YES;        /* duplicate */
1043     }
1044   eval
1045     = GNUNET_BLOCK_evaluate (GDS_block_context,
1046                              record->type,
1047                              NULL,
1048                              GNUNET_BLOCK_EO_NONE,
1049                              key,
1050                              record->xquery,
1051                              record->xquery_size,
1052                              frc->data,
1053                              frc->data_size);
1054   LOG (GNUNET_ERROR_TYPE_DEBUG,
1055        "Evaluation result is %d for key %s for local client's query\n",
1056        (int) eval,
1057        GNUNET_h2s (key));
1058   switch (eval)
1059   {
1060   case GNUNET_BLOCK_EVALUATION_OK_LAST:
1061     do_free = GNUNET_YES;
1062     break;
1063   case GNUNET_BLOCK_EVALUATION_OK_MORE:
1064     GNUNET_array_append (record->seen_replies,
1065                          record->seen_replies_count,
1066                          ch);
1067     do_free = GNUNET_NO;
1068     break;
1069   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1070     /* should be impossible to encounter here */
1071     GNUNET_break (0);
1072     return GNUNET_YES;
1073   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1074     GNUNET_break_op (0);
1075     return GNUNET_NO;
1076   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1077     GNUNET_break (0);
1078     return GNUNET_NO;
1079   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1080     GNUNET_break (0);
1081     return GNUNET_NO;
1082   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1083     return GNUNET_YES;
1084   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1085     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1086                 _("Unsupported block type (%u) in request!\n"), record->type);
1087     return GNUNET_NO;
1088   default:
1089     GNUNET_break (0);
1090     return GNUNET_NO;
1091   }
1092   GNUNET_STATISTICS_update (GDS_stats,
1093                             gettext_noop ("# RESULTS queued for clients"),
1094                             1,
1095                             GNUNET_NO);
1096   env = GNUNET_MQ_msg_extra (reply,
1097                              frc->data_size +
1098                              (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1099                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1100   reply->type = htonl (frc->type);
1101   reply->get_path_length = htonl (frc->get_path_length);
1102   reply->put_path_length = htonl (frc->put_path_length);
1103   reply->unique_id = record->unique_id;
1104   reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1105   reply->key = *key;
1106   paths = (struct GNUNET_PeerIdentity *) &reply[1];
1107   GNUNET_memcpy (paths,
1108                  frc->put_path,
1109                  sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1110   GNUNET_memcpy (&paths[frc->put_path_length],
1111                  frc->get_path,
1112                  sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1113   GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1114                  frc->data,
1115                  frc->data_size);
1116   LOG (GNUNET_ERROR_TYPE_DEBUG,
1117        "Sending reply to query %s for client %p\n",
1118        GNUNET_h2s (key),
1119        record->ch->client);
1120   GNUNET_MQ_send (record->ch->mq,
1121                   env);
1122   if (GNUNET_YES == do_free)
1123     remove_client_record (record);
1124   return GNUNET_YES;
1125 }
1126
1127
1128 /**
1129  * Handle a reply we've received from another peer.  If the reply
1130  * matches any of our pending queries, forward it to the respective
1131  * client(s).
1132  *
1133  * @param expiration when will the reply expire
1134  * @param key the query this reply is for
1135  * @param get_path_length number of peers in @a get_path
1136  * @param get_path path the reply took on get
1137  * @param put_path_length number of peers in @a put_path
1138  * @param put_path path the reply took on put
1139  * @param type type of the reply
1140  * @param data_size number of bytes in @a data
1141  * @param data application payload data
1142  */
1143 void
1144 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1145                           const struct GNUNET_HashCode *key,
1146                           unsigned int get_path_length,
1147                           const struct GNUNET_PeerIdentity *get_path,
1148                           unsigned int put_path_length,
1149                           const struct GNUNET_PeerIdentity *put_path,
1150                           enum GNUNET_BLOCK_Type type,
1151                           size_t data_size,
1152                           const void *data)
1153 {
1154   struct ForwardReplyContext frc;
1155   size_t msize;
1156
1157   msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1158     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1159   if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1160   {
1161     GNUNET_break (0);
1162     return;
1163   }
1164   if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1165                                                  key))
1166   {
1167     LOG (GNUNET_ERROR_TYPE_DEBUG,
1168          "No matching client for reply for key %s\n",
1169          GNUNET_h2s (key));
1170     GNUNET_STATISTICS_update (GDS_stats,
1171                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1172                               1,
1173                               GNUNET_NO);
1174     return;                     /* no matching request, fast exit! */
1175   }
1176   frc.expiration = expiration;
1177   frc.get_path = get_path;
1178   frc.put_path = put_path;
1179   frc.data = data;
1180   frc.data_size = data_size;
1181   frc.get_path_length = get_path_length;
1182   frc.put_path_length = put_path_length;
1183   frc.type = type;
1184   LOG (GNUNET_ERROR_TYPE_DEBUG,
1185        "Forwarding reply for key %s to client\n",
1186        GNUNET_h2s (key));
1187   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1188                                               key,
1189                                               &forward_reply,
1190                                               &frc);
1191
1192 }
1193
1194
1195 /**
1196  * Check if some client is monitoring GET messages and notify
1197  * them in that case.
1198  *
1199  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1200  * @param type The type of data in the request.
1201  * @param hop_count Hop count so far.
1202  * @param path_length number of entries in path (or 0 if not recorded).
1203  * @param path peers on the GET path (or NULL if not recorded).
1204  * @param desired_replication_level Desired replication level.
1205  * @param key Key of the requested data.
1206  */
1207 void
1208 GDS_CLIENTS_process_get (uint32_t options,
1209                          enum GNUNET_BLOCK_Type type,
1210                          uint32_t hop_count,
1211                          uint32_t desired_replication_level,
1212                          unsigned int path_length,
1213                          const struct GNUNET_PeerIdentity *path,
1214                          const struct GNUNET_HashCode * key)
1215 {
1216   struct ClientMonitorRecord *m;
1217   struct ClientHandle **cl;
1218   unsigned int cl_size;
1219
1220   cl = NULL;
1221   cl_size = 0;
1222   for (m = monitor_head; NULL != m; m = m->next)
1223   {
1224     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1225            (m->type == type) ) &&
1226          ( (NULL == m->key) ||
1227            (0 == memcmp (key,
1228                          m->key,
1229                          sizeof(struct GNUNET_HashCode))) ) )
1230     {
1231       struct GNUNET_MQ_Envelope *env;
1232       struct GNUNET_DHT_MonitorGetMessage *mmsg;
1233       struct GNUNET_PeerIdentity *msg_path;
1234       size_t msize;
1235       unsigned int i;
1236
1237       /* Don't send duplicates */
1238       for (i = 0; i < cl_size; i++)
1239         if (cl[i] == m->ch)
1240           break;
1241       if (i < cl_size)
1242         continue;
1243       GNUNET_array_append (cl,
1244                            cl_size,
1245                            m->ch);
1246
1247       msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1248       env = GNUNET_MQ_msg_extra (mmsg,
1249                                  msize,
1250                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1251       mmsg->options = htonl(options);
1252       mmsg->type = htonl(type);
1253       mmsg->hop_count = htonl(hop_count);
1254       mmsg->desired_replication_level = htonl(desired_replication_level);
1255       mmsg->get_path_length = htonl(path_length);
1256       mmsg->key = *key;
1257       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1258       GNUNET_memcpy (msg_path,
1259                      path,
1260                      path_length * sizeof (struct GNUNET_PeerIdentity));
1261       GNUNET_MQ_send (m->ch->mq,
1262                       env);
1263     }
1264   }
1265   GNUNET_free_non_null (cl);
1266 }
1267
1268
1269 /**
1270  * Check if some client is monitoring GET RESP messages and notify
1271  * them in that case.
1272  *
1273  * @param type The type of data in the result.
1274  * @param get_path Peers on GET path (or NULL if not recorded).
1275  * @param get_path_length number of entries in get_path.
1276  * @param put_path peers on the PUT path (or NULL if not recorded).
1277  * @param put_path_length number of entries in get_path.
1278  * @param exp Expiration time of the data.
1279  * @param key Key of the data.
1280  * @param data Pointer to the result data.
1281  * @param size Number of bytes in @a data.
1282  */
1283 void
1284 GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1285                               const struct GNUNET_PeerIdentity *get_path,
1286                               unsigned int get_path_length,
1287                               const struct GNUNET_PeerIdentity *put_path,
1288                               unsigned int put_path_length,
1289                               struct GNUNET_TIME_Absolute exp,
1290                               const struct GNUNET_HashCode * key,
1291                               const void *data,
1292                               size_t size)
1293 {
1294   struct ClientMonitorRecord *m;
1295   struct ClientHandle **cl;
1296   unsigned int cl_size;
1297
1298   cl = NULL;
1299   cl_size = 0;
1300   for (m = monitor_head; NULL != m; m = m->next)
1301   {
1302     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1303         (NULL == m->key ||
1304          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1305     {
1306       struct GNUNET_MQ_Envelope *env;
1307       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1308       struct GNUNET_PeerIdentity *path;
1309       size_t msize;
1310       unsigned int i;
1311
1312       /* Don't send duplicates */
1313       for (i = 0; i < cl_size; i++)
1314         if (cl[i] == m->ch)
1315           break;
1316       if (i < cl_size)
1317         continue;
1318       GNUNET_array_append (cl,
1319                            cl_size,
1320                            m->ch);
1321
1322       msize = size;
1323       msize += (get_path_length + put_path_length)
1324                * sizeof (struct GNUNET_PeerIdentity);
1325       env = GNUNET_MQ_msg_extra (mmsg,
1326                                  msize,
1327                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1328       mmsg->type = htonl(type);
1329       mmsg->put_path_length = htonl(put_path_length);
1330       mmsg->get_path_length = htonl(get_path_length);
1331       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1332       mmsg->key = *key;
1333       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1334       GNUNET_memcpy (path,
1335                      put_path,
1336                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
1337       GNUNET_memcpy (path,
1338                      get_path,
1339                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
1340       GNUNET_memcpy (&path[get_path_length],
1341                      data,
1342                      size);
1343       GNUNET_MQ_send (m->ch->mq,
1344                       env);
1345     }
1346   }
1347   GNUNET_free_non_null (cl);
1348 }
1349
1350
1351 /**
1352  * Check if some client is monitoring PUT messages and notify
1353  * them in that case.
1354  *
1355  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1356  * @param type The type of data in the request.
1357  * @param hop_count Hop count so far.
1358  * @param path_length number of entries in path (or 0 if not recorded).
1359  * @param path peers on the PUT path (or NULL if not recorded).
1360  * @param desired_replication_level Desired replication level.
1361  * @param exp Expiration time of the data.
1362  * @param key Key under which data is to be stored.
1363  * @param data Pointer to the data carried.
1364  * @param size Number of bytes in data.
1365  */
1366 void
1367 GDS_CLIENTS_process_put (uint32_t options,
1368                          enum GNUNET_BLOCK_Type type,
1369                          uint32_t hop_count,
1370                          uint32_t desired_replication_level,
1371                          unsigned int path_length,
1372                          const struct GNUNET_PeerIdentity *path,
1373                          struct GNUNET_TIME_Absolute exp,
1374                          const struct GNUNET_HashCode *key,
1375                          const void *data,
1376                          size_t size)
1377 {
1378   struct ClientMonitorRecord *m;
1379   struct ClientHandle **cl;
1380   unsigned int cl_size;
1381
1382   cl = NULL;
1383   cl_size = 0;
1384   for (m = monitor_head; NULL != m; m = m->next)
1385   {
1386     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1387         (NULL == m->key ||
1388          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1389     {
1390       struct GNUNET_MQ_Envelope *env;
1391       struct GNUNET_DHT_MonitorPutMessage *mmsg;
1392       struct GNUNET_PeerIdentity *msg_path;
1393       size_t msize;
1394       unsigned int i;
1395
1396       /* Don't send duplicates */
1397       for (i = 0; i < cl_size; i++)
1398         if (cl[i] == m->ch)
1399           break;
1400       if (i < cl_size)
1401         continue;
1402       GNUNET_array_append (cl,
1403                            cl_size,
1404                            m->ch);
1405
1406       msize = size;
1407       msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1408       env = GNUNET_MQ_msg_extra (mmsg,
1409                                  msize,
1410                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1411       mmsg->options = htonl(options);
1412       mmsg->type = htonl(type);
1413       mmsg->hop_count = htonl(hop_count);
1414       mmsg->desired_replication_level = htonl (desired_replication_level);
1415       mmsg->put_path_length = htonl (path_length);
1416       mmsg->key = *key;
1417       mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1418       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1419       GNUNET_memcpy (msg_path,
1420                      path,
1421                      path_length * sizeof (struct GNUNET_PeerIdentity));
1422       GNUNET_memcpy (&msg_path[path_length],
1423                      data,
1424                      size);
1425       GNUNET_MQ_send (m->ch->mq,
1426                       env);
1427     }
1428   }
1429   GNUNET_free_non_null (cl);
1430 }
1431
1432
1433 /**
1434  * Initialize client subsystem.
1435  *
1436  * @param server the initialized server
1437  */
1438 static void
1439 GDS_CLIENTS_init ()
1440 {
1441   forward_map
1442     = GNUNET_CONTAINER_multihashmap_create (1024,
1443                                             GNUNET_YES);
1444   retry_heap
1445     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1446 }
1447
1448
1449 /**
1450  * Shutdown client subsystem.
1451  */
1452 static void
1453 GDS_CLIENTS_stop ()
1454 {
1455   if (NULL != retry_task)
1456   {
1457     GNUNET_SCHEDULER_cancel (retry_task);
1458     retry_task = NULL;
1459   }
1460 }
1461
1462
1463 /**
1464  * Define "main" method using service macro.
1465  *
1466  * @param name name of the service, i.e. "dht" or "xdht"
1467  * @param run name of the initializaton method for the service
1468  */
1469 #define GDS_DHT_SERVICE_INIT(name,run)          \
1470  GNUNET_SERVICE_MAIN \
1471   (name, \
1472   GNUNET_SERVICE_OPTION_NONE, \
1473   run, \
1474   &client_connect_cb, \
1475   &client_disconnect_cb, \
1476   NULL, \
1477   GNUNET_MQ_hd_var_size (dht_local_put, \
1478                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1479                          struct GNUNET_DHT_ClientPutMessage, \
1480                          NULL), \
1481   GNUNET_MQ_hd_var_size (dht_local_get, \
1482                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1483                          struct GNUNET_DHT_ClientGetMessage, \
1484                          NULL), \
1485   GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1486                           GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1487                           struct GNUNET_DHT_ClientGetStopMessage, \
1488                           NULL), \
1489   GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1490                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1491                            struct GNUNET_DHT_MonitorStartStopMessage, \
1492                            NULL), \
1493   GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1494                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1495                            struct GNUNET_DHT_MonitorStartStopMessage, \
1496                            NULL), \
1497   GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1498                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1499                          struct GNUNET_DHT_ClientGetResultSeenMessage , \
1500                          NULL), \
1501   GNUNET_MQ_handler_end ())
1502
1503
1504 /**
1505  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1506  */
1507 void __attribute__ ((destructor))
1508 GDS_CLIENTS_done ()
1509 {
1510   if (NULL != retry_heap)
1511   {
1512     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1513     GNUNET_CONTAINER_heap_destroy (retry_heap);
1514     retry_heap = NULL;
1515   }
1516   if (NULL != forward_map)
1517   {
1518     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1519     GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1520     forward_map = NULL;
1521   }
1522 }
1523
1524 /* end of gnunet-service-dht_clients.c */