uncrustify as demanded.
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht.h"
36
37
/**
 * Emit a log message under the "dht-traffic" component.
 */
#define LOG_TRAFFIC(level, ...) GNUNET_log_from(level, "dht-traffic", __VA_ARGS__)

/**
 * Emit a log message under the "dht-clients" component.
 */
#define LOG(level, ...) GNUNET_log_from(level, "dht-clients", __VA_ARGS__)


/**
 * Per-client state.  Only declared here so that the record types
 * below can point back at their client; the definition follows them.
 */
struct ClientHandle;
52
53
54 /**
55  * Entry in the local forwarding map for a client's GET request.
56  */
57 struct ClientQueryRecord {
58   /**
59    * The key this request was about
60    */
61   struct GNUNET_HashCode key;
62
63   /**
64    * Kept in a DLL with @e client.
65    */
66   struct ClientQueryRecord *next;
67
68   /**
69    * Kept in a DLL with @e client.
70    */
71   struct ClientQueryRecord *prev;
72
73   /**
74    * Client responsible for the request.
75    */
76   struct ClientHandle *ch;
77
78   /**
79    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
80    */
81   const void *xquery;
82
83   /**
84    * Replies we have already seen for this request.
85    */
86   struct GNUNET_HashCode *seen_replies;
87
88   /**
89    * Pointer to this nodes heap location in the retry-heap (for fast removal)
90    */
91   struct GNUNET_CONTAINER_HeapNode *hnode;
92
93   /**
94    * What's the delay between re-try operations that we currently use for this
95    * request?
96    */
97   struct GNUNET_TIME_Relative retry_frequency;
98
99   /**
100    * What's the next time we should re-try this request?
101    */
102   struct GNUNET_TIME_Absolute retry_time;
103
104   /**
105    * The unique identifier of this request
106    */
107   uint64_t unique_id;
108
109   /**
110    * Number of bytes in xquery.
111    */
112   size_t xquery_size;
113
114   /**
115    * Number of entries in 'seen_replies'.
116    */
117   unsigned int seen_replies_count;
118
119   /**
120    * Desired replication level
121    */
122   uint32_t replication;
123
124   /**
125    * Any message options for this request
126    */
127   uint32_t msg_options;
128
129   /**
130    * The type for the data for the GET request.
131    */
132   enum GNUNET_BLOCK_Type type;
133 };
134
135
136 /**
137  * Struct containing paremeters of monitoring requests.
138  */
139 struct ClientMonitorRecord {
140   /**
141    * Next element in DLL.
142    */
143   struct ClientMonitorRecord *next;
144
145   /**
146    * Previous element in DLL.
147    */
148   struct ClientMonitorRecord *prev;
149
150   /**
151    * Type of blocks that are of interest
152    */
153   enum GNUNET_BLOCK_Type type;
154
155   /**
156    * Key of data of interest, NULL for all.
157    */
158   struct GNUNET_HashCode *key;
159
160   /**
161    * Flag whether to notify about GET messages.
162    */
163   int16_t get;
164
165   /**
166    * Flag whether to notify about GET_REPONSE messages.
167    */
168   int16_t get_resp;
169
170   /**
171    * Flag whether to notify about PUT messages.
172    */
173   uint16_t put;
174
175   /**
176    * Client to notify of these requests.
177    */
178   struct ClientHandle *ch;
179 };
180
181
/**
 * State kept for every connected local client: its active GET
 * requests plus the handles needed to talk to it.
 */
struct ClientHandle {
  /**
   * First entry in the list of this client's active queries.
   */
  struct ClientQueryRecord *cqr_head;

  /**
   * Last entry in the list of this client's active queries.
   */
  struct ClientQueryRecord *cqr_tail;

  /**
   * Service-level handle of the client.
   */
  struct GNUNET_SERVICE_Client *client;

  /**
   * Message queue used for sending to the client.
   */
  struct GNUNET_MQ_Handle *mq;
};
208
/**
 * Context for working with the BLOCK library.
 */
struct GNUNET_BLOCK_Context *GDS_block_context;

/**
 * Connection to the statistics service.
 */
struct GNUNET_STATISTICS_Handle *GDS_stats;

/**
 * Handle of the DHT service itself.
 */
struct GNUNET_SERVICE_Handle *GDS_service;

/**
 * Configuration the DHT service runs with.
 */
const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;

/**
 * Head of the list of active monitoring requests.
 */
static struct ClientMonitorRecord *monitor_head;

/**
 * Tail of the list of active monitoring requests.
 */
static struct ClientMonitorRecord *monitor_tail;

/**
 * Maps query keys to the `struct ClientQueryRecord` entries waiting
 * for replies under that key.
 */
static struct GNUNET_CONTAINER_MultiHashMap *forward_map;

/**
 * All client requests, ordered by the time of their next
 * transmission (earliest on top).
 */
static struct GNUNET_CONTAINER_Heap *retry_heap;

/**
 * Task that works through #retry_heap and (re-)transmits requests.
 */
