REST: nothing triggers rest
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht.h"
36
37
38 /**
39  * Should routing details be logged to stderr (for debugging)?
40  */
41 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
44
45
46 /**
47  * Struct containing information about a client,
48  * handle to connect to it, and any pending messages
49  * that need to be sent to it.
50  */
51 struct ClientHandle;
52
53
54 /**
55  * Entry in the local forwarding map for a client's GET request.
56  */
57 struct ClientQueryRecord
58 {
59
60   /**
61    * The key this request was about
62    */
63   struct GNUNET_HashCode key;
64
65   /**
66    * Kept in a DLL with @e client.
67    */
68   struct ClientQueryRecord *next;
69
70   /**
71    * Kept in a DLL with @e client.
72    */
73   struct ClientQueryRecord *prev;
74
75   /**
76    * Client responsible for the request.
77    */
78   struct ClientHandle *ch;
79
80   /**
81    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
82    */
83   const void *xquery;
84
85   /**
86    * Replies we have already seen for this request.
87    */
88   struct GNUNET_HashCode *seen_replies;
89
90   /**
91    * Pointer to this nodes heap location in the retry-heap (for fast removal)
92    */
93   struct GNUNET_CONTAINER_HeapNode *hnode;
94
95   /**
96    * What's the delay between re-try operations that we currently use for this
97    * request?
98    */
99   struct GNUNET_TIME_Relative retry_frequency;
100
101   /**
102    * What's the next time we should re-try this request?
103    */
104   struct GNUNET_TIME_Absolute retry_time;
105
106   /**
107    * The unique identifier of this request
108    */
109   uint64_t unique_id;
110
111   /**
112    * Number of bytes in xquery.
113    */
114   size_t xquery_size;
115
116   /**
117    * Number of entries in 'seen_replies'.
118    */
119   unsigned int seen_replies_count;
120
121   /**
122    * Desired replication level
123    */
124   uint32_t replication;
125
126   /**
127    * Any message options for this request
128    */
129   uint32_t msg_options;
130
131   /**
132    * The type for the data for the GET request.
133    */
134   enum GNUNET_BLOCK_Type type;
135
136 };
137
138
139 /**
140  * Struct containing paremeters of monitoring requests.
141  */
142 struct ClientMonitorRecord
143 {
144
145   /**
146    * Next element in DLL.
147    */
148   struct ClientMonitorRecord *next;
149
150   /**
151    * Previous element in DLL.
152    */
153   struct ClientMonitorRecord *prev;
154
155   /**
156    * Type of blocks that are of interest
157    */
158   enum GNUNET_BLOCK_Type type;
159
160   /**
161    * Key of data of interest, NULL for all.
162    */
163   struct GNUNET_HashCode *key;
164
165   /**
166    * Flag whether to notify about GET messages.
167    */
168   int16_t get;
169
170   /**
171    * Flag whether to notify about GET_REPONSE messages.
172    */
173   int16_t get_resp;
174
175   /**
176    * Flag whether to notify about PUT messages.
177    */
178   uint16_t put;
179
180   /**
181    * Client to notify of these requests.
182    */
183   struct ClientHandle *ch;
184 };
185
186
187 /**
188  * Struct containing information about a client,
189  * handle to connect to it, and any pending messages
190  * that need to be sent to it.
191  */
192 struct ClientHandle
193 {
194   /**
195    * Linked list of active queries of this client.
196    */
197   struct ClientQueryRecord *cqr_head;
198
199   /**
200    * Linked list of active queries of this client.
201    */
202   struct ClientQueryRecord *cqr_tail;
203
204   /**
205    * The handle to this client
206    */
207   struct GNUNET_SERVICE_Client *client;
208
209   /**
210    * The message queue to this client
211    */
212   struct GNUNET_MQ_Handle *mq;
213
214 };
215
216 /**
217  * Our handle to the BLOCK library.
218  */
219 struct GNUNET_BLOCK_Context *GDS_block_context;
220
221 /**
222  * Handle for the statistics service.
223  */
224 struct GNUNET_STATISTICS_Handle *GDS_stats;
225
226 /**
227  * Handle for the service.
228  */
229 struct GNUNET_SERVICE_Handle *GDS_service;
230
231 /**
232  * The configuration the DHT service is running with
233  */
234 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
235
236 /**
237  * List of active monitoring requests.
238  */
239 static struct ClientMonitorRecord *monitor_head;
240
241 /**
242  * List of active monitoring requests.
243  */
244 static struct ClientMonitorRecord *monitor_tail;
245
246 /**
247  * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
248  */
249 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
250
251 /**
252  * Heap with all of our client's request, sorted by retry time (earliest on top).
253  */
254 static struct GNUNET_CONTAINER_Heap *retry_heap;
255
256 /**
257  * Task that re-transmits requests (using retry_heap).
258  */
259 static struct GNUNET_SCHEDULER_Task *retry_task;
260
261
262 /**
263  * Free data structures associated with the given query.
264  *
265  * @param record record to remove
266  */
267 static void
268 remove_client_record (struct ClientQueryRecord *record)
269 {
270   struct ClientHandle *ch = record->ch;
271
272   GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
273                                ch->cqr_tail,
274                                record);
275   GNUNET_assert (GNUNET_YES ==
276                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
277                                                        &record->key,
278                                                        record));
279   if (NULL != record->hnode)
280     GNUNET_CONTAINER_heap_remove_node (record->hnode);
281   GNUNET_array_grow (record->seen_replies,
282                      record->seen_replies_count,
283                      0);
284   GNUNET_free (record);
285 }
286
287
288 /**
289  * Functions with this signature are called whenever a local client is
290  * connects to us.
291  *
292  * @param cls closure (NULL for dht)
293  * @param client identification of the client
294  * @param mq message queue for talking to @a client
295  * @return our `struct ClientHandle` for @a client
296  */
297 static void *
298 client_connect_cb (void *cls,
299                    struct GNUNET_SERVICE_Client *client,
300                    struct GNUNET_MQ_Handle *mq)
301 {
302   struct ClientHandle *ch;
303
304   ch = GNUNET_new (struct ClientHandle);
305   ch->client = client;
306   ch->mq = mq;
307   return ch;
308 }
309
310
311 /**
312  * Functions with this signature are called whenever a client
313  * is disconnected on the network level.
314  *
315  * @param cls closure (NULL for dht)
316  * @param client identification of the client
317  * @param app_ctx our `struct ClientHandle` for @a client
318  */
319 static void
320 client_disconnect_cb (void *cls,
321                       struct GNUNET_SERVICE_Client *client,
322                       void *app_ctx)
323 {
324   struct ClientHandle *ch = app_ctx;
325   struct ClientQueryRecord *cqr;
326   struct ClientMonitorRecord *monitor;
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Local client %p disconnects\n",
330               ch);
331   monitor = monitor_head;
332   while (NULL != monitor)
333   {
334     if (monitor->ch == ch)
335     {
336       struct ClientMonitorRecord *next;
337
338       next = monitor->next;
339       GNUNET_free_non_null (monitor->key);
340       GNUNET_CONTAINER_DLL_remove (monitor_head,
341                                    monitor_tail,
342                                    monitor);
343       GNUNET_free (monitor);
344       monitor = next;
345     }
346     else
347     {
348       monitor = monitor->next;
349     }
350   }
351   while (NULL != (cqr = ch->cqr_head))
352     remove_client_record (cqr);
353   GNUNET_free (ch);
354 }
355
356
357 /**
358  * Route the given request via the DHT.  This includes updating
359  * the bloom filter and retransmission times, building the P2P
360  * message and initiating the routing operation.
361  */
362 static void
363 transmit_request (struct ClientQueryRecord *cqr)
364 {
365   struct GNUNET_BLOCK_Group *bg;
366   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
367
368   GNUNET_STATISTICS_update (GDS_stats,
369                             gettext_noop ("# GET requests from clients injected"),
370                             1,
371                             GNUNET_NO);
372   bg = GNUNET_BLOCK_group_create (GDS_block_context,
373                                   cqr->type,
374                                   GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
375                                                             UINT32_MAX),
376                                   NULL,
377                                   0,
378                                   "seen-set-size",
379                                   cqr->seen_replies_count,
380                                   NULL);
381   GNUNET_BLOCK_group_set_seen (bg,
382                                cqr->seen_replies,
383                                cqr->seen_replies_count);
384   peer_bf
385     = GNUNET_CONTAINER_bloomfilter_init (NULL,
386                                          DHT_BLOOM_SIZE,
387                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
388   LOG (GNUNET_ERROR_TYPE_DEBUG,
389        "Initiating GET for %s, replication %u, already have %u replies\n",
390        GNUNET_h2s (&cqr->key),
391        cqr->replication,
392        cqr->seen_replies_count);
393   GDS_NEIGHBOURS_handle_get (cqr->type,
394                              cqr->msg_options,
395                              cqr->replication,
396                              0 /* hop count */ ,
397                              &cqr->key,
398                              cqr->xquery,
399                              cqr->xquery_size,
400                              bg,
401                              peer_bf);
402   GNUNET_BLOCK_group_destroy (bg);
403   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
404
405   /* exponential back-off for retries.
406    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
407   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
408   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
409 }
410
411
412 /**
413  * Task that looks at the #retry_heap and transmits all of the requests
414  * on the heap that are ready for transmission.  Then re-schedules
415  * itself (unless the heap is empty).
416  *
417  * @param cls unused
418  */
419 static void
420 transmit_next_request_task (void *cls)
421 {
422   struct ClientQueryRecord *cqr;
423   struct GNUNET_TIME_Relative delay;
424
425   retry_task = NULL;
426   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
427   {
428     cqr->hnode = NULL;
429     delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
430     if (delay.rel_value_us > 0)
431     {
432       cqr->hnode
433         = GNUNET_CONTAINER_heap_insert (retry_heap,
434                                         cqr,
435                                         cqr->retry_time.abs_value_us);
436       retry_task
437         = GNUNET_SCHEDULER_add_at (cqr->retry_time,
438                                    &transmit_next_request_task,
439                                    NULL);
440       return;
441     }
442     transmit_request (cqr);
443     cqr->hnode
444       = GNUNET_CONTAINER_heap_insert (retry_heap,
445                                       cqr,
446                                       cqr->retry_time.abs_value_us);
447   }
448 }
449
450
451 /**
452  * Check DHT PUT messages from the client.
453  *
454  * @param cls the client we received this message from
455  * @param dht_msg the actual message received
456  * @return #GNUNET_OK (always)
457  */
458 static int
459 check_dht_local_put (void *cls,
460                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
461 {
462   /* always well-formed */
463   return GNUNET_OK;
464 }
465
466
467 /**
468  * Handler for PUT messages.
469  *
470  * @param cls the client we received this message from
471  * @param dht_msg the actual message received
472  */
473 static void
474 handle_dht_local_put (void *cls,
475                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
476 {
477   struct ClientHandle *ch = cls;
478   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
479   uint16_t size;
480
481   size = ntohs (dht_msg->header.size);
482   GNUNET_STATISTICS_update (GDS_stats,
483                             gettext_noop ("# PUT requests received from clients"),
484                             1,
485                             GNUNET_NO);
486   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
487                "CLIENT-PUT %s\n",
488                GNUNET_h2s_full (&dht_msg->key));
489   /* give to local clients */
490   LOG (GNUNET_ERROR_TYPE_DEBUG,
491        "Handling local PUT of %u-bytes for query %s\n",
492        size - sizeof (struct GNUNET_DHT_ClientPutMessage),
493        GNUNET_h2s (&dht_msg->key));
494   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
495                             &dht_msg->key,
496                             0,
497                             NULL,
498                             0,
499                             NULL,
500                             ntohl (dht_msg->type),
501                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
502                             &dht_msg[1]);
503   /* store locally */
504   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
505                             &dht_msg->key,
506                             0,
507                             NULL,
508                             ntohl (dht_msg->type),
509                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
510                             &dht_msg[1]);
511   /* route to other peers */
512   peer_bf
513     = GNUNET_CONTAINER_bloomfilter_init (NULL,
514                                          DHT_BLOOM_SIZE,
515                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
516   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
517                              ntohl (dht_msg->options),
518                              ntohl (dht_msg->desired_replication_level),
519                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
520                              0 /* hop count */,
521                              peer_bf,
522                              &dht_msg->key,
523                              0,
524                              NULL,
525                              &dht_msg[1],
526                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
527   GDS_CLIENTS_process_put (ntohl (dht_msg->options),
528                            ntohl (dht_msg->type),
529                            0,
530                            ntohl (dht_msg->desired_replication_level),
531                            1,
532                            GDS_NEIGHBOURS_get_id(),
533                            GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
534                            &dht_msg->key,
535                            &dht_msg[1],
536                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
537   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
538   GNUNET_SERVICE_client_continue (ch->client);
539 }
540
541
542 /**
543  * Check DHT GET messages from the client.
544  *
545  * @param cls the client we received this message from
546  * @param message the actual message received
547  * @return #GNUNET_OK (always)
548  */
549 static int
550 check_dht_local_get (void *cls,
551                       const struct GNUNET_DHT_ClientGetMessage *get)
552 {
553   /* always well-formed */
554   return GNUNET_OK;
555 }
556
557
558 /**
559  * Handle a result from local datacache for a GET operation.
560  *
561  * @param cls the `struct ClientHandle` of the client doing the query
562  * @param type type of the block
563  * @param expiration_time when does the content expire
564  * @param key key for the content
565  * @param put_path_length number of entries in @a put_path
566  * @param put_path peers the original PUT traversed (if tracked)
567  * @param get_path_length number of entries in @a get_path
568  * @param get_path peers this reply has traversed so far (if tracked)
569  * @param data payload of the reply
570  * @param data_size number of bytes in @a data
571  */
572 static void
573 handle_local_result (void *cls,
574                      enum GNUNET_BLOCK_Type type,
575                      struct GNUNET_TIME_Absolute expiration_time,
576                      const struct GNUNET_HashCode *key,
577                      unsigned int put_path_length,
578                      const struct GNUNET_PeerIdentity *put_path,
579                      unsigned int get_path_length,
580                      const struct GNUNET_PeerIdentity *get_path,
581                      const void *data,
582                      size_t data_size)
583 {
584   // FIXME: this needs some clean up: inline the function,
585   // possibly avoid even looking up the client!
586   GDS_CLIENTS_handle_reply (expiration_time,
587                             key,
588                             0, NULL,
589                             put_path_length, put_path,
590                             type,
591                             data_size, data);
592 }
593
594
595 /**
596  * Handler for DHT GET messages from the client.
597  *
598  * @param cls the client we received this message from
599  * @param message the actual message received
600  */
601 static void
602 handle_dht_local_get (void *cls,
603                       const struct GNUNET_DHT_ClientGetMessage *get)
604 {
605   struct ClientHandle *ch = cls;
606   struct ClientQueryRecord *cqr;
607   size_t xquery_size;
608   const char *xquery;
609   uint16_t size;
610
611   size = ntohs (get->header.size);
612   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
613   xquery = (const char *) &get[1];
614   GNUNET_STATISTICS_update (GDS_stats,
615                             gettext_noop
616                             ("# GET requests received from clients"), 1,
617                             GNUNET_NO);
618   LOG (GNUNET_ERROR_TYPE_DEBUG,
619        "Received GET request for %s from local client %p, xq: %.*s\n",
620        GNUNET_h2s (&get->key),
621        ch->client,
622        xquery_size,
623        xquery);
624   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
625                "CLIENT-GET %s\n",
626                GNUNET_h2s_full (&get->key));
627
628   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
629   cqr->key = get->key;
630   cqr->ch = ch;
631   cqr->xquery = (void *) &cqr[1];
632   GNUNET_memcpy (&cqr[1], xquery, xquery_size);
633   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
634   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
635   cqr->retry_time = GNUNET_TIME_absolute_get ();
636   cqr->unique_id = get->unique_id;
637   cqr->xquery_size = xquery_size;
638   cqr->replication = ntohl (get->desired_replication_level);
639   cqr->msg_options = ntohl (get->options);
640   cqr->type = ntohl (get->type);
641   GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
642                                ch->cqr_tail,
643                                cqr);
644   GNUNET_CONTAINER_multihashmap_put (forward_map,
645                                      &cqr->key,
646                                      cqr,
647                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
648   GDS_CLIENTS_process_get (ntohl (get->options),
649                            ntohl (get->type),
650                            0,
651                            ntohl (get->desired_replication_level),
652                            1,
653                            GDS_NEIGHBOURS_get_id(),
654                            &get->key);
655   /* start remote requests */
656   if (NULL != retry_task)
657     GNUNET_SCHEDULER_cancel (retry_task);
658   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
659                                          NULL);
660   /* perform local lookup */
661   GDS_DATACACHE_handle_get (&get->key,
662                             cqr->type,
663                             cqr->xquery,
664                             xquery_size,
665                             NULL,
666                             &handle_local_result,
667                             ch);
668   GNUNET_SERVICE_client_continue (ch->client);
669 }
670
671
672 /**
673  * Closure for #find_by_unique_id().
674  */
675 struct FindByUniqueIdContext
676 {
677   /**
678    * Where to store the result, if found.
679    */
680   struct ClientQueryRecord *cqr;
681
682   uint64_t unique_id;
683 };
684
685
686 /**
687  * Function called for each existing DHT record for the given
688  * query.  Checks if it matches the UID given in the closure
689  * and if so returns the entry as a result.
690  *
691  * @param cls the search context
692  * @param key query for the lookup (not used)
693  * @param value the `struct ClientQueryRecord`
694  * @return #GNUNET_YES to continue iteration (result not yet found)
695  */
696 static int
697 find_by_unique_id (void *cls,
698                    const struct GNUNET_HashCode *key,
699                    void *value)
700 {
701   struct FindByUniqueIdContext *fui_ctx = cls;
702   struct ClientQueryRecord *cqr = value;
703
704   if (cqr->unique_id != fui_ctx->unique_id)
705     return GNUNET_YES;
706   fui_ctx->cqr = cqr;
707   return GNUNET_NO;
708 }
709
710
711 /**
712  * Check "GET result seen" messages from the client.
713  *
714  * @param cls the client we received this message from
715  * @param message the actual message received
716  * @return #GNUNET_OK if @a seen is well-formed
717  */
718 static int
719 check_dht_local_get_result_seen (void *cls,
720                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
721 {
722   uint16_t size;
723   unsigned int hash_count;
724
725   size = ntohs (seen->header.size);
726   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
727   if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
728   {
729     GNUNET_break (0);
730     return GNUNET_SYSERR;
731   }
732   return GNUNET_OK;
733 }
734
735
736 /**
737  * Handler for "GET result seen" messages from the client.
738  *
739  * @param cls the client we received this message from
740  * @param message the actual message received
741  */
742 static void
743 handle_dht_local_get_result_seen (void *cls,
744                                   const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
745 {
746   struct ClientHandle *ch = cls;
747   uint16_t size;
748   unsigned int hash_count;
749   unsigned int old_count;
750   const struct GNUNET_HashCode *hc;
751   struct FindByUniqueIdContext fui_ctx;
752   struct ClientQueryRecord *cqr;
753
754   size = ntohs (seen->header.size);
755   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
756   hc = (const struct GNUNET_HashCode*) &seen[1];
757   fui_ctx.unique_id = seen->unique_id;
758   fui_ctx.cqr = NULL;
759   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
760                                               &seen->key,
761                                               &find_by_unique_id,
762                                               &fui_ctx);
763   if (NULL == (cqr = fui_ctx.cqr))
764   {
765     GNUNET_break (0);
766     GNUNET_SERVICE_client_drop (ch->client);
767     return;
768   }
769   /* finally, update 'seen' list */
770   old_count = cqr->seen_replies_count;
771   GNUNET_array_grow (cqr->seen_replies,
772                      cqr->seen_replies_count,
773                      cqr->seen_replies_count + hash_count);
774   GNUNET_memcpy (&cqr->seen_replies[old_count],
775                  hc,
776                  sizeof (struct GNUNET_HashCode) * hash_count);
777 }
778
779
780 /**
781  * Closure for #remove_by_unique_id().
782  */
783 struct RemoveByUniqueIdContext
784 {
785   /**
786    * Client that issued the removal request.
787    */
788   struct ClientHandle *ch;
789
790   /**
791    * Unique ID of the request.
792    */
793   uint64_t unique_id;
794 };
795
796
797 /**
798  * Iterator over hash map entries that frees all entries
799  * that match the given client and unique ID.
800  *
801  * @param cls unique ID and client to search for in source routes
802  * @param key current key code
803  * @param value value in the hash map, a ClientQueryRecord
804  * @return #GNUNET_YES (we should continue to iterate)
805  */
806 static int
807 remove_by_unique_id (void *cls,
808                      const struct GNUNET_HashCode *key,
809                      void *value)
810 {
811   const struct RemoveByUniqueIdContext *ctx = cls;
812   struct ClientQueryRecord *cqr = value;
813
814   if (cqr->unique_id != ctx->unique_id)
815     return GNUNET_YES;
816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817               "Removing client %p's record for key %s (by unique id)\n",
818               ctx->ch->client,
819               GNUNET_h2s (key));
820   remove_client_record (cqr);
821   return GNUNET_YES;
822 }
823
824
825 /**
826  * Handler for any generic DHT stop messages, calls the appropriate handler
827  * depending on message type (if processed locally)
828  *
829  * @param cls client we received this message from
830  * @param message the actual message received
831  *
832  */
833 static void
834 handle_dht_local_get_stop (void *cls,
835                            const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
836 {
837   struct ClientHandle *ch = cls;
838   struct RemoveByUniqueIdContext ctx;
839
840   GNUNET_STATISTICS_update (GDS_stats,
841                             gettext_noop
842                             ("# GET STOP requests received from clients"), 1,
843                             GNUNET_NO);
844   LOG (GNUNET_ERROR_TYPE_DEBUG,
845        "Received GET STOP request for %s from local client %p\n",
846        GNUNET_h2s (&dht_stop_msg->key),
847        ch->client);
848   ctx.ch = ch;
849   ctx.unique_id = dht_stop_msg->unique_id;
850   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
851                                               &dht_stop_msg->key,
852                                               &remove_by_unique_id,
853                                               &ctx);
854   GNUNET_SERVICE_client_continue (ch->client);
855 }
856
857
858 /**
859  * Handler for monitor start messages
860  *
861  * @param cls the client we received this message from
862  * @param msg the actual message received
863  *
864  */
865 static void
866 handle_dht_local_monitor (void *cls,
867                           const struct GNUNET_DHT_MonitorStartStopMessage *msg)
868 {
869   struct ClientHandle *ch = cls;
870   struct ClientMonitorRecord *r;
871
872   r = GNUNET_new (struct ClientMonitorRecord);
873   r->ch = ch;
874   r->type = ntohl (msg->type);
875   r->get = ntohs (msg->get);
876   r->get_resp = ntohs (msg->get_resp);
877   r->put = ntohs (msg->put);
878   if (0 == ntohs (msg->filter_key))
879   {
880     r->key = NULL;
881   }
882   else
883   {
884     r->key = GNUNET_new (struct GNUNET_HashCode);
885     GNUNET_memcpy (r->key,
886                    &msg->key,
887                    sizeof (struct GNUNET_HashCode));
888   }
889   GNUNET_CONTAINER_DLL_insert (monitor_head,
890                                monitor_tail,
891                                r);
892   GNUNET_SERVICE_client_continue (ch->client);
893 }
894
895
896 /**
897  * Handler for monitor stop messages
898  *
899  * @param cls the client we received this message from
900  * @param msg the actual message received
901  */
902 static void
903 handle_dht_local_monitor_stop (void *cls,
904                                const struct GNUNET_DHT_MonitorStartStopMessage *msg)
905 {
906   struct ClientHandle *ch = cls;
907   struct ClientMonitorRecord *r;
908   int keys_match;
909
910   GNUNET_SERVICE_client_continue (ch->client);
911   for (r = monitor_head; NULL != r; r = r->next)
912   {
913     if (NULL == r->key)
914     {
915       keys_match = (0 == ntohs(msg->filter_key));
916     }
917     else
918     {
919       keys_match = ( (0 != ntohs(msg->filter_key)) &&
920                      (! memcmp (r->key,
921                                 &msg->key,
922                                 sizeof(struct GNUNET_HashCode))) );
923     }
924     if ( (ch == r->ch) &&
925          (ntohl(msg->type) == r->type) &&
926          (r->get == msg->get) &&
927          (r->get_resp == msg->get_resp) &&
928          (r->put == msg->put) &&
929          keys_match )
930     {
931       GNUNET_CONTAINER_DLL_remove (monitor_head,
932                                    monitor_tail,
933                                    r);
934       GNUNET_free_non_null (r->key);
935       GNUNET_free (r);
936       return; /* Delete only ONE entry */
937     }
938   }
939 }
940
941
942 /**
943  * Closure for #forward_reply()
944  */
945 struct ForwardReplyContext
946 {
947
948   /**
949    * Expiration time of the reply.
950    */
951   struct GNUNET_TIME_Absolute expiration;
952
953   /**
954    * GET path taken.
955    */
956   const struct GNUNET_PeerIdentity *get_path;
957
958   /**
959    * PUT path taken.
960    */
961   const struct GNUNET_PeerIdentity *put_path;
962
963   /**
964    * Embedded payload.
965    */
966   const void *data;
967
968   /**
969    * Number of bytes in data.
970    */
971   size_t data_size;
972
973   /**
974    * Number of entries in @e get_path.
975    */
976   unsigned int get_path_length;
977
978   /**
979    * Number of entries in @e put_path.
980    */
981   unsigned int put_path_length;
982
983   /**
984    * Type of the data.
985    */
986   enum GNUNET_BLOCK_Type type;
987
988 };
989
990
991 /**
992  * Iterator over hash map entries that send a given reply to
993  * each of the matching clients.  With some tricky recycling
994  * of the buffer.
995  *
996  * @param cls the 'struct ForwardReplyContext'
997  * @param key current key
998  * @param value value in the hash map, a ClientQueryRecord
999  * @return #GNUNET_YES (we should continue to iterate),
1000  *         if the result is mal-formed, #GNUNET_NO
1001  */
1002 static int
1003 forward_reply (void *cls,
1004                const struct GNUNET_HashCode *key,
1005                void *value)
1006 {
1007   struct ForwardReplyContext *frc = cls;
1008   struct ClientQueryRecord *record = value;
1009   struct GNUNET_MQ_Envelope *env;
1010   struct GNUNET_DHT_ClientResultMessage *reply;
1011   enum GNUNET_BLOCK_EvaluationResult eval;
1012   int do_free;
1013   struct GNUNET_HashCode ch;
1014   struct GNUNET_PeerIdentity *paths;
1015
1016   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1017                "CLIENT-RESULT %s\n",
1018                GNUNET_h2s_full (key));
1019   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1020        (record->type != frc->type))
1021   {
1022     LOG (GNUNET_ERROR_TYPE_DEBUG,
1023          "Record type mismatch, not passing request for key %s to local client\n",
1024          GNUNET_h2s (key));
1025     GNUNET_STATISTICS_update (GDS_stats,
1026                               gettext_noop
1027                               ("# Key match, type mismatches in REPLY to CLIENT"),
1028                               1, GNUNET_NO);
1029     return GNUNET_YES;          /* type mismatch */
1030   }
1031   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1032   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1033     if (0 == memcmp (&record->seen_replies[i],
1034                      &ch,
1035                      sizeof (struct GNUNET_HashCode)))
1036     {
1037       LOG (GNUNET_ERROR_TYPE_DEBUG,
1038            "Duplicate reply, not passing request for key %s to local client\n",
1039            GNUNET_h2s (key));
1040       GNUNET_STATISTICS_update (GDS_stats,
1041                                 gettext_noop
1042                                 ("# Duplicate REPLIES to CLIENT request dropped"),
1043                                 1, GNUNET_NO);
1044       return GNUNET_YES;        /* duplicate */
1045     }
1046   eval
1047     = GNUNET_BLOCK_evaluate (GDS_block_context,
1048                              record->type,
1049                              NULL,
1050                              GNUNET_BLOCK_EO_NONE,
1051                              key,
1052                              record->xquery,
1053                              record->xquery_size,
1054                              frc->data,
1055                              frc->data_size);
1056   LOG (GNUNET_ERROR_TYPE_DEBUG,
1057        "Evaluation result is %d for key %s for local client's query\n",
1058        (int) eval,
1059        GNUNET_h2s (key));
1060   switch (eval)
1061   {
1062   case GNUNET_BLOCK_EVALUATION_OK_LAST:
1063     do_free = GNUNET_YES;
1064     break;
1065   case GNUNET_BLOCK_EVALUATION_OK_MORE:
1066     GNUNET_array_append (record->seen_replies,
1067                          record->seen_replies_count,
1068                          ch);
1069     do_free = GNUNET_NO;
1070     break;
1071   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1072     /* should be impossible to encounter here */
1073     GNUNET_break (0);
1074     return GNUNET_YES;
1075   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1076     GNUNET_break_op (0);
1077     return GNUNET_NO;
1078   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1079     GNUNET_break (0);
1080     return GNUNET_NO;
1081   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1082     GNUNET_break (0);
1083     return GNUNET_NO;
1084   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1085     return GNUNET_YES;
1086   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1087     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1088                 _("Unsupported block type (%u) in request!\n"), record->type);
1089     return GNUNET_NO;
1090   default:
1091     GNUNET_break (0);
1092     return GNUNET_NO;
1093   }
1094   GNUNET_STATISTICS_update (GDS_stats,
1095                             gettext_noop ("# RESULTS queued for clients"),
1096                             1,
1097                             GNUNET_NO);
1098   env = GNUNET_MQ_msg_extra (reply,
1099                              frc->data_size +
1100                              (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1101                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1102   reply->type = htonl (frc->type);
1103   reply->get_path_length = htonl (frc->get_path_length);
1104   reply->put_path_length = htonl (frc->put_path_length);
1105   reply->unique_id = record->unique_id;
1106   reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1107   reply->key = *key;
1108   paths = (struct GNUNET_PeerIdentity *) &reply[1];
1109   GNUNET_memcpy (paths,
1110                  frc->put_path,
1111                  sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1112   GNUNET_memcpy (&paths[frc->put_path_length],
1113                  frc->get_path,
1114                  sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1115   GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1116                  frc->data,
1117                  frc->data_size);
1118   LOG (GNUNET_ERROR_TYPE_DEBUG,
1119        "Sending reply to query %s for client %p\n",
1120        GNUNET_h2s (key),
1121        record->ch->client);
1122   GNUNET_MQ_send (record->ch->mq,
1123                   env);
1124   if (GNUNET_YES == do_free)
1125     remove_client_record (record);
1126   return GNUNET_YES;
1127 }
1128
1129
1130 /**
1131  * Handle a reply we've received from another peer.  If the reply
1132  * matches any of our pending queries, forward it to the respective
1133  * client(s).
1134  *
1135  * @param expiration when will the reply expire
1136  * @param key the query this reply is for
1137  * @param get_path_length number of peers in @a get_path
1138  * @param get_path path the reply took on get
1139  * @param put_path_length number of peers in @a put_path
1140  * @param put_path path the reply took on put
1141  * @param type type of the reply
1142  * @param data_size number of bytes in @a data
1143  * @param data application payload data
1144  */
1145 void
1146 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1147                           const struct GNUNET_HashCode *key,
1148                           unsigned int get_path_length,
1149                           const struct GNUNET_PeerIdentity *get_path,
1150                           unsigned int put_path_length,
1151                           const struct GNUNET_PeerIdentity *put_path,
1152                           enum GNUNET_BLOCK_Type type,
1153                           size_t data_size,
1154                           const void *data)
1155 {
1156   struct ForwardReplyContext frc;
1157   size_t msize;
1158
1159   msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1160     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1161   if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1162   {
1163     GNUNET_break (0);
1164     return;
1165   }
1166   if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1167                                                  key))
1168   {
1169     LOG (GNUNET_ERROR_TYPE_DEBUG,
1170          "No matching client for reply for key %s\n",
1171          GNUNET_h2s (key));
1172     GNUNET_STATISTICS_update (GDS_stats,
1173                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1174                               1,
1175                               GNUNET_NO);
1176     return;                     /* no matching request, fast exit! */
1177   }
1178   frc.expiration = expiration;
1179   frc.get_path = get_path;
1180   frc.put_path = put_path;
1181   frc.data = data;
1182   frc.data_size = data_size;
1183   frc.get_path_length = get_path_length;
1184   frc.put_path_length = put_path_length;
1185   frc.type = type;
1186   LOG (GNUNET_ERROR_TYPE_DEBUG,
1187        "Forwarding reply for key %s to client\n",
1188        GNUNET_h2s (key));
1189   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1190                                               key,
1191                                               &forward_reply,
1192                                               &frc);
1193
1194 }
1195
1196
1197 /**
1198  * Check if some client is monitoring GET messages and notify
1199  * them in that case.
1200  *
1201  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1202  * @param type The type of data in the request.
1203  * @param hop_count Hop count so far.
1204  * @param path_length number of entries in path (or 0 if not recorded).
1205  * @param path peers on the GET path (or NULL if not recorded).
1206  * @param desired_replication_level Desired replication level.
1207  * @param key Key of the requested data.
1208  */
1209 void
1210 GDS_CLIENTS_process_get (uint32_t options,
1211                          enum GNUNET_BLOCK_Type type,
1212                          uint32_t hop_count,
1213                          uint32_t desired_replication_level,
1214                          unsigned int path_length,
1215                          const struct GNUNET_PeerIdentity *path,
1216                          const struct GNUNET_HashCode * key)
1217 {
1218   struct ClientMonitorRecord *m;
1219   struct ClientHandle **cl;
1220   unsigned int cl_size;
1221
1222   cl = NULL;
1223   cl_size = 0;
1224   for (m = monitor_head; NULL != m; m = m->next)
1225   {
1226     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1227            (m->type == type) ) &&
1228          ( (NULL == m->key) ||
1229            (0 == memcmp (key,
1230                          m->key,
1231                          sizeof(struct GNUNET_HashCode))) ) )
1232     {
1233       struct GNUNET_MQ_Envelope *env;
1234       struct GNUNET_DHT_MonitorGetMessage *mmsg;
1235       struct GNUNET_PeerIdentity *msg_path;
1236       size_t msize;
1237       unsigned int i;
1238
1239       /* Don't send duplicates */
1240       for (i = 0; i < cl_size; i++)
1241         if (cl[i] == m->ch)
1242           break;
1243       if (i < cl_size)
1244         continue;
1245       GNUNET_array_append (cl,
1246                            cl_size,
1247                            m->ch);
1248
1249       msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1250       env = GNUNET_MQ_msg_extra (mmsg,
1251                                  msize,
1252                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1253       mmsg->options = htonl(options);
1254       mmsg->type = htonl(type);
1255       mmsg->hop_count = htonl(hop_count);
1256       mmsg->desired_replication_level = htonl(desired_replication_level);
1257       mmsg->get_path_length = htonl(path_length);
1258       mmsg->key = *key;
1259       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1260       GNUNET_memcpy (msg_path,
1261                      path,
1262                      path_length * sizeof (struct GNUNET_PeerIdentity));
1263       GNUNET_MQ_send (m->ch->mq,
1264                       env);
1265     }
1266   }
1267   GNUNET_free_non_null (cl);
1268 }
1269
1270
1271 /**
1272  * Check if some client is monitoring GET RESP messages and notify
1273  * them in that case.
1274  *
1275  * @param type The type of data in the result.
1276  * @param get_path Peers on GET path (or NULL if not recorded).
1277  * @param get_path_length number of entries in get_path.
1278  * @param put_path peers on the PUT path (or NULL if not recorded).
1279  * @param put_path_length number of entries in get_path.
1280  * @param exp Expiration time of the data.
1281  * @param key Key of the data.
1282  * @param data Pointer to the result data.
1283  * @param size Number of bytes in @a data.
1284  */
1285 void
1286 GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1287                               const struct GNUNET_PeerIdentity *get_path,
1288                               unsigned int get_path_length,
1289                               const struct GNUNET_PeerIdentity *put_path,
1290                               unsigned int put_path_length,
1291                               struct GNUNET_TIME_Absolute exp,
1292                               const struct GNUNET_HashCode * key,
1293                               const void *data,
1294                               size_t size)
1295 {
1296   struct ClientMonitorRecord *m;
1297   struct ClientHandle **cl;
1298   unsigned int cl_size;
1299
1300   cl = NULL;
1301   cl_size = 0;
1302   for (m = monitor_head; NULL != m; m = m->next)
1303   {
1304     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1305         (NULL == m->key ||
1306          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1307     {
1308       struct GNUNET_MQ_Envelope *env;
1309       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1310       struct GNUNET_PeerIdentity *path;
1311       size_t msize;
1312       unsigned int i;
1313
1314       /* Don't send duplicates */
1315       for (i = 0; i < cl_size; i++)
1316         if (cl[i] == m->ch)
1317           break;
1318       if (i < cl_size)
1319         continue;
1320       GNUNET_array_append (cl,
1321                            cl_size,
1322                            m->ch);
1323
1324       msize = size;
1325       msize += (get_path_length + put_path_length)
1326                * sizeof (struct GNUNET_PeerIdentity);
1327       env = GNUNET_MQ_msg_extra (mmsg,
1328                                  msize,
1329                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1330       mmsg->type = htonl(type);
1331       mmsg->put_path_length = htonl(put_path_length);
1332       mmsg->get_path_length = htonl(get_path_length);
1333       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1334       mmsg->key = *key;
1335       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1336       GNUNET_memcpy (path,
1337                      put_path,
1338                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
1339       GNUNET_memcpy (path,
1340                      get_path,
1341                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
1342       GNUNET_memcpy (&path[get_path_length],
1343                      data,
1344                      size);
1345       GNUNET_MQ_send (m->ch->mq,
1346                       env);
1347     }
1348   }
1349   GNUNET_free_non_null (cl);
1350 }
1351
1352
1353 /**
1354  * Check if some client is monitoring PUT messages and notify
1355  * them in that case.
1356  *
1357  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1358  * @param type The type of data in the request.
1359  * @param hop_count Hop count so far.
1360  * @param path_length number of entries in path (or 0 if not recorded).
1361  * @param path peers on the PUT path (or NULL if not recorded).
1362  * @param desired_replication_level Desired replication level.
1363  * @param exp Expiration time of the data.
1364  * @param key Key under which data is to be stored.
1365  * @param data Pointer to the data carried.
1366  * @param size Number of bytes in data.
1367  */
1368 void
1369 GDS_CLIENTS_process_put (uint32_t options,
1370                          enum GNUNET_BLOCK_Type type,
1371                          uint32_t hop_count,
1372                          uint32_t desired_replication_level,
1373                          unsigned int path_length,
1374                          const struct GNUNET_PeerIdentity *path,
1375                          struct GNUNET_TIME_Absolute exp,
1376                          const struct GNUNET_HashCode *key,
1377                          const void *data,
1378                          size_t size)
1379 {
1380   struct ClientMonitorRecord *m;
1381   struct ClientHandle **cl;
1382   unsigned int cl_size;
1383
1384   cl = NULL;
1385   cl_size = 0;
1386   for (m = monitor_head; NULL != m; m = m->next)
1387   {
1388     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1389         (NULL == m->key ||
1390          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1391     {
1392       struct GNUNET_MQ_Envelope *env;
1393       struct GNUNET_DHT_MonitorPutMessage *mmsg;
1394       struct GNUNET_PeerIdentity *msg_path;
1395       size_t msize;
1396       unsigned int i;
1397
1398       /* Don't send duplicates */
1399       for (i = 0; i < cl_size; i++)
1400         if (cl[i] == m->ch)
1401           break;
1402       if (i < cl_size)
1403         continue;
1404       GNUNET_array_append (cl,
1405                            cl_size,
1406                            m->ch);
1407
1408       msize = size;
1409       msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1410       env = GNUNET_MQ_msg_extra (mmsg,
1411                                  msize,
1412                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1413       mmsg->options = htonl(options);
1414       mmsg->type = htonl(type);
1415       mmsg->hop_count = htonl(hop_count);
1416       mmsg->desired_replication_level = htonl (desired_replication_level);
1417       mmsg->put_path_length = htonl (path_length);
1418       mmsg->key = *key;
1419       mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1420       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1421       GNUNET_memcpy (msg_path,
1422                      path,
1423                      path_length * sizeof (struct GNUNET_PeerIdentity));
1424       GNUNET_memcpy (&msg_path[path_length],
1425                      data,
1426                      size);
1427       GNUNET_MQ_send (m->ch->mq,
1428                       env);
1429     }
1430   }
1431   GNUNET_free_non_null (cl);
1432 }
1433
1434
1435 /**
1436  * Initialize client subsystem.
1437  *
1438  * @param server the initialized server
1439  */
1440 static void
1441 GDS_CLIENTS_init ()
1442 {
1443   forward_map
1444     = GNUNET_CONTAINER_multihashmap_create (1024,
1445                                             GNUNET_YES);
1446   retry_heap
1447     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1448 }
1449
1450
1451 /**
1452  * Shutdown client subsystem.
1453  */
1454 static void
1455 GDS_CLIENTS_stop ()
1456 {
1457   if (NULL != retry_task)
1458   {
1459     GNUNET_SCHEDULER_cancel (retry_task);
1460     retry_task = NULL;
1461   }
1462 }
1463
1464
1465 /**
1466  * Define "main" method using service macro.
1467  *
1468  * @param name name of the service, i.e. "dht" or "xdht"
1469  * @param run name of the initializaton method for the service
1470  */
1471 #define GDS_DHT_SERVICE_INIT(name,run)          \
1472  GNUNET_SERVICE_MAIN \
1473   (name, \
1474   GNUNET_SERVICE_OPTION_NONE, \
1475   run, \
1476   &client_connect_cb, \
1477   &client_disconnect_cb, \
1478   NULL, \
1479   GNUNET_MQ_hd_var_size (dht_local_put, \
1480                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1481                          struct GNUNET_DHT_ClientPutMessage, \
1482                          NULL), \
1483   GNUNET_MQ_hd_var_size (dht_local_get, \
1484                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1485                          struct GNUNET_DHT_ClientGetMessage, \
1486                          NULL), \
1487   GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1488                           GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1489                           struct GNUNET_DHT_ClientGetStopMessage, \
1490                           NULL), \
1491   GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1492                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1493                            struct GNUNET_DHT_MonitorStartStopMessage, \
1494                            NULL), \
1495   GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1496                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1497                            struct GNUNET_DHT_MonitorStartStopMessage, \
1498                            NULL), \
1499   GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1500                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1501                          struct GNUNET_DHT_ClientGetResultSeenMessage , \
1502                          NULL), \
1503   GNUNET_MQ_handler_end ())
1504
1505
1506 /**
1507  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1508  */
1509 void __attribute__ ((destructor))
1510 GDS_CLIENTS_done ()
1511 {
1512   if (NULL != retry_heap)
1513   {
1514     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1515     GNUNET_CONTAINER_heap_destroy (retry_heap);
1516     retry_heap = NULL;
1517   }
1518   if (NULL != forward_map)
1519   {
1520     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1521     GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1522     forward_map = NULL;
1523   }
1524 }
1525
1526 /* end of gnunet-service-dht_clients.c */