More API function tests...
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht.h"
36
37
38 /**
39  * Should routing details be logged to stderr (for debugging)?
40  */
41 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
44
45
46 /**
47  * Struct containing information about a client,
48  * handle to connect to it, and any pending messages
49  * that need to be sent to it.
50  */
51 struct ClientHandle;
52
53
54 /**
55  * Entry in the local forwarding map for a client's GET request.
56  */
57 struct ClientQueryRecord
58 {
59
60   /**
61    * The key this request was about
62    */
63   struct GNUNET_HashCode key;
64
65   /**
66    * Kept in a DLL with @e client.
67    */
68   struct ClientQueryRecord *next;
69
70   /**
71    * Kept in a DLL with @e client.
72    */
73   struct ClientQueryRecord *prev;
74
75   /**
76    * Client responsible for the request.
77    */
78   struct ClientHandle *ch;
79
80   /**
81    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
82    */
83   const void *xquery;
84
85   /**
86    * Replies we have already seen for this request.
87    */
88   struct GNUNET_HashCode *seen_replies;
89
90   /**
91    * Pointer to this nodes heap location in the retry-heap (for fast removal)
92    */
93   struct GNUNET_CONTAINER_HeapNode *hnode;
94
95   /**
96    * What's the delay between re-try operations that we currently use for this
97    * request?
98    */
99   struct GNUNET_TIME_Relative retry_frequency;
100
101   /**
102    * What's the next time we should re-try this request?
103    */
104   struct GNUNET_TIME_Absolute retry_time;
105
106   /**
107    * The unique identifier of this request
108    */
109   uint64_t unique_id;
110
111   /**
112    * Number of bytes in xquery.
113    */
114   size_t xquery_size;
115
116   /**
117    * Number of entries in 'seen_replies'.
118    */
119   unsigned int seen_replies_count;
120
121   /**
122    * Desired replication level
123    */
124   uint32_t replication;
125
126   /**
127    * Any message options for this request
128    */
129   uint32_t msg_options;
130
131   /**
132    * The type for the data for the GET request.
133    */
134   enum GNUNET_BLOCK_Type type;
135
136 };
137
138
139 /**
140  * Struct containing paremeters of monitoring requests.
141  */
142 struct ClientMonitorRecord
143 {
144
145   /**
146    * Next element in DLL.
147    */
148   struct ClientMonitorRecord *next;
149
150   /**
151    * Previous element in DLL.
152    */
153   struct ClientMonitorRecord *prev;
154
155   /**
156    * Type of blocks that are of interest
157    */
158   enum GNUNET_BLOCK_Type type;
159
160   /**
161    * Key of data of interest, NULL for all.
162    */
163   struct GNUNET_HashCode *key;
164
165   /**
166    * Flag whether to notify about GET messages.
167    */
168   int16_t get;
169
170   /**
171    * Flag whether to notify about GET_REPONSE messages.
172    */
173   int16_t get_resp;
174
175   /**
176    * Flag whether to notify about PUT messages.
177    */
178   uint16_t put;
179
180   /**
181    * Client to notify of these requests.
182    */
183   struct ClientHandle *ch;
184 };
185
186
187 /**
188  * Struct containing information about a client,
189  * handle to connect to it, and any pending messages
190  * that need to be sent to it.
191  */
192 struct ClientHandle
193 {
194   /**
195    * Linked list of active queries of this client.
196    */
197   struct ClientQueryRecord *cqr_head;
198
199   /**
200    * Linked list of active queries of this client.
201    */
202   struct ClientQueryRecord *cqr_tail;
203
204   /**
205    * The handle to this client
206    */
207   struct GNUNET_SERVICE_Client *client;
208
209   /**
210    * The message queue to this client
211    */
212   struct GNUNET_MQ_Handle *mq;
213
214 };
215
216 /**
217  * Our handle to the BLOCK library.
218  */
219 struct GNUNET_BLOCK_Context *GDS_block_context;
220
221 /**
222  * Handle for the statistics service.
223  */
224 struct GNUNET_STATISTICS_Handle *GDS_stats;
225
226 /**
227  * Handle for the service.
228  */
229 struct GNUNET_SERVICE_Handle *GDS_service;
230
231 /**
232  * The configuration the DHT service is running with
233  */
234 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
235
236 /**
237  * List of active monitoring requests.
238  */
239 static struct ClientMonitorRecord *monitor_head;
240
241 /**
242  * List of active monitoring requests.
243  */
244 static struct ClientMonitorRecord *monitor_tail;
245
246 /**
247  * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
248  */
249 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
250
251 /**
252  * Heap with all of our client's request, sorted by retry time (earliest on top).
253  */
254 static struct GNUNET_CONTAINER_Heap *retry_heap;
255
256 /**
257  * Task that re-transmits requests (using retry_heap).
258  */
259 static struct GNUNET_SCHEDULER_Task *retry_task;
260
261
262 /**
263  * Free data structures associated with the given query.
264  *
265  * @param record record to remove
266  */
267 static void
268 remove_client_record (struct ClientQueryRecord *record)
269 {
270   struct ClientHandle *ch = record->ch;
271
272   GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
273                                ch->cqr_tail,
274                                record);
275   GNUNET_assert (GNUNET_YES ==
276                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
277                                                        &record->key,
278                                                        record));
279   if (NULL != record->hnode)
280     GNUNET_CONTAINER_heap_remove_node (record->hnode);
281   GNUNET_array_grow (record->seen_replies,
282                      record->seen_replies_count,
283                      0);
284   GNUNET_free (record);
285 }
286
287
288 /**
289  * Functions with this signature are called whenever a local client is
290  * connects to us.
291  *
292  * @param cls closure (NULL for dht)
293  * @param client identification of the client
294  * @param mq message queue for talking to @a client
295  * @return our `struct ClientHandle` for @a client
296  */
297 static void *
298 client_connect_cb (void *cls,
299                    struct GNUNET_SERVICE_Client *client,
300                    struct GNUNET_MQ_Handle *mq)
301 {
302   struct ClientHandle *ch;
303
304   ch = GNUNET_new (struct ClientHandle);
305   ch->client = client;
306   ch->mq = mq;
307   return ch;
308 }
309
310
311 /**
312  * Functions with this signature are called whenever a client
313  * is disconnected on the network level.
314  *
315  * @param cls closure (NULL for dht)
316  * @param client identification of the client
317  * @param app_ctx our `struct ClientHandle` for @a client
318  */
319 static void
320 client_disconnect_cb (void *cls,
321                       struct GNUNET_SERVICE_Client *client,
322                       void *app_ctx)
323 {
324   struct ClientHandle *ch = app_ctx;
325   struct ClientQueryRecord *cqr;
326   struct ClientMonitorRecord *monitor;
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Local client %p disconnects\n",
330               ch);
331   monitor = monitor_head;
332   while (NULL != monitor)
333   {
334     if (monitor->ch == ch)
335     {
336       struct ClientMonitorRecord *next;
337
338       next = monitor->next;
339       GNUNET_free_non_null (monitor->key);
340       GNUNET_CONTAINER_DLL_remove (monitor_head,
341                                    monitor_tail,
342                                    monitor);
343       GNUNET_free (monitor);
344       monitor = next;
345     }
346     else
347     {
348       monitor = monitor->next;
349     }
350   }
351   while (NULL != (cqr = ch->cqr_head))
352     remove_client_record (cqr);
353   GNUNET_free (ch);
354 }
355
356
357 /**
358  * Route the given request via the DHT.  This includes updating
359  * the bloom filter and retransmission times, building the P2P
360  * message and initiating the routing operation.
361  */
362 static void
363 transmit_request (struct ClientQueryRecord *cqr)
364 {
365   struct GNUNET_BLOCK_Group *bg;
366   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
367
368   GNUNET_STATISTICS_update (GDS_stats,
369                             gettext_noop ("# GET requests from clients injected"),
370                             1,
371                             GNUNET_NO);
372   bg = GNUNET_BLOCK_group_create (GDS_block_context,
373                                   cqr->type,
374                                   GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
375                                                             UINT32_MAX),
376                                   NULL,
377                                   0);
378   GNUNET_BLOCK_group_set_seen (bg,
379                                cqr->seen_replies,
380                                cqr->seen_replies_count);
381   peer_bf
382     = GNUNET_CONTAINER_bloomfilter_init (NULL,
383                                          DHT_BLOOM_SIZE,
384                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
385   LOG (GNUNET_ERROR_TYPE_DEBUG,
386        "Initiating GET for %s, replication %u, already have %u replies\n",
387        GNUNET_h2s (&cqr->key),
388        cqr->replication,
389        cqr->seen_replies_count);
390   GDS_NEIGHBOURS_handle_get (cqr->type,
391                              cqr->msg_options,
392                              cqr->replication,
393                              0 /* hop count */ ,
394                              &cqr->key,
395                              cqr->xquery,
396                              cqr->xquery_size,
397                              bg,
398                              peer_bf);
399   GNUNET_BLOCK_group_destroy (bg);
400   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
401
402   /* exponential back-off for retries.
403    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
404   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
405   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
406 }
407
408
409 /**
410  * Task that looks at the #retry_heap and transmits all of the requests
411  * on the heap that are ready for transmission.  Then re-schedules
412  * itself (unless the heap is empty).
413  *
414  * @param cls unused
415  */
416 static void
417 transmit_next_request_task (void *cls)
418 {
419   struct ClientQueryRecord *cqr;
420   struct GNUNET_TIME_Relative delay;
421
422   retry_task = NULL;
423   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
424   {
425     cqr->hnode = NULL;
426     delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
427     if (delay.rel_value_us > 0)
428     {
429       cqr->hnode
430         = GNUNET_CONTAINER_heap_insert (retry_heap,
431                                         cqr,
432                                         cqr->retry_time.abs_value_us);
433       retry_task
434         = GNUNET_SCHEDULER_add_at (cqr->retry_time,
435                                    &transmit_next_request_task,
436                                    NULL);
437       return;
438     }
439     transmit_request (cqr);
440     cqr->hnode
441       = GNUNET_CONTAINER_heap_insert (retry_heap,
442                                       cqr,
443                                       cqr->retry_time.abs_value_us);
444   }
445 }
446
447
448 /**
449  * Check DHT PUT messages from the client.
450  *
451  * @param cls the client we received this message from
452  * @param dht_msg the actual message received
453  * @return #GNUNET_OK (always)
454  */
455 static int
456 check_dht_local_put (void *cls,
457                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
458 {
459   /* always well-formed */
460   return GNUNET_OK;
461 }
462
463
464 /**
465  * Handler for PUT messages.
466  *
467  * @param cls the client we received this message from
468  * @param dht_msg the actual message received
469  */
470 static void
471 handle_dht_local_put (void *cls,
472                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
473 {
474   struct ClientHandle *ch = cls;
475   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
476   uint16_t size;
477   struct GNUNET_MQ_Envelope *env;
478   struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
479
480   size = ntohs (dht_msg->header.size);
481   GNUNET_STATISTICS_update (GDS_stats,
482                             gettext_noop ("# PUT requests received from clients"),
483                             1,
484                             GNUNET_NO);
485   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
486                "CLIENT-PUT %s\n",
487                GNUNET_h2s_full (&dht_msg->key));
488   /* give to local clients */
489   LOG (GNUNET_ERROR_TYPE_DEBUG,
490        "Handling local PUT of %u-bytes for query %s\n",
491        size - sizeof (struct GNUNET_DHT_ClientPutMessage),
492        GNUNET_h2s (&dht_msg->key));
493   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
494                             &dht_msg->key,
495                             0,
496                             NULL,
497                             0,
498                             NULL,
499                             ntohl (dht_msg->type),
500                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
501                             &dht_msg[1]);
502   /* store locally */
503   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
504                             &dht_msg->key,
505                             0,
506                             NULL,
507                             ntohl (dht_msg->type),
508                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
509                             &dht_msg[1]);
510   /* route to other peers */
511   peer_bf
512     = GNUNET_CONTAINER_bloomfilter_init (NULL,
513                                          DHT_BLOOM_SIZE,
514                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
515   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
516                              ntohl (dht_msg->options),
517                              ntohl (dht_msg->desired_replication_level),
518                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
519                              0 /* hop count */,
520                              peer_bf,
521                              &dht_msg->key,
522                              0,
523                              NULL,
524                              &dht_msg[1],
525                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
526   GDS_CLIENTS_process_put (ntohl (dht_msg->options),
527                            ntohl (dht_msg->type),
528                            0,
529                            ntohl (dht_msg->desired_replication_level),
530                            1,
531                            GDS_NEIGHBOURS_get_id(),
532                            GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
533                            &dht_msg->key,
534                            &dht_msg[1],
535                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
536   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
537   env = GNUNET_MQ_msg (conf,
538                        GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
539   conf->reserved = htonl (0);
540   conf->unique_id = dht_msg->unique_id;
541   GNUNET_MQ_send (ch->mq,
542                   env);
543   GNUNET_SERVICE_client_continue (ch->client);
544 }
545
546
547 /**
548  * Check DHT GET messages from the client.
549  *
550  * @param cls the client we received this message from
551  * @param message the actual message received
552  * @return #GNUNET_OK (always)
553  */
554 static int
555 check_dht_local_get (void *cls,
556                       const struct GNUNET_DHT_ClientGetMessage *get)
557 {
558   /* always well-formed */
559   return GNUNET_OK;
560 }
561
562
563 /**
564  * Handle a result from local datacache for a GET operation.
565  *
566  * @param cls the `struct ClientHandle` of the client doing the query
567  * @param type type of the block
568  * @param expiration_time when does the content expire
569  * @param key key for the content
570  * @param put_path_length number of entries in @a put_path
571  * @param put_path peers the original PUT traversed (if tracked)
572  * @param get_path_length number of entries in @a get_path
573  * @param get_path peers this reply has traversed so far (if tracked)
574  * @param data payload of the reply
575  * @param data_size number of bytes in @a data
576  */
577 static void
578 handle_local_result (void *cls,
579                      enum GNUNET_BLOCK_Type type,
580                      struct GNUNET_TIME_Absolute expiration_time,
581                      const struct GNUNET_HashCode *key,
582                      unsigned int put_path_length,
583                      const struct GNUNET_PeerIdentity *put_path,
584                      unsigned int get_path_length,
585                      const struct GNUNET_PeerIdentity *get_path,
586                      const void *data,
587                      size_t data_size)
588 {
589   // FIXME: this needs some clean up: inline the function,
590   // possibly avoid even looking up the client!
591   GDS_CLIENTS_handle_reply (expiration_time,
592                             key,
593                             0, NULL,
594                             put_path_length, put_path,
595                             type,
596                             data_size, data);
597 }
598
599
600 /**
601  * Handler for DHT GET messages from the client.
602  *
603  * @param cls the client we received this message from
604  * @param message the actual message received
605  */
606 static void
607 handle_dht_local_get (void *cls,
608                       const struct GNUNET_DHT_ClientGetMessage *get)
609 {
610   struct ClientHandle *ch = cls;
611   struct ClientQueryRecord *cqr;
612   size_t xquery_size;
613   const char *xquery;
614   uint16_t size;
615
616   size = ntohs (get->header.size);
617   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
618   xquery = (const char *) &get[1];
619   GNUNET_STATISTICS_update (GDS_stats,
620                             gettext_noop
621                             ("# GET requests received from clients"), 1,
622                             GNUNET_NO);
623   LOG (GNUNET_ERROR_TYPE_DEBUG,
624        "Received GET request for %s from local client %p, xq: %.*s\n",
625        GNUNET_h2s (&get->key),
626        ch->client,
627        xquery_size,
628        xquery);
629   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
630                "CLIENT-GET %s\n",
631                GNUNET_h2s_full (&get->key));
632
633   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
634   cqr->key = get->key;
635   cqr->ch = ch;
636   cqr->xquery = (void *) &cqr[1];
637   GNUNET_memcpy (&cqr[1], xquery, xquery_size);
638   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
639   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
640   cqr->retry_time = GNUNET_TIME_absolute_get ();
641   cqr->unique_id = get->unique_id;
642   cqr->xquery_size = xquery_size;
643   cqr->replication = ntohl (get->desired_replication_level);
644   cqr->msg_options = ntohl (get->options);
645   cqr->type = ntohl (get->type);
646   GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
647                                ch->cqr_tail,
648                                cqr);
649   GNUNET_CONTAINER_multihashmap_put (forward_map,
650                                      &cqr->key,
651                                      cqr,
652                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
653   GDS_CLIENTS_process_get (ntohl (get->options),
654                            ntohl (get->type),
655                            0,
656                            ntohl (get->desired_replication_level),
657                            1,
658                            GDS_NEIGHBOURS_get_id(),
659                            &get->key);
660   /* start remote requests */
661   if (NULL != retry_task)
662     GNUNET_SCHEDULER_cancel (retry_task);
663   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
664                                          NULL);
665   /* perform local lookup */
666   GDS_DATACACHE_handle_get (&get->key,
667                             cqr->type,
668                             cqr->xquery,
669                             xquery_size,
670                             NULL,
671                             &handle_local_result,
672                             ch);
673   GNUNET_SERVICE_client_continue (ch->client);
674 }
675
676
677 /**
678  * Closure for #find_by_unique_id().
679  */
680 struct FindByUniqueIdContext
681 {
682   /**
683    * Where to store the result, if found.
684    */
685   struct ClientQueryRecord *cqr;
686
687   uint64_t unique_id;
688 };
689
690
691 /**
692  * Function called for each existing DHT record for the given
693  * query.  Checks if it matches the UID given in the closure
694  * and if so returns the entry as a result.
695  *
696  * @param cls the search context
697  * @param key query for the lookup (not used)
698  * @param value the `struct ClientQueryRecord`
699  * @return #GNUNET_YES to continue iteration (result not yet found)
700  */
701 static int
702 find_by_unique_id (void *cls,
703                    const struct GNUNET_HashCode *key,
704                    void *value)
705 {
706   struct FindByUniqueIdContext *fui_ctx = cls;
707   struct ClientQueryRecord *cqr = value;
708
709   if (cqr->unique_id != fui_ctx->unique_id)
710     return GNUNET_YES;
711   fui_ctx->cqr = cqr;
712   return GNUNET_NO;
713 }
714
715
716 /**
717  * Check "GET result seen" messages from the client.
718  *
719  * @param cls the client we received this message from
720  * @param message the actual message received
721  * @return #GNUNET_OK if @a seen is well-formed
722  */
723 static int
724 check_dht_local_get_result_seen (void *cls,
725                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
726 {
727   uint16_t size;
728   unsigned int hash_count;
729
730   size = ntohs (seen->header.size);
731   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
732   if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
733   {
734     GNUNET_break (0);
735     return GNUNET_SYSERR;
736   }
737   return GNUNET_OK;
738 }
739
740
741 /**
742  * Handler for "GET result seen" messages from the client.
743  *
744  * @param cls the client we received this message from
745  * @param message the actual message received
746  */
747 static void
748 handle_dht_local_get_result_seen (void *cls,
749                                   const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
750 {
751   struct ClientHandle *ch = cls;
752   uint16_t size;
753   unsigned int hash_count;
754   unsigned int old_count;
755   const struct GNUNET_HashCode *hc;
756   struct FindByUniqueIdContext fui_ctx;
757   struct ClientQueryRecord *cqr;
758
759   size = ntohs (seen->header.size);
760   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
761   hc = (const struct GNUNET_HashCode*) &seen[1];
762   fui_ctx.unique_id = seen->unique_id;
763   fui_ctx.cqr = NULL;
764   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
765                                               &seen->key,
766                                               &find_by_unique_id,
767                                               &fui_ctx);
768   if (NULL == (cqr = fui_ctx.cqr))
769   {
770     GNUNET_break (0);
771     GNUNET_SERVICE_client_drop (ch->client);
772     return;
773   }
774   /* finally, update 'seen' list */
775   old_count = cqr->seen_replies_count;
776   GNUNET_array_grow (cqr->seen_replies,
777                      cqr->seen_replies_count,
778                      cqr->seen_replies_count + hash_count);
779   GNUNET_memcpy (&cqr->seen_replies[old_count],
780                  hc,
781                  sizeof (struct GNUNET_HashCode) * hash_count);
782 }
783
784
785 /**
786  * Closure for #remove_by_unique_id().
787  */
788 struct RemoveByUniqueIdContext
789 {
790   /**
791    * Client that issued the removal request.
792    */
793   struct ClientHandle *ch;
794
795   /**
796    * Unique ID of the request.
797    */
798   uint64_t unique_id;
799 };
800
801
802 /**
803  * Iterator over hash map entries that frees all entries
804  * that match the given client and unique ID.
805  *
806  * @param cls unique ID and client to search for in source routes
807  * @param key current key code
808  * @param value value in the hash map, a ClientQueryRecord
809  * @return #GNUNET_YES (we should continue to iterate)
810  */
811 static int
812 remove_by_unique_id (void *cls,
813                      const struct GNUNET_HashCode *key,
814                      void *value)
815 {
816   const struct RemoveByUniqueIdContext *ctx = cls;
817   struct ClientQueryRecord *cqr = value;
818
819   if (cqr->unique_id != ctx->unique_id)
820     return GNUNET_YES;
821   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822               "Removing client %p's record for key %s (by unique id)\n",
823               ctx->ch->client,
824               GNUNET_h2s (key));
825   remove_client_record (cqr);
826   return GNUNET_YES;
827 }
828
829
830 /**
831  * Handler for any generic DHT stop messages, calls the appropriate handler
832  * depending on message type (if processed locally)
833  *
834  * @param cls client we received this message from
835  * @param message the actual message received
836  *
837  */
838 static void
839 handle_dht_local_get_stop (void *cls,
840                            const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
841 {
842   struct ClientHandle *ch = cls;
843   struct RemoveByUniqueIdContext ctx;
844
845   GNUNET_STATISTICS_update (GDS_stats,
846                             gettext_noop
847                             ("# GET STOP requests received from clients"), 1,
848                             GNUNET_NO);
849   LOG (GNUNET_ERROR_TYPE_DEBUG,
850        "Received GET STOP request for %s from local client %p\n",
851        GNUNET_h2s (&dht_stop_msg->key),
852        ch->client);
853   ctx.ch = ch;
854   ctx.unique_id = dht_stop_msg->unique_id;
855   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
856                                               &dht_stop_msg->key,
857                                               &remove_by_unique_id,
858                                               &ctx);
859   GNUNET_SERVICE_client_continue (ch->client);
860 }
861
862
863 /**
864  * Handler for monitor start messages
865  *
866  * @param cls the client we received this message from
867  * @param msg the actual message received
868  *
869  */
870 static void
871 handle_dht_local_monitor (void *cls,
872                           const struct GNUNET_DHT_MonitorStartStopMessage *msg)
873 {
874   struct ClientHandle *ch = cls;
875   struct ClientMonitorRecord *r;
876
877   r = GNUNET_new (struct ClientMonitorRecord);
878   r->ch = ch;
879   r->type = ntohl (msg->type);
880   r->get = ntohs (msg->get);
881   r->get_resp = ntohs (msg->get_resp);
882   r->put = ntohs (msg->put);
883   if (0 == ntohs (msg->filter_key))
884   {
885     r->key = NULL;
886   }
887   else
888   {
889     r->key = GNUNET_new (struct GNUNET_HashCode);
890     GNUNET_memcpy (r->key,
891                    &msg->key,
892                    sizeof (struct GNUNET_HashCode));
893   }
894   GNUNET_CONTAINER_DLL_insert (monitor_head,
895                                monitor_tail,
896                                r);
897   GNUNET_SERVICE_client_continue (ch->client);
898 }
899
900
901 /**
902  * Handler for monitor stop messages
903  *
904  * @param cls the client we received this message from
905  * @param msg the actual message received
906  */
907 static void
908 handle_dht_local_monitor_stop (void *cls,
909                                const struct GNUNET_DHT_MonitorStartStopMessage *msg)
910 {
911   struct ClientHandle *ch = cls;
912   struct ClientMonitorRecord *r;
913   int keys_match;
914
915   GNUNET_SERVICE_client_continue (ch->client);
916   for (r = monitor_head; NULL != r; r = r->next)
917   {
918     if (NULL == r->key)
919     {
920       keys_match = (0 == ntohs(msg->filter_key));
921     }
922     else
923     {
924       keys_match = ( (0 != ntohs(msg->filter_key)) &&
925                      (! memcmp (r->key,
926                                 &msg->key,
927                                 sizeof(struct GNUNET_HashCode))) );
928     }
929     if ( (ch == r->ch) &&
930          (ntohl(msg->type) == r->type) &&
931          (r->get == msg->get) &&
932          (r->get_resp == msg->get_resp) &&
933          (r->put == msg->put) &&
934          keys_match )
935     {
936       GNUNET_CONTAINER_DLL_remove (monitor_head,
937                                    monitor_tail,
938                                    r);
939       GNUNET_free_non_null (r->key);
940       GNUNET_free (r);
941       return; /* Delete only ONE entry */
942     }
943   }
944 }
945
946
947 /**
948  * Closure for #forward_reply()
949  */
950 struct ForwardReplyContext
951 {
952
953   /**
954    * Expiration time of the reply.
955    */
956   struct GNUNET_TIME_Absolute expiration;
957
958   /**
959    * GET path taken.
960    */
961   const struct GNUNET_PeerIdentity *get_path;
962
963   /**
964    * PUT path taken.
965    */
966   const struct GNUNET_PeerIdentity *put_path;
967
968   /**
969    * Embedded payload.
970    */
971   const void *data;
972
973   /**
974    * Number of bytes in data.
975    */
976   size_t data_size;
977
978   /**
979    * Number of entries in @e get_path.
980    */
981   unsigned int get_path_length;
982
983   /**
984    * Number of entries in @e put_path.
985    */
986   unsigned int put_path_length;
987
988   /**
989    * Type of the data.
990    */
991   enum GNUNET_BLOCK_Type type;
992
993 };
994
995
996 /**
997  * Iterator over hash map entries that send a given reply to
998  * each of the matching clients.  With some tricky recycling
999  * of the buffer.
1000  *
1001  * @param cls the 'struct ForwardReplyContext'
1002  * @param key current key
1003  * @param value value in the hash map, a ClientQueryRecord
1004  * @return #GNUNET_YES (we should continue to iterate),
1005  *         if the result is mal-formed, #GNUNET_NO
1006  */
1007 static int
1008 forward_reply (void *cls,
1009                const struct GNUNET_HashCode *key,
1010                void *value)
1011 {
1012   struct ForwardReplyContext *frc = cls;
1013   struct ClientQueryRecord *record = value;
1014   struct GNUNET_MQ_Envelope *env;
1015   struct GNUNET_DHT_ClientResultMessage *reply;
1016   enum GNUNET_BLOCK_EvaluationResult eval;
1017   int do_free;
1018   struct GNUNET_HashCode ch;
1019   struct GNUNET_PeerIdentity *paths;
1020
1021   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1022                "CLIENT-RESULT %s\n",
1023                GNUNET_h2s_full (key));
1024   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1025        (record->type != frc->type))
1026   {
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Record type missmatch, not passing request for key %s to local client\n",
1029          GNUNET_h2s (key));
1030     GNUNET_STATISTICS_update (GDS_stats,
1031                               gettext_noop
1032                               ("# Key match, type mismatches in REPLY to CLIENT"),
1033                               1, GNUNET_NO);
1034     return GNUNET_YES;          /* type mismatch */
1035   }
1036   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1037   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1038     if (0 == memcmp (&record->seen_replies[i],
1039                      &ch,
1040                      sizeof (struct GNUNET_HashCode)))
1041     {
1042       LOG (GNUNET_ERROR_TYPE_DEBUG,
1043            "Duplicate reply, not passing request for key %s to local client\n",
1044            GNUNET_h2s (key));
1045       GNUNET_STATISTICS_update (GDS_stats,
1046                                 gettext_noop
1047                                 ("# Duplicate REPLIES to CLIENT request dropped"),
1048                                 1, GNUNET_NO);
1049       return GNUNET_YES;        /* duplicate */
1050     }
1051   eval
1052     = GNUNET_BLOCK_evaluate (GDS_block_context,
1053                              record->type,
1054                              NULL,
1055                              GNUNET_BLOCK_EO_NONE,
1056                              key,
1057                              record->xquery,
1058                              record->xquery_size,
1059                              frc->data,
1060                              frc->data_size);
1061   LOG (GNUNET_ERROR_TYPE_DEBUG,
1062        "Evaluation result is %d for key %s for local client's query\n",
1063        (int) eval,
1064        GNUNET_h2s (key));
1065   switch (eval)
1066   {
1067   case GNUNET_BLOCK_EVALUATION_OK_LAST:
1068     do_free = GNUNET_YES;
1069     break;
1070   case GNUNET_BLOCK_EVALUATION_OK_MORE:
1071     GNUNET_array_append (record->seen_replies,
1072                          record->seen_replies_count,
1073                          ch);
1074     do_free = GNUNET_NO;
1075     break;
1076   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1077     /* should be impossible to encounter here */
1078     GNUNET_break (0);
1079     return GNUNET_YES;
1080   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1081     GNUNET_break_op (0);
1082     return GNUNET_NO;
1083   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1084     GNUNET_break (0);
1085     return GNUNET_NO;
1086   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1087     GNUNET_break (0);
1088     return GNUNET_NO;
1089   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1090     return GNUNET_YES;
1091   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1092     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1093                 _("Unsupported block type (%u) in request!\n"), record->type);
1094     return GNUNET_NO;
1095   default:
1096     GNUNET_break (0);
1097     return GNUNET_NO;
1098   }
1099   GNUNET_STATISTICS_update (GDS_stats,
1100                             gettext_noop ("# RESULTS queued for clients"),
1101                             1,
1102                             GNUNET_NO);
1103   env = GNUNET_MQ_msg_extra (reply,
1104                              frc->data_size +
1105                              (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1106                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1107   reply->type = htonl (frc->type);
1108   reply->get_path_length = htonl (frc->get_path_length);
1109   reply->put_path_length = htonl (frc->put_path_length);
1110   reply->unique_id = record->unique_id;
1111   reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1112   reply->key = *key;
1113   paths = (struct GNUNET_PeerIdentity *) &reply[1];
1114   GNUNET_memcpy (paths,
1115                  frc->put_path,
1116                  sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1117   GNUNET_memcpy (&paths[frc->put_path_length],
1118                  frc->get_path,
1119                  sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1120   GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1121                  frc->data,
1122                  frc->data_size);
1123   LOG (GNUNET_ERROR_TYPE_DEBUG,
1124        "Sending reply to query %s for client %p\n",
1125        GNUNET_h2s (key),
1126        record->ch->client);
1127   GNUNET_MQ_send (record->ch->mq,
1128                   env);
1129   if (GNUNET_YES == do_free)
1130     remove_client_record (record);
1131   return GNUNET_YES;
1132 }
1133
1134
1135 /**
1136  * Handle a reply we've received from another peer.  If the reply
1137  * matches any of our pending queries, forward it to the respective
1138  * client(s).
1139  *
1140  * @param expiration when will the reply expire
1141  * @param key the query this reply is for
1142  * @param get_path_length number of peers in @a get_path
1143  * @param get_path path the reply took on get
1144  * @param put_path_length number of peers in @a put_path
1145  * @param put_path path the reply took on put
1146  * @param type type of the reply
1147  * @param data_size number of bytes in @a data
1148  * @param data application payload data
1149  */
1150 void
1151 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1152                           const struct GNUNET_HashCode *key,
1153                           unsigned int get_path_length,
1154                           const struct GNUNET_PeerIdentity *get_path,
1155                           unsigned int put_path_length,
1156                           const struct GNUNET_PeerIdentity *put_path,
1157                           enum GNUNET_BLOCK_Type type,
1158                           size_t data_size,
1159                           const void *data)
1160 {
1161   struct ForwardReplyContext frc;
1162   size_t msize;
1163
1164   msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1165     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1166   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1167   {
1168     GNUNET_break (0);
1169     return;
1170   }
1171   if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1172                                                  key))
1173   {
1174     LOG (GNUNET_ERROR_TYPE_DEBUG,
1175          "No matching client for reply for key %s\n",
1176          GNUNET_h2s (key));
1177     GNUNET_STATISTICS_update (GDS_stats,
1178                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1179                               1,
1180                               GNUNET_NO);
1181     return;                     /* no matching request, fast exit! */
1182   }
1183   frc.expiration = expiration;
1184   frc.get_path = get_path;
1185   frc.put_path = put_path;
1186   frc.data = data;
1187   frc.data_size = data_size;
1188   frc.get_path_length = get_path_length;
1189   frc.put_path_length = put_path_length;
1190   frc.type = type;
1191   LOG (GNUNET_ERROR_TYPE_DEBUG,
1192        "Forwarding reply for key %s to client\n",
1193        GNUNET_h2s (key));
1194   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1195                                               key,
1196                                               &forward_reply,
1197                                               &frc);
1198
1199 }
1200
1201
1202 /**
1203  * Check if some client is monitoring GET messages and notify
1204  * them in that case.
1205  *
1206  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1207  * @param type The type of data in the request.
1208  * @param hop_count Hop count so far.
1209  * @param path_length number of entries in path (or 0 if not recorded).
1210  * @param path peers on the GET path (or NULL if not recorded).
1211  * @param desired_replication_level Desired replication level.
1212  * @param key Key of the requested data.
1213  */
1214 void
1215 GDS_CLIENTS_process_get (uint32_t options,
1216                          enum GNUNET_BLOCK_Type type,
1217                          uint32_t hop_count,
1218                          uint32_t desired_replication_level,
1219                          unsigned int path_length,
1220                          const struct GNUNET_PeerIdentity *path,
1221                          const struct GNUNET_HashCode * key)
1222 {
1223   struct ClientMonitorRecord *m;
1224   struct ClientHandle **cl;
1225   unsigned int cl_size;
1226
1227   cl = NULL;
1228   cl_size = 0;
1229   for (m = monitor_head; NULL != m; m = m->next)
1230   {
1231     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1232            (m->type == type) ) &&
1233          ( (NULL == m->key) ||
1234            (0 == memcmp (key,
1235                          m->key,
1236                          sizeof(struct GNUNET_HashCode))) ) )
1237     {
1238       struct GNUNET_MQ_Envelope *env;
1239       struct GNUNET_DHT_MonitorGetMessage *mmsg;
1240       struct GNUNET_PeerIdentity *msg_path;
1241       size_t msize;
1242       unsigned int i;
1243
1244       /* Don't send duplicates */
1245       for (i = 0; i < cl_size; i++)
1246         if (cl[i] == m->ch)
1247           break;
1248       if (i < cl_size)
1249         continue;
1250       GNUNET_array_append (cl,
1251                            cl_size,
1252                            m->ch);
1253
1254       msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1255       env = GNUNET_MQ_msg_extra (mmsg,
1256                                  msize,
1257                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1258       mmsg->options = htonl(options);
1259       mmsg->type = htonl(type);
1260       mmsg->hop_count = htonl(hop_count);
1261       mmsg->desired_replication_level = htonl(desired_replication_level);
1262       mmsg->get_path_length = htonl(path_length);
1263       mmsg->key = *key;
1264       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1265       GNUNET_memcpy (msg_path,
1266                      path,
1267                      path_length * sizeof (struct GNUNET_PeerIdentity));
1268       GNUNET_MQ_send (m->ch->mq,
1269                       env);
1270     }
1271   }
1272   GNUNET_free_non_null (cl);
1273 }
1274
1275
1276 /**
1277  * Check if some client is monitoring GET RESP messages and notify
1278  * them in that case.
1279  *
1280  * @param type The type of data in the result.
1281  * @param get_path Peers on GET path (or NULL if not recorded).
1282  * @param get_path_length number of entries in get_path.
1283  * @param put_path peers on the PUT path (or NULL if not recorded).
1284  * @param put_path_length number of entries in get_path.
1285  * @param exp Expiration time of the data.
1286  * @param key Key of the data.
1287  * @param data Pointer to the result data.
1288  * @param size Number of bytes in @a data.
1289  */
1290 void
1291 GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1292                               const struct GNUNET_PeerIdentity *get_path,
1293                               unsigned int get_path_length,
1294                               const struct GNUNET_PeerIdentity *put_path,
1295                               unsigned int put_path_length,
1296                               struct GNUNET_TIME_Absolute exp,
1297                               const struct GNUNET_HashCode * key,
1298                               const void *data,
1299                               size_t size)
1300 {
1301   struct ClientMonitorRecord *m;
1302   struct ClientHandle **cl;
1303   unsigned int cl_size;
1304
1305   cl = NULL;
1306   cl_size = 0;
1307   for (m = monitor_head; NULL != m; m = m->next)
1308   {
1309     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1310         (NULL == m->key ||
1311          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1312     {
1313       struct GNUNET_MQ_Envelope *env;
1314       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1315       struct GNUNET_PeerIdentity *path;
1316       size_t msize;
1317       unsigned int i;
1318
1319       /* Don't send duplicates */
1320       for (i = 0; i < cl_size; i++)
1321         if (cl[i] == m->ch)
1322           break;
1323       if (i < cl_size)
1324         continue;
1325       GNUNET_array_append (cl,
1326                            cl_size,
1327                            m->ch);
1328
1329       msize = size;
1330       msize += (get_path_length + put_path_length)
1331                * sizeof (struct GNUNET_PeerIdentity);
1332       env = GNUNET_MQ_msg_extra (mmsg,
1333                                  msize,
1334                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1335       mmsg->type = htonl(type);
1336       mmsg->put_path_length = htonl(put_path_length);
1337       mmsg->get_path_length = htonl(get_path_length);
1338       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1339       mmsg->key = *key;
1340       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1341       GNUNET_memcpy (path,
1342                      put_path,
1343                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
1344       GNUNET_memcpy (path,
1345                      get_path,
1346                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
1347       GNUNET_memcpy (&path[get_path_length],
1348                      data,
1349                      size);
1350       GNUNET_MQ_send (m->ch->mq,
1351                       env);
1352     }
1353   }
1354   GNUNET_free_non_null (cl);
1355 }
1356
1357
1358 /**
1359  * Check if some client is monitoring PUT messages and notify
1360  * them in that case.
1361  *
1362  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1363  * @param type The type of data in the request.
1364  * @param hop_count Hop count so far.
1365  * @param path_length number of entries in path (or 0 if not recorded).
1366  * @param path peers on the PUT path (or NULL if not recorded).
1367  * @param desired_replication_level Desired replication level.
1368  * @param exp Expiration time of the data.
1369  * @param key Key under which data is to be stored.
1370  * @param data Pointer to the data carried.
1371  * @param size Number of bytes in data.
1372  */
1373 void
1374 GDS_CLIENTS_process_put (uint32_t options,
1375                          enum GNUNET_BLOCK_Type type,
1376                          uint32_t hop_count,
1377                          uint32_t desired_replication_level,
1378                          unsigned int path_length,
1379                          const struct GNUNET_PeerIdentity *path,
1380                          struct GNUNET_TIME_Absolute exp,
1381                          const struct GNUNET_HashCode *key,
1382                          const void *data,
1383                          size_t size)
1384 {
1385   struct ClientMonitorRecord *m;
1386   struct ClientHandle **cl;
1387   unsigned int cl_size;
1388
1389   cl = NULL;
1390   cl_size = 0;
1391   for (m = monitor_head; NULL != m; m = m->next)
1392   {
1393     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1394         (NULL == m->key ||
1395          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1396     {
1397       struct GNUNET_MQ_Envelope *env;
1398       struct GNUNET_DHT_MonitorPutMessage *mmsg;
1399       struct GNUNET_PeerIdentity *msg_path;
1400       size_t msize;
1401       unsigned int i;
1402
1403       /* Don't send duplicates */
1404       for (i = 0; i < cl_size; i++)
1405         if (cl[i] == m->ch)
1406           break;
1407       if (i < cl_size)
1408         continue;
1409       GNUNET_array_append (cl,
1410                            cl_size,
1411                            m->ch);
1412
1413       msize = size;
1414       msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1415       env = GNUNET_MQ_msg_extra (mmsg,
1416                                  msize,
1417                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1418       mmsg->options = htonl(options);
1419       mmsg->type = htonl(type);
1420       mmsg->hop_count = htonl(hop_count);
1421       mmsg->desired_replication_level = htonl (desired_replication_level);
1422       mmsg->put_path_length = htonl (path_length);
1423       mmsg->key = *key;
1424       mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1425       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1426       GNUNET_memcpy (msg_path,
1427                      path,
1428                      path_length * sizeof (struct GNUNET_PeerIdentity));
1429       GNUNET_memcpy (&msg_path[path_length],
1430                      data,
1431                      size);
1432       GNUNET_MQ_send (m->ch->mq,
1433                       env);
1434     }
1435   }
1436   GNUNET_free_non_null (cl);
1437 }
1438
1439
1440 /**
1441  * Initialize client subsystem.
1442  *
1443  * @param server the initialized server
1444  */
1445 static void
1446 GDS_CLIENTS_init ()
1447 {
1448   forward_map
1449     = GNUNET_CONTAINER_multihashmap_create (1024,
1450                                             GNUNET_YES);
1451   retry_heap
1452     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1453 }
1454
1455
1456 /**
1457  * Shutdown client subsystem.
1458  */
1459 static void
1460 GDS_CLIENTS_stop ()
1461 {
1462   if (NULL != retry_task)
1463   {
1464     GNUNET_SCHEDULER_cancel (retry_task);
1465     retry_task = NULL;
1466   }
1467 }
1468
1469
1470 /**
1471  * Define "main" method using service macro.
1472  *
1473  * @param name name of the service, i.e. "dht" or "xdht"
1474  * @param run name of the initializaton method for the service
1475  */
1476 #define GDS_DHT_SERVICE_INIT(name,run)          \
1477  GNUNET_SERVICE_MAIN \
1478   (name, \
1479   GNUNET_SERVICE_OPTION_NONE, \
1480   run, \
1481   &client_connect_cb, \
1482   &client_disconnect_cb, \
1483   NULL, \
1484   GNUNET_MQ_hd_var_size (dht_local_put, \
1485                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1486                          struct GNUNET_DHT_ClientPutMessage, \
1487                          NULL), \
1488   GNUNET_MQ_hd_var_size (dht_local_get, \
1489                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1490                          struct GNUNET_DHT_ClientGetMessage, \
1491                          NULL), \
1492   GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1493                           GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1494                           struct GNUNET_DHT_ClientGetStopMessage, \
1495                           NULL), \
1496   GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1497                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1498                            struct GNUNET_DHT_MonitorStartStopMessage, \
1499                            NULL), \
1500   GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1501                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1502                            struct GNUNET_DHT_MonitorStartStopMessage, \
1503                            NULL), \
1504   GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1505                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1506                          struct GNUNET_DHT_ClientGetResultSeenMessage , \
1507                          NULL), \
1508   GNUNET_MQ_handler_end ())
1509
1510
1511 /**
1512  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1513  */
1514 void __attribute__ ((destructor))
1515 GDS_CLIENTS_done ()
1516 {
1517   if (NULL != retry_heap)
1518   {
1519     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1520     GNUNET_CONTAINER_heap_destroy (retry_heap);
1521     retry_heap = NULL;
1522   }
1523   if (NULL != forward_map)
1524   {
1525     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1526     GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1527     forward_map = NULL;
1528   }
1529 }
1530
1531 /* end of gnunet-service-dht_clients.c */