static struct GNUNET_SCHEDULER_Task *retry_task;
253
254
255 /**
256  * Free data structures associated with the given query.
257  *
258  * @param record record to remove
259  */
260 static void
261 remove_client_record(struct ClientQueryRecord *record)
262 {
263   struct ClientHandle *ch = record->ch;
264
265   GNUNET_CONTAINER_DLL_remove(ch->cqr_head,
266                               ch->cqr_tail,
267                               record);
268   GNUNET_assert(GNUNET_YES ==
269                 GNUNET_CONTAINER_multihashmap_remove(forward_map,
270                                                      &record->key,
271                                                      record));
272   if (NULL != record->hnode)
273     GNUNET_CONTAINER_heap_remove_node(record->hnode);
274   GNUNET_array_grow(record->seen_replies,
275                     record->seen_replies_count,
276                     0);
277   GNUNET_free(record);
278 }
279
280
281 /**
282  * Functions with this signature are called whenever a local client is
283  * connects to us.
284  *
285  * @param cls closure (NULL for dht)
286  * @param client identification of the client
287  * @param mq message queue for talking to @a client
288  * @return our `struct ClientHandle` for @a client
289  */
290 static void *
291 client_connect_cb(void *cls,
292                   struct GNUNET_SERVICE_Client *client,
293                   struct GNUNET_MQ_Handle *mq)
294 {
295   struct ClientHandle *ch;
296
297   ch = GNUNET_new(struct ClientHandle);
298   ch->client = client;
299   ch->mq = mq;
300   return ch;
301 }
302
303
304 /**
305  * Functions with this signature are called whenever a client
306  * is disconnected on the network level.
307  *
308  * @param cls closure (NULL for dht)
309  * @param client identification of the client
310  * @param app_ctx our `struct ClientHandle` for @a client
311  */
312 static void
313 client_disconnect_cb(void *cls,
314                      struct GNUNET_SERVICE_Client *client,
315                      void *app_ctx)
316 {
317   struct ClientHandle *ch = app_ctx;
318   struct ClientQueryRecord *cqr;
319   struct ClientMonitorRecord *monitor;
320
321   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
322              "Local client %p disconnects\n",
323              ch);
324   monitor = monitor_head;
325   while (NULL != monitor)
326     {
327       if (monitor->ch == ch)
328         {
329           struct ClientMonitorRecord *next;
330
331           next = monitor->next;
332           GNUNET_free_non_null(monitor->key);
333           GNUNET_CONTAINER_DLL_remove(monitor_head,
334                                       monitor_tail,
335                                       monitor);
336           GNUNET_free(monitor);
337           monitor = next;
338         }
339       else
340         {
341           monitor = monitor->next;
342         }
343     }
344   while (NULL != (cqr = ch->cqr_head))
345     remove_client_record(cqr);
346   GNUNET_free(ch);
347 }
348
349
350 /**
351  * Route the given request via the DHT.  This includes updating
352  * the bloom filter and retransmission times, building the P2P
353  * message and initiating the routing operation.
354  */
355 static void
356 transmit_request(struct ClientQueryRecord *cqr)
357 {
358   struct GNUNET_BLOCK_Group *bg;
359   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
360
361   GNUNET_STATISTICS_update(GDS_stats,
362                            gettext_noop("# GET requests from clients injected"),
363                            1,
364                            GNUNET_NO);
365   bg = GNUNET_BLOCK_group_create(GDS_block_context,
366                                  cqr->type,
367                                  GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
368                                                           UINT32_MAX),
369                                  NULL,
370                                  0,
371                                  "seen-set-size",
372                                  cqr->seen_replies_count,
373                                  NULL);
374   GNUNET_BLOCK_group_set_seen(bg,
375                               cqr->seen_replies,
376                               cqr->seen_replies_count);
377   peer_bf
378     = GNUNET_CONTAINER_bloomfilter_init(NULL,
379                                         DHT_BLOOM_SIZE,
380                                         GNUNET_CONSTANTS_BLOOMFILTER_K);
381   LOG(GNUNET_ERROR_TYPE_DEBUG,
382       "Initiating GET for %s, replication %u, already have %u replies\n",
383       GNUNET_h2s(&cqr->key),
384       cqr->replication,
385       cqr->seen_replies_count);
386   GDS_NEIGHBOURS_handle_get(cqr->type,
387                             cqr->msg_options,
388                             cqr->replication,
389                             0 /* hop count */,
390                             &cqr->key,
391                             cqr->xquery,
392                             cqr->xquery_size,
393                             bg,
394                             peer_bf);
395   GNUNET_BLOCK_group_destroy(bg);
396   GNUNET_CONTAINER_bloomfilter_free(peer_bf);
397
398   /* exponential back-off for retries.
399    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
400   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF(cqr->retry_frequency);
401   cqr->retry_time = GNUNET_TIME_relative_to_absolute(cqr->retry_frequency);
402 }
403
404
405 /**
406  * Task that looks at the #retry_heap and transmits all of the requests
407  * on the heap that are ready for transmission.  Then re-schedules
408  * itself (unless the heap is empty).
409  *
410  * @param cls unused
411  */
412 static void
413 transmit_next_request_task(void *cls)
414 {
415   struct ClientQueryRecord *cqr;
416   struct GNUNET_TIME_Relative delay;
417
418   retry_task = NULL;
419   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root(retry_heap)))
420     {
421       cqr->hnode = NULL;
422       delay = GNUNET_TIME_absolute_get_remaining(cqr->retry_time);
423       if (delay.rel_value_us > 0)
424         {
425           cqr->hnode
426             = GNUNET_CONTAINER_heap_insert(retry_heap,
427                                            cqr,
428                                            cqr->retry_time.abs_value_us);
429           retry_task
430             = GNUNET_SCHEDULER_add_at(cqr->retry_time,
431                                       &transmit_next_request_task,
432                                       NULL);
433           return;
434         }
435       transmit_request(cqr);
436       cqr->hnode
437         = GNUNET_CONTAINER_heap_insert(retry_heap,
438                                        cqr,
439                                        cqr->retry_time.abs_value_us);
440     }
441 }
442
443
444 /**
445  * Check DHT PUT messages from the client.
446  *
447  * @param cls the client we received this message from
448  * @param dht_msg the actual message received
449  * @return #GNUNET_OK (always)
450  */
451 static int
452 check_dht_local_put(void *cls,
453                     const struct GNUNET_DHT_ClientPutMessage *dht_msg)
454 {
455   /* always well-formed */
456   return GNUNET_OK;
457 }
458
459
460 /**
461  * Handler for PUT messages.
462  *
463  * @param cls the client we received this message from
464  * @param dht_msg the actual message received
465  */
466 static void
467 handle_dht_local_put(void *cls,
468                      const struct GNUNET_DHT_ClientPutMessage *dht_msg)
469 {
470   struct ClientHandle *ch = cls;
471   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
472   uint16_t size;
473
474   size = ntohs(dht_msg->header.size);
475   GNUNET_STATISTICS_update(GDS_stats,
476                            gettext_noop("# PUT requests received from clients"),
477                            1,
478                            GNUNET_NO);
479   LOG_TRAFFIC(GNUNET_ERROR_TYPE_DEBUG,
480               "CLIENT-PUT %s\n",
481               GNUNET_h2s_full(&dht_msg->key));
482   /* give to local clients */
483   LOG(GNUNET_ERROR_TYPE_DEBUG,
484       "Handling local PUT of %u-bytes for query %s\n",
485       size - sizeof(struct GNUNET_DHT_ClientPutMessage),
486       GNUNET_h2s(&dht_msg->key));
487   GDS_CLIENTS_handle_reply(GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
488                            &dht_msg->key,
489                            0,
490                            NULL,
491                            0,
492                            NULL,
493                            ntohl(dht_msg->type),
494                            size - sizeof(struct GNUNET_DHT_ClientPutMessage),
495                            &dht_msg[1]);
496   /* store locally */
497   GDS_DATACACHE_handle_put(GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
498                            &dht_msg->key,
499                            0,
500                            NULL,
501                            ntohl(dht_msg->type),
502                            size - sizeof(struct GNUNET_DHT_ClientPutMessage),
503                            &dht_msg[1]);
504   /* route to other peers */
505   peer_bf
506     = GNUNET_CONTAINER_bloomfilter_init(NULL,
507                                         DHT_BLOOM_SIZE,
508                                         GNUNET_CONSTANTS_BLOOMFILTER_K);
509   GDS_NEIGHBOURS_handle_put(ntohl(dht_msg->type),
510                             ntohl(dht_msg->options),
511                             ntohl(dht_msg->desired_replication_level),
512                             GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
513                             0 /* hop count */,
514                             peer_bf,
515                             &dht_msg->key,
516                             0,
517                             NULL,
518                             &dht_msg[1],
519                             size - sizeof(struct GNUNET_DHT_ClientPutMessage));
520   GDS_CLIENTS_process_put(ntohl(dht_msg->options),
521                           ntohl(dht_msg->type),
522                           0,
523                           ntohl(dht_msg->desired_replication_level),
524                           1,
525                           GDS_NEIGHBOURS_get_id(),
526                           GNUNET_TIME_absolute_ntoh(dht_msg->expiration),
527                           &dht_msg->key,
528                           &dht_msg[1],
529                           size - sizeof(struct GNUNET_DHT_ClientPutMessage));
530   GNUNET_CONTAINER_bloomfilter_free(peer_bf);
531   GNUNET_SERVICE_client_continue(ch->client);
532 }
533
534
535 /**
536  * Check DHT GET messages from the client.
537  *
538  * @param cls the client we received this message from
539  * @param message the actual message received
540  * @return #GNUNET_OK (always)
541  */
542 static int
543 check_dht_local_get(void *cls,
544                     const struct GNUNET_DHT_ClientGetMessage *get)
545 {
546   /* always well-formed */
547   return GNUNET_OK;
548 }
549
550
551 /**
552  * Handle a result from local datacache for a GET operation.
553  *
554  * @param cls the `struct ClientHandle` of the client doing the query
555  * @param type type of the block
556  * @param expiration_time when does the content expire
557  * @param key key for the content
558  * @param put_path_length number of entries in @a put_path
559  * @param put_path peers the original PUT traversed (if tracked)
560  * @param get_path_length number of entries in @a get_path
561  * @param get_path peers this reply has traversed so far (if tracked)
562  * @param data payload of the reply
563  * @param data_size number of bytes in @a data
564  */
565 static void
566 handle_local_result(void *cls,
567                     enum GNUNET_BLOCK_Type type,
568                     struct GNUNET_TIME_Absolute expiration_time,
569                     const struct GNUNET_HashCode *key,
570                     unsigned int put_path_length,
571                     const struct GNUNET_PeerIdentity *put_path,
572                     unsigned int get_path_length,
573                     const struct GNUNET_PeerIdentity *get_path,
574                     const void *data,
575                     size_t data_size)
576 {
577   // FIXME: this needs some clean up: inline the function,
578   // possibly avoid even looking up the client!
579   GDS_CLIENTS_handle_reply(expiration_time,
580                            key,
581                            0, NULL,
582                            put_path_length, put_path,
583                            type,
584                            data_size, data);
585 }
586
587
588 /**
589  * Handler for DHT GET messages from the client.
590  *
591  * @param cls the client we received this message from
592  * @param message the actual message received
593  */
594 static void
595 handle_dht_local_get(void *cls,
596                      const struct GNUNET_DHT_ClientGetMessage *get)
597 {
598   struct ClientHandle *ch = cls;
599   struct ClientQueryRecord *cqr;
600   size_t xquery_size;
601   const char *xquery;
602   uint16_t size;
603
604   size = ntohs(get->header.size);
605   xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
606   xquery = (const char *)&get[1];
607   GNUNET_STATISTICS_update(GDS_stats,
608                            gettext_noop
609                              ("# GET requests received from clients"), 1,
610                            GNUNET_NO);
611   LOG(GNUNET_ERROR_TYPE_DEBUG,
612       "Received GET request for %s from local client %p, xq: %.*s\n",
613       GNUNET_h2s(&get->key),
614       ch->client,
615       xquery_size,
616       xquery);
617   LOG_TRAFFIC(GNUNET_ERROR_TYPE_DEBUG,
618               "CLIENT-GET %s\n",
619               GNUNET_h2s_full(&get->key));
620
621   cqr = GNUNET_malloc(sizeof(struct ClientQueryRecord) + xquery_size);
622   cqr->key = get->key;
623   cqr->ch = ch;
624   cqr->xquery = (void *)&cqr[1];
625   GNUNET_memcpy(&cqr[1], xquery, xquery_size);
626   cqr->hnode = GNUNET_CONTAINER_heap_insert(retry_heap, cqr, 0);
627   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
628   cqr->retry_time = GNUNET_TIME_absolute_get();
629   cqr->unique_id = get->unique_id;
630   cqr->xquery_size = xquery_size;
631   cqr->replication = ntohl(get->desired_replication_level);
632   cqr->msg_options = ntohl(get->options);
633   cqr->type = ntohl(get->type);
634   GNUNET_CONTAINER_DLL_insert(ch->cqr_head,
635                               ch->cqr_tail,
636                               cqr);
637   GNUNET_CONTAINER_multihashmap_put(forward_map,
638                                     &cqr->key,
639                                     cqr,
640                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
641   GDS_CLIENTS_process_get(ntohl(get->options),
642                           ntohl(get->type),
643                           0,
644                           ntohl(get->desired_replication_level),
645                           1,
646                           GDS_NEIGHBOURS_get_id(),
647                           &get->key);
648   /* start remote requests */
649   if (NULL != retry_task)
650     GNUNET_SCHEDULER_cancel(retry_task);
651   retry_task = GNUNET_SCHEDULER_add_now(&transmit_next_request_task,
652                                         NULL);
653   /* perform local lookup */
654   GDS_DATACACHE_handle_get(&get->key,
655                            cqr->type,
656                            cqr->xquery,
657                            xquery_size,
658                            NULL,
659                            &handle_local_result,
660                            ch);
661   GNUNET_SERVICE_client_continue(ch->client);
662 }
663
664
/**
 * Closure for #find_by_unique_id().
 */
struct FindByUniqueIdContext {
  /**
   * Set to the matching record once one is found.
   */
  struct ClientQueryRecord *cqr;

  /**
   * Request identifier to look for.
   */
  uint64_t unique_id;
};
676
677
678 /**
679  * Function called for each existing DHT record for the given
680  * query.  Checks if it matches the UID given in the closure
681  * and if so returns the entry as a result.
682  *
683  * @param cls the search context
684  * @param key query for the lookup (not used)
685  * @param value the `struct ClientQueryRecord`
686  * @return #GNUNET_YES to continue iteration (result not yet found)
687  */
688 static int
689 find_by_unique_id(void *cls,
690                   const struct GNUNET_HashCode *key,
691                   void *value)
692 {
693   struct FindByUniqueIdContext *fui_ctx = cls;
694   struct ClientQueryRecord *cqr = value;
695
696   if (cqr->unique_id != fui_ctx->unique_id)
697     return GNUNET_YES;
698   fui_ctx->cqr = cqr;
699   return GNUNET_NO;
700 }
701
702
703 /**
704  * Check "GET result seen" messages from the client.
705  *
706  * @param cls the client we received this message from
707  * @param message the actual message received
708  * @return #GNUNET_OK if @a seen is well-formed
709  */
710 static int
711 check_dht_local_get_result_seen(void *cls,
712                                 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
713 {
714   uint16_t size;
715   unsigned int hash_count;
716
717   size = ntohs(seen->header.size);
718   hash_count = (size - sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof(struct GNUNET_HashCode);
719   if (size != sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof(struct GNUNET_HashCode))
720     {
721       GNUNET_break(0);
722       return GNUNET_SYSERR;
723     }
724   return GNUNET_OK;
725 }
726
727
728 /**
729  * Handler for "GET result seen" messages from the client.
730  *
731  * @param cls the client we received this message from
732  * @param message the actual message received
733  */
734 static void
735 handle_dht_local_get_result_seen(void *cls,
736                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
737 {
738   struct ClientHandle *ch = cls;
739   uint16_t size;
740   unsigned int hash_count;
741   unsigned int old_count;
742   const struct GNUNET_HashCode *hc;
743   struct FindByUniqueIdContext fui_ctx;
744   struct ClientQueryRecord *cqr;
745
746   size = ntohs(seen->header.size);
747   hash_count = (size - sizeof(struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof(struct GNUNET_HashCode);
748   hc = (const struct GNUNET_HashCode*)&seen[1];
749   fui_ctx.unique_id = seen->unique_id;
750   fui_ctx.cqr = NULL;
751   GNUNET_CONTAINER_multihashmap_get_multiple(forward_map,
752                                              &seen->key,
753                                              &find_by_unique_id,
754                                              &fui_ctx);
755   if (NULL == (cqr = fui_ctx.cqr))
756     {
757       GNUNET_break(0);
758       GNUNET_SERVICE_client_drop(ch->client);
759       return;
760     }
761   /* finally, update 'seen' list */
762   old_count = cqr->seen_replies_count;
763   GNUNET_array_grow(cqr->seen_replies,
764                     cqr->seen_replies_count,
765                     cqr->seen_replies_count + hash_count);
766   GNUNET_memcpy(&cqr->seen_replies[old_count],
767                 hc,
768                 sizeof(struct GNUNET_HashCode) * hash_count);
769 }
770
771
/**
 * Closure for #remove_by_unique_id().
 */
struct RemoveByUniqueIdContext {
  /**
   * Client that asked for the removal.
   */
  struct ClientHandle *ch;

  /**
   * Identifier of the request that is to be removed.
   */
  uint64_t unique_id;
};
786
787
788 /**
789  * Iterator over hash map entries that frees all entries
790  * that match the given client and unique ID.
791  *
792  * @param cls unique ID and client to search for in source routes
793  * @param key current key code
794  * @param value value in the hash map, a ClientQueryRecord
795  * @return #GNUNET_YES (we should continue to iterate)
796  */
797 static int
798 remove_by_unique_id(void *cls,
799                     const struct GNUNET_HashCode *key,
800                     void *value)
801 {
802   const struct RemoveByUniqueIdContext *ctx = cls;
803   struct ClientQueryRecord *cqr = value;
804
805   if (cqr->unique_id != ctx->unique_id)
806     return GNUNET_YES;
807   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
808              "Removing client %p's record for key %s (by unique id)\n",
809              ctx->ch->client,
810              GNUNET_h2s(key));
811   remove_client_record(cqr);
812   return GNUNET_YES;
813 }
814
815
816 /**
817  * Handler for any generic DHT stop messages, calls the appropriate handler
818  * depending on message type (if processed locally)
819  *
820  * @param cls client we received this message from
821  * @param message the actual message received
822  *
823  */
824 static void
825 handle_dht_local_get_stop(void *cls,
826                           const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
827 {
828   struct ClientHandle *ch = cls;
829   struct RemoveByUniqueIdContext ctx;
830
831   GNUNET_STATISTICS_update(GDS_stats,
832                            gettext_noop
833                              ("# GET STOP requests received from clients"), 1,
834                            GNUNET_NO);
835   LOG(GNUNET_ERROR_TYPE_DEBUG,
836       "Received GET STOP request for %s from local client %p\n",
837       GNUNET_h2s(&dht_stop_msg->key),
838       ch->client);
839   ctx.ch = ch;
840   ctx.unique_id = dht_stop_msg->unique_id;
841   GNUNET_CONTAINER_multihashmap_get_multiple(forward_map,
842                                              &dht_stop_msg->key,
843                                              &remove_by_unique_id,
844                                              &ctx);
845   GNUNET_SERVICE_client_continue(ch->client);
846 }
847
848
849 /**
850  * Handler for monitor start messages
851  *
852  * @param cls the client we received this message from
853  * @param msg the actual message received
854  *
855  */
856 static void
857 handle_dht_local_monitor(void *cls,
858                          const struct GNUNET_DHT_MonitorStartStopMessage *msg)
859 {
860   struct ClientHandle *ch = cls;
861   struct ClientMonitorRecord *r;
862
863   r = GNUNET_new(struct ClientMonitorRecord);
864   r->ch = ch;
865   r->type = ntohl(msg->type);
866   r->get = ntohs(msg->get);
867   r->get_resp = ntohs(msg->get_resp);
868   r->put = ntohs(msg->put);
869   if (0 == ntohs(msg->filter_key))
870     {
871       r->key = NULL;
872     }
873   else
874     {
875       r->key = GNUNET_new(struct GNUNET_HashCode);
876       GNUNET_memcpy(r->key,
877                     &msg->key,
878                     sizeof(struct GNUNET_HashCode));
879     }
880   GNUNET_CONTAINER_DLL_insert(monitor_head,
881                               monitor_tail,
882                               r);
883   GNUNET_SERVICE_client_continue(ch->client);
884 }
885
886
887 /**
888  * Handler for monitor stop messages
889  *
890  * @param cls the client we received this message from
891  * @param msg the actual message received
892  */
893 static void
894 handle_dht_local_monitor_stop(void *cls,
895                               const struct GNUNET_DHT_MonitorStartStopMessage *msg)
896 {
897   struct ClientHandle *ch = cls;
898   struct ClientMonitorRecord *r;
899   int keys_match;
900
901   GNUNET_SERVICE_client_continue(ch->client);
902   for (r = monitor_head; NULL != r; r = r->next)
903     {
904       if (NULL == r->key)
905         {
906           keys_match = (0 == ntohs(msg->filter_key));
907         }
908       else
909         {
910           keys_match = ((0 != ntohs(msg->filter_key)) &&
911                         (!memcmp(r->key,
912                                  &msg->key,
913                                  sizeof(struct GNUNET_HashCode))));
914         }
915       if ((ch == r->ch) &&
916           (ntohl(msg->type) == r->type) &&
917           (r->get == msg->get) &&
918           (r->get_resp == msg->get_resp) &&
919           (r->put == msg->put) &&
920           keys_match)
921         {
922           GNUNET_CONTAINER_DLL_remove(monitor_head,
923                                       monitor_tail,
924                                       r);
925           GNUNET_free_non_null(r->key);
926           GNUNET_free(r);
927           return; /* Delete only ONE entry */
928         }
929     }
930 }
931
932
933 /**
934  * Closure for #forward_reply()
935  */
936 struct ForwardReplyContext {
937   /**
938    * Expiration time of the reply.
939    */
940   struct GNUNET_TIME_Absolute expiration;
941
942   /**
943    * GET path taken.
944    */
945   const struct GNUNET_PeerIdentity *get_path;
946
947   /**
948    * PUT path taken.
949    */
950   const struct GNUNET_PeerIdentity *put_path;
951
952   /**
953    * Embedded payload.
954    */
955   const void *data;
956
957   /**
958    * Number of bytes in data.
959    */
960   size_t data_size;
961
962   /**
963    * Number of entries in @e get_path.
964    */
965   unsigned int get_path_length;
966
967   /**
968    * Number of entries in @e put_path.
969    */
970   unsigned int put_path_length;
971
972   /**
973    * Type of the data.
974    */
975   enum GNUNET_BLOCK_Type type;
976 };
977
978
979 /**
980  * Iterator over hash map entries that send a given reply to
981  * each of the matching clients.  With some tricky recycling
982  * of the buffer.
983  *
984  * @param cls the 'struct ForwardReplyContext'
985  * @param key current key
986  * @param value value in the hash map, a ClientQueryRecord
987  * @return #GNUNET_YES (we should continue to iterate),
988  *         if the result is mal-formed, #GNUNET_NO
989  */
990 static int
991 forward_reply(void *cls,
992               const struct GNUNET_HashCode *key,
993               void *value)
994 {
995   struct ForwardReplyContext *frc = cls;
996   struct ClientQueryRecord *record = value;
997   struct GNUNET_MQ_Envelope *env;
998   struct GNUNET_DHT_ClientResultMessage *reply;
999   enum GNUNET_BLOCK_EvaluationResult eval;
1000   int do_free;
1001   struct GNUNET_HashCode ch;
1002   struct GNUNET_PeerIdentity *paths;
1003
1004   LOG_TRAFFIC(GNUNET_ERROR_TYPE_DEBUG,
1005               "CLIENT-RESULT %s\n",
1006               GNUNET_h2s_full(key));
1007   if ((record->type != GNUNET_BLOCK_TYPE_ANY) &&
1008       (record->type != frc->type))
1009     {
1010       LOG(GNUNET_ERROR_TYPE_DEBUG,
1011           "Record type mismatch, not passing request for key %s to local client\n",
1012           GNUNET_h2s(key));
1013       GNUNET_STATISTICS_update(GDS_stats,
1014                                gettext_noop
1015                                  ("# Key match, type mismatches in REPLY to CLIENT"),
1016                                1, GNUNET_NO);
1017       return GNUNET_YES;        /* type mismatch */
1018     }
1019   GNUNET_CRYPTO_hash(frc->data, frc->data_size, &ch);
1020   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1021     if (0 == memcmp(&record->seen_replies[i],
1022                     &ch,
1023                     sizeof(struct GNUNET_HashCode)))
1024       {
1025         LOG(GNUNET_ERROR_TYPE_DEBUG,
1026             "Duplicate reply, not passing request for key %s to local client\n",
1027             GNUNET_h2s(key));
1028         GNUNET_STATISTICS_update(GDS_stats,
1029                                  gettext_noop
1030                                    ("# Duplicate REPLIES to CLIENT request dropped"),
1031                                  1, GNUNET_NO);
1032         return GNUNET_YES;      /* duplicate */
1033       }
1034   eval
1035     = GNUNET_BLOCK_evaluate(GDS_block_context,
1036                             record->type,
1037                             NULL,
1038                             GNUNET_BLOCK_EO_NONE,
1039                             key,
1040                             record->xquery,
1041                             record->xquery_size,
1042                             frc->data,
1043                             frc->data_size);
1044   LOG(GNUNET_ERROR_TYPE_DEBUG,
1045       "Evaluation result is %d for key %s for local client's query\n",
1046       (int)eval,
1047       GNUNET_h2s(key));
1048   switch (eval)
1049     {
1050     case GNUNET_BLOCK_EVALUATION_OK_LAST:
1051       do_free = GNUNET_YES;
1052       break;
1053
1054     case GNUNET_BLOCK_EVALUATION_OK_MORE:
1055       GNUNET_array_append(record->seen_replies,
1056                           record->seen_replies_count,
1057                           ch);
1058       do_free = GNUNET_NO;
1059       break;
1060
1061     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1062       /* should be impossible to encounter here */
1063       GNUNET_break(0);
1064       return GNUNET_YES;
1065
1066     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1067       GNUNET_break_op(0);
1068       return GNUNET_NO;
1069
1070     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1071       GNUNET_break(0);
1072       return GNUNET_NO;
1073
1074     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1075       GNUNET_break(0);
1076       return GNUNET_NO;
1077
1078     case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1079       return GNUNET_YES;
1080
1081     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1082       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1083                  _("Unsupported block type (%u) in request!\n"), record->type);
1084       return GNUNET_NO;
1085
1086     default:
1087       GNUNET_break(0);
1088       return GNUNET_NO;
1089     }
1090   GNUNET_STATISTICS_update(GDS_stats,
1091                            gettext_noop("# RESULTS queued for clients"),
1092                            1,
1093                            GNUNET_NO);
1094   env = GNUNET_MQ_msg_extra(reply,
1095                             frc->data_size +
1096                             (frc->get_path_length + frc->put_path_length) * sizeof(struct GNUNET_PeerIdentity),
1097                             GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1098   reply->type = htonl(frc->type);
1099   reply->get_path_length = htonl(frc->get_path_length);
1100   reply->put_path_length = htonl(frc->put_path_length);
1101   reply->unique_id = record->unique_id;
1102   reply->expiration = GNUNET_TIME_absolute_hton(frc->expiration);
1103   reply->key = *key;
1104   paths = (struct GNUNET_PeerIdentity *)&reply[1];
1105   GNUNET_memcpy(paths,
1106                 frc->put_path,
1107                 sizeof(struct GNUNET_PeerIdentity) * frc->put_path_length);
1108   GNUNET_memcpy(&paths[frc->put_path_length],
1109                 frc->get_path,
1110                 sizeof(struct GNUNET_PeerIdentity) * frc->get_path_length);
1111   GNUNET_memcpy(&paths[frc->get_path_length + frc->put_path_length],
1112                 frc->data,
1113                 frc->data_size);
1114   LOG(GNUNET_ERROR_TYPE_DEBUG,
1115       "Sending reply to query %s for client %p\n",
1116       GNUNET_h2s(key),
1117       record->ch->client);
1118   GNUNET_MQ_send(record->ch->mq,
1119                  env);
1120   if (GNUNET_YES == do_free)
1121     remove_client_record(record);
1122   return GNUNET_YES;
1123 }
1124
1125
1126 /**
1127  * Handle a reply we've received from another peer.  If the reply
1128  * matches any of our pending queries, forward it to the respective
1129  * client(s).
1130  *
1131  * @param expiration when will the reply expire
1132  * @param key the query this reply is for
1133  * @param get_path_length number of peers in @a get_path
1134  * @param get_path path the reply took on get
1135  * @param put_path_length number of peers in @a put_path
1136  * @param put_path path the reply took on put
1137  * @param type type of the reply
1138  * @param data_size number of bytes in @a data
1139  * @param data application payload data
1140  */
1141 void
1142 GDS_CLIENTS_handle_reply(struct GNUNET_TIME_Absolute expiration,
1143                          const struct GNUNET_HashCode *key,
1144                          unsigned int get_path_length,
1145                          const struct GNUNET_PeerIdentity *get_path,
1146                          unsigned int put_path_length,
1147                          const struct GNUNET_PeerIdentity *put_path,
1148                          enum GNUNET_BLOCK_Type type,
1149                          size_t data_size,
1150                          const void *data)
1151 {
1152   struct ForwardReplyContext frc;
1153   size_t msize;
1154
1155   msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size +
1156           (get_path_length + put_path_length) * sizeof(struct GNUNET_PeerIdentity);
1157   if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1158     {
1159       GNUNET_break(0);
1160       return;
1161     }
1162   if (NULL == GNUNET_CONTAINER_multihashmap_get(forward_map,
1163                                                 key))
1164     {
1165       LOG(GNUNET_ERROR_TYPE_DEBUG,
1166           "No matching client for reply for key %s\n",
1167           GNUNET_h2s(key));
1168       GNUNET_STATISTICS_update(GDS_stats,
1169                                gettext_noop("# REPLIES ignored for CLIENTS (no match)"),
1170                                1,
1171                                GNUNET_NO);
1172       return;                   /* no matching request, fast exit! */
1173     }
1174   frc.expiration = expiration;
1175   frc.get_path = get_path;
1176   frc.put_path = put_path;
1177   frc.data = data;
1178   frc.data_size = data_size;
1179   frc.get_path_length = get_path_length;
1180   frc.put_path_length = put_path_length;
1181   frc.type = type;
1182   LOG(GNUNET_ERROR_TYPE_DEBUG,
1183       "Forwarding reply for key %s to client\n",
1184       GNUNET_h2s(key));
1185   GNUNET_CONTAINER_multihashmap_get_multiple(forward_map,
1186                                              key,
1187                                              &forward_reply,
1188                                              &frc);
1189 }
1190
1191
1192 /**
1193  * Check if some client is monitoring GET messages and notify
1194  * them in that case.
1195  *
1196  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1197  * @param type The type of data in the request.
1198  * @param hop_count Hop count so far.
1199  * @param path_length number of entries in path (or 0 if not recorded).
1200  * @param path peers on the GET path (or NULL if not recorded).
1201  * @param desired_replication_level Desired replication level.
1202  * @param key Key of the requested data.
1203  */
1204 void
1205 GDS_CLIENTS_process_get(uint32_t options,
1206                         enum GNUNET_BLOCK_Type type,
1207                         uint32_t hop_count,
1208                         uint32_t desired_replication_level,
1209                         unsigned int path_length,
1210                         const struct GNUNET_PeerIdentity *path,
1211                         const struct GNUNET_HashCode * key)
1212 {
1213   struct ClientMonitorRecord *m;
1214   struct ClientHandle **cl;
1215   unsigned int cl_size;
1216
1217   cl = NULL;
1218   cl_size = 0;
1219   for (m = monitor_head; NULL != m; m = m->next)
1220     {
1221       if (((GNUNET_BLOCK_TYPE_ANY == m->type) ||
1222            (m->type == type)) &&
1223           ((NULL == m->key) ||
1224            (0 == memcmp(key,
1225                         m->key,
1226                         sizeof(struct GNUNET_HashCode)))))
1227         {
1228           struct GNUNET_MQ_Envelope *env;
1229           struct GNUNET_DHT_MonitorGetMessage *mmsg;
1230           struct GNUNET_PeerIdentity *msg_path;
1231           size_t msize;
1232           unsigned int i;
1233
1234           /* Don't send duplicates */
1235           for (i = 0; i < cl_size; i++)
1236             if (cl[i] == m->ch)
1237               break;
1238           if (i < cl_size)
1239             continue;
1240           GNUNET_array_append(cl,
1241                               cl_size,
1242                               m->ch);
1243
1244           msize = path_length * sizeof(struct GNUNET_PeerIdentity);
1245           env = GNUNET_MQ_msg_extra(mmsg,
1246                                     msize,
1247                                     GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1248           mmsg->options = htonl(options);
1249           mmsg->type = htonl(type);
1250           mmsg->hop_count = htonl(hop_count);
1251           mmsg->desired_replication_level = htonl(desired_replication_level);
1252           mmsg->get_path_length = htonl(path_length);
1253           mmsg->key = *key;
1254           msg_path = (struct GNUNET_PeerIdentity *)&mmsg[1];
1255           GNUNET_memcpy(msg_path,
1256                         path,
1257                         path_length * sizeof(struct GNUNET_PeerIdentity));
1258           GNUNET_MQ_send(m->ch->mq,
1259                          env);
1260         }
1261     }
1262   GNUNET_free_non_null(cl);
1263 }
1264
1265
1266 /**
1267  * Check if some client is monitoring GET RESP messages and notify
1268  * them in that case.
1269  *
1270  * @param type The type of data in the result.
1271  * @param get_path Peers on GET path (or NULL if not recorded).
1272  * @param get_path_length number of entries in get_path.
1273  * @param put_path peers on the PUT path (or NULL if not recorded).
1274  * @param put_path_length number of entries in get_path.
1275  * @param exp Expiration time of the data.
1276  * @param key Key of the data.
1277  * @param data Pointer to the result data.
1278  * @param size Number of bytes in @a data.
1279  */
1280 void
1281 GDS_CLIENTS_process_get_resp(enum GNUNET_BLOCK_Type type,
1282                              const struct GNUNET_PeerIdentity *get_path,
1283                              unsigned int get_path_length,
1284                              const struct GNUNET_PeerIdentity *put_path,
1285                              unsigned int put_path_length,
1286                              struct GNUNET_TIME_Absolute exp,
1287                              const struct GNUNET_HashCode * key,
1288                              const void *data,
1289                              size_t size)
1290 {
1291   struct ClientMonitorRecord *m;
1292   struct ClientHandle **cl;
1293   unsigned int cl_size;
1294
1295   cl = NULL;
1296   cl_size = 0;
1297   for (m = monitor_head; NULL != m; m = m->next)
1298     {
1299       if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1300           (NULL == m->key ||
1301            memcmp(key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1302         {
1303           struct GNUNET_MQ_Envelope *env;
1304           struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1305           struct GNUNET_PeerIdentity *path;
1306           size_t msize;
1307           unsigned int i;
1308
1309           /* Don't send duplicates */
1310           for (i = 0; i < cl_size; i++)
1311             if (cl[i] == m->ch)
1312               break;
1313           if (i < cl_size)
1314             continue;
1315           GNUNET_array_append(cl,
1316                               cl_size,
1317                               m->ch);
1318
1319           msize = size;
1320           msize += (get_path_length + put_path_length)
1321                    * sizeof(struct GNUNET_PeerIdentity);
1322           env = GNUNET_MQ_msg_extra(mmsg,
1323                                     msize,
1324                                     GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1325           mmsg->type = htonl(type);
1326           mmsg->put_path_length = htonl(put_path_length);
1327           mmsg->get_path_length = htonl(get_path_length);
1328           mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1329           mmsg->key = *key;
1330           path = (struct GNUNET_PeerIdentity *)&mmsg[1];
1331           GNUNET_memcpy(path,
1332                         put_path,
1333                         put_path_length * sizeof(struct GNUNET_PeerIdentity));
1334           GNUNET_memcpy(path,
1335                         get_path,
1336                         get_path_length * sizeof(struct GNUNET_PeerIdentity));
1337           GNUNET_memcpy(&path[get_path_length],
1338                         data,
1339                         size);
1340           GNUNET_MQ_send(m->ch->mq,
1341                          env);
1342         }
1343     }
1344   GNUNET_free_non_null(cl);
1345 }
1346
1347
1348 /**
1349  * Check if some client is monitoring PUT messages and notify
1350  * them in that case.
1351  *
1352  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1353  * @param type The type of data in the request.
1354  * @param hop_count Hop count so far.
1355  * @param path_length number of entries in path (or 0 if not recorded).
1356  * @param path peers on the PUT path (or NULL if not recorded).
1357  * @param desired_replication_level Desired replication level.
1358  * @param exp Expiration time of the data.
1359  * @param key Key under which data is to be stored.
1360  * @param data Pointer to the data carried.
1361  * @param size Number of bytes in data.
1362  */
1363 void
1364 GDS_CLIENTS_process_put(uint32_t options,
1365                         enum GNUNET_BLOCK_Type type,
1366                         uint32_t hop_count,
1367                         uint32_t desired_replication_level,
1368                         unsigned int path_length,
1369                         const struct GNUNET_PeerIdentity *path,
1370                         struct GNUNET_TIME_Absolute exp,
1371                         const struct GNUNET_HashCode *key,
1372                         const void *data,
1373                         size_t size)
1374 {
1375   struct ClientMonitorRecord *m;
1376   struct ClientHandle **cl;
1377   unsigned int cl_size;
1378
1379   cl = NULL;
1380   cl_size = 0;
1381   for (m = monitor_head; NULL != m; m = m->next)
1382     {
1383       if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1384           (NULL == m->key ||
1385            memcmp(key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1386         {
1387           struct GNUNET_MQ_Envelope *env;
1388           struct GNUNET_DHT_MonitorPutMessage *mmsg;
1389           struct GNUNET_PeerIdentity *msg_path;
1390           size_t msize;
1391           unsigned int i;
1392
1393           /* Don't send duplicates */
1394           for (i = 0; i < cl_size; i++)
1395             if (cl[i] == m->ch)
1396               break;
1397           if (i < cl_size)
1398             continue;
1399           GNUNET_array_append(cl,
1400                               cl_size,
1401                               m->ch);
1402
1403           msize = size;
1404           msize += path_length * sizeof(struct GNUNET_PeerIdentity);
1405           env = GNUNET_MQ_msg_extra(mmsg,
1406                                     msize,
1407                                     GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1408           mmsg->options = htonl(options);
1409           mmsg->type = htonl(type);
1410           mmsg->hop_count = htonl(hop_count);
1411           mmsg->desired_replication_level = htonl(desired_replication_level);
1412           mmsg->put_path_length = htonl(path_length);
1413           mmsg->key = *key;
1414           mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1415           msg_path = (struct GNUNET_PeerIdentity *)&mmsg[1];
1416           GNUNET_memcpy(msg_path,
1417                         path,
1418                         path_length * sizeof(struct GNUNET_PeerIdentity));
1419           GNUNET_memcpy(&msg_path[path_length],
1420                         data,
1421                         size);
1422           GNUNET_MQ_send(m->ch->mq,
1423                          env);
1424         }
1425     }
1426   GNUNET_free_non_null(cl);
1427 }
1428
1429
1430 /**
1431  * Initialize client subsystem.
1432  *
1433  * @param server the initialized server
1434  */
1435 static void
1436 GDS_CLIENTS_init()
1437 {
1438   forward_map
1439     = GNUNET_CONTAINER_multihashmap_create(1024,
1440                                            GNUNET_YES);
1441   retry_heap
1442     = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
1443 }
1444
1445
1446 /**
1447  * Shutdown client subsystem.
1448  */
1449 static void
1450 GDS_CLIENTS_stop()
1451 {
1452   if (NULL != retry_task)
1453     {
1454       GNUNET_SCHEDULER_cancel(retry_task);
1455       retry_task = NULL;
1456     }
1457 }
1458
1459
/**
 * Define "main" method using service macro.
 *
 * Expands to a #GNUNET_SERVICE_MAIN invocation with no service options,
 * the client connect/disconnect callbacks, and message handlers for
 * client PUT, GET, GET_STOP, MONITOR_START, MONITOR_STOP and
 * GET_RESULTS_KNOWN messages.
 *
 * @param name name of the service, e.g. "dht" or "xdht"
 * @param run name of the initialization method for the service
 */
#define GDS_DHT_SERVICE_INIT(name, run)          \
  GNUNET_SERVICE_MAIN \
    (name, \
    GNUNET_SERVICE_OPTION_NONE, \
    run, \
    &client_connect_cb, \
    &client_disconnect_cb, \
    NULL, \
    GNUNET_MQ_hd_var_size(dht_local_put, \
                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
                          struct GNUNET_DHT_ClientPutMessage, \
                          NULL), \
    GNUNET_MQ_hd_var_size(dht_local_get, \
                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
                          struct GNUNET_DHT_ClientGetMessage, \
                          NULL), \
    GNUNET_MQ_hd_fixed_size(dht_local_get_stop, \
                            GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
                            struct GNUNET_DHT_ClientGetStopMessage, \
                            NULL), \
    GNUNET_MQ_hd_fixed_size(dht_local_monitor, \
                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
                            struct GNUNET_DHT_MonitorStartStopMessage, \
                            NULL), \
    GNUNET_MQ_hd_fixed_size(dht_local_monitor_stop, \
                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
                            struct GNUNET_DHT_MonitorStartStopMessage, \
                            NULL), \
    GNUNET_MQ_hd_var_size(dht_local_get_result_seen, \
                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
                          struct GNUNET_DHT_ClientGetResultSeenMessage, \
                          NULL), \
    GNUNET_MQ_handler_end())
1499
1500
1501 /**
1502  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1503  */
1504 void __attribute__ ((destructor))
1505 GDS_CLIENTS_done()
1506 {
1507   if (NULL != retry_heap)
1508     {
1509       GNUNET_assert(0 == GNUNET_CONTAINER_heap_get_size(retry_heap));
1510       GNUNET_CONTAINER_heap_destroy(retry_heap);
1511       retry_heap = NULL;
1512     }
1513   if (NULL != forward_map)
1514     {
1515       GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(forward_map));
1516       GNUNET_CONTAINER_multihashmap_destroy(forward_map);
1517       forward_map = NULL;
1518     }
1519 }
1520
1521 /* end of gnunet-service-dht_clients.c */