psycstore
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht.h"
36
37
38 /**
39  * Should routing details be logged to stderr (for debugging)?
40  */
41 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
44
45
46 /**
47  * Struct containing information about a client,
48  * handle to connect to it, and any pending messages
49  * that need to be sent to it.
50  */
51 struct ClientHandle;
52
53
54 /**
55  * Entry in the local forwarding map for a client's GET request.
56  */
57 struct ClientQueryRecord
58 {
59
60   /**
61    * The key this request was about
62    */
63   struct GNUNET_HashCode key;
64
65   /**
66    * Kept in a DLL with @e client.
67    */
68   struct ClientQueryRecord *next;
69
70   /**
71    * Kept in a DLL with @e client.
72    */
73   struct ClientQueryRecord *prev;
74
75   /**
76    * Client responsible for the request.
77    */
78   struct ClientHandle *ch;
79
80   /**
81    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
82    */
83   const void *xquery;
84
85   /**
86    * Replies we have already seen for this request.
87    */
88   struct GNUNET_HashCode *seen_replies;
89
90   /**
91    * Pointer to this nodes heap location in the retry-heap (for fast removal)
92    */
93   struct GNUNET_CONTAINER_HeapNode *hnode;
94
95   /**
96    * What's the delay between re-try operations that we currently use for this
97    * request?
98    */
99   struct GNUNET_TIME_Relative retry_frequency;
100
101   /**
102    * What's the next time we should re-try this request?
103    */
104   struct GNUNET_TIME_Absolute retry_time;
105
106   /**
107    * The unique identifier of this request
108    */
109   uint64_t unique_id;
110
111   /**
112    * Number of bytes in xquery.
113    */
114   size_t xquery_size;
115
116   /**
117    * Number of entries in 'seen_replies'.
118    */
119   unsigned int seen_replies_count;
120
121   /**
122    * Desired replication level
123    */
124   uint32_t replication;
125
126   /**
127    * Any message options for this request
128    */
129   uint32_t msg_options;
130
131   /**
132    * The type for the data for the GET request.
133    */
134   enum GNUNET_BLOCK_Type type;
135
136 };
137
138
139 /**
140  * Struct containing paremeters of monitoring requests.
141  */
142 struct ClientMonitorRecord
143 {
144
145   /**
146    * Next element in DLL.
147    */
148   struct ClientMonitorRecord *next;
149
150   /**
151    * Previous element in DLL.
152    */
153   struct ClientMonitorRecord *prev;
154
155   /**
156    * Type of blocks that are of interest
157    */
158   enum GNUNET_BLOCK_Type type;
159
160   /**
161    * Key of data of interest, NULL for all.
162    */
163   struct GNUNET_HashCode *key;
164
165   /**
166    * Flag whether to notify about GET messages.
167    */
168   int16_t get;
169
170   /**
171    * Flag whether to notify about GET_REPONSE messages.
172    */
173   int16_t get_resp;
174
175   /**
176    * Flag whether to notify about PUT messages.
177    */
178   uint16_t put;
179
180   /**
181    * Client to notify of these requests.
182    */
183   struct ClientHandle *ch;
184 };
185
186
187 /**
188  * Struct containing information about a client,
189  * handle to connect to it, and any pending messages
190  * that need to be sent to it.
191  */
192 struct ClientHandle
193 {
194   /**
195    * Linked list of active queries of this client.
196    */
197   struct ClientQueryRecord *cqr_head;
198
199   /**
200    * Linked list of active queries of this client.
201    */
202   struct ClientQueryRecord *cqr_tail;
203
204   /**
205    * The handle to this client
206    */
207   struct GNUNET_SERVICE_Client *client;
208
209   /**
210    * The message queue to this client
211    */
212   struct GNUNET_MQ_Handle *mq;
213
214 };
215
216 /**
217  * Our handle to the BLOCK library.
218  */
219 struct GNUNET_BLOCK_Context *GDS_block_context;
220
221 /**
222  * Handle for the statistics service.
223  */
224 struct GNUNET_STATISTICS_Handle *GDS_stats;
225
226 /**
227  * Handle for the service.
228  */
229 struct GNUNET_SERVICE_Handle *GDS_service;
230
231 /**
232  * The configuration the DHT service is running with
233  */
234 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
235
236 /**
237  * List of active monitoring requests.
238  */
239 static struct ClientMonitorRecord *monitor_head;
240
241 /**
242  * List of active monitoring requests.
243  */
244 static struct ClientMonitorRecord *monitor_tail;
245
246 /**
247  * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
248  */
249 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
250
251 /**
252  * Heap with all of our client's request, sorted by retry time (earliest on top).
253  */
254 static struct GNUNET_CONTAINER_Heap *retry_heap;
255
256 /**
257  * Task that re-transmits requests (using retry_heap).
258  */
259 static struct GNUNET_SCHEDULER_Task *retry_task;
260
261
262 /**
263  * Free data structures associated with the given query.
264  *
265  * @param record record to remove
266  */
267 static void
268 remove_client_record (struct ClientQueryRecord *record)
269 {
270   struct ClientHandle *ch = record->ch;
271
272   GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
273                                ch->cqr_tail,
274                                record);
275   GNUNET_assert (GNUNET_YES ==
276                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
277                                                        &record->key,
278                                                        record));
279   if (NULL != record->hnode)
280     GNUNET_CONTAINER_heap_remove_node (record->hnode);
281   GNUNET_array_grow (record->seen_replies,
282                      record->seen_replies_count,
283                      0);
284   GNUNET_free (record);
285 }
286
287
288 /**
289  * Functions with this signature are called whenever a local client is
290  * connects to us.
291  *
292  * @param cls closure (NULL for dht)
293  * @param client identification of the client
294  * @param mq message queue for talking to @a client
295  * @return our `struct ClientHandle` for @a client
296  */
297 static void *
298 client_connect_cb (void *cls,
299                    struct GNUNET_SERVICE_Client *client,
300                    struct GNUNET_MQ_Handle *mq)
301 {
302   struct ClientHandle *ch;
303
304   ch = GNUNET_new (struct ClientHandle);
305   ch->client = client;
306   ch->mq = mq;
307   return ch;
308 }
309
310
311 /**
312  * Functions with this signature are called whenever a client
313  * is disconnected on the network level.
314  *
315  * @param cls closure (NULL for dht)
316  * @param client identification of the client
317  * @param app_ctx our `struct ClientHandle` for @a client
318  */
319 static void
320 client_disconnect_cb (void *cls,
321                       struct GNUNET_SERVICE_Client *client,
322                       void *app_ctx)
323 {
324   struct ClientHandle *ch = app_ctx;
325   struct ClientQueryRecord *cqr;
326   struct ClientMonitorRecord *monitor;
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Local client %p disconnects\n",
330               ch);
331   monitor = monitor_head;
332   while (NULL != monitor)
333   {
334     if (monitor->ch == ch)
335     {
336       struct ClientMonitorRecord *next;
337
338       next = monitor->next;
339       GNUNET_free_non_null (monitor->key);
340       GNUNET_CONTAINER_DLL_remove (monitor_head,
341                                    monitor_tail,
342                                    monitor);
343       GNUNET_free (monitor);
344       monitor = next;
345     }
346     else
347     {
348       monitor = monitor->next;
349     }
350   }
351   while (NULL != (cqr = ch->cqr_head))
352     remove_client_record (cqr);
353   GNUNET_free (ch);
354 }
355
356
357 /**
358  * Route the given request via the DHT.  This includes updating
359  * the bloom filter and retransmission times, building the P2P
360  * message and initiating the routing operation.
361  */
362 static void
363 transmit_request (struct ClientQueryRecord *cqr)
364 {
365   int32_t reply_bf_mutator;
366   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
367   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
368
369   GNUNET_STATISTICS_update (GDS_stats,
370                             gettext_noop ("# GET requests from clients injected"),
371                             1,
372                             GNUNET_NO);
373   reply_bf_mutator =
374       (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
375                                           UINT32_MAX);
376   reply_bf
377     = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
378                                           cqr->seen_replies,
379                                           cqr->seen_replies_count);
380   peer_bf
381     = GNUNET_CONTAINER_bloomfilter_init (NULL,
382                                          DHT_BLOOM_SIZE,
383                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
384   LOG (GNUNET_ERROR_TYPE_DEBUG,
385        "Initiating GET for %s, replication %u, already have %u replies\n",
386        GNUNET_h2s (&cqr->key),
387        cqr->replication,
388        cqr->seen_replies_count);
389   GDS_NEIGHBOURS_handle_get (cqr->type,
390                              cqr->msg_options,
391                              cqr->replication,
392                              0 /* hop count */ ,
393                              &cqr->key,
394                              cqr->xquery,
395                              cqr->xquery_size,
396                              reply_bf,
397                              reply_bf_mutator,
398                              peer_bf);
399   GNUNET_CONTAINER_bloomfilter_free (reply_bf);
400   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
401
402   /* exponential back-off for retries.
403    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
404   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
405   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
406 }
407
408
409 /**
410  * Task that looks at the #retry_heap and transmits all of the requests
411  * on the heap that are ready for transmission.  Then re-schedules
412  * itself (unless the heap is empty).
413  *
414  * @param cls unused
415  */
416 static void
417 transmit_next_request_task (void *cls)
418 {
419   struct ClientQueryRecord *cqr;
420   struct GNUNET_TIME_Relative delay;
421
422   retry_task = NULL;
423   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
424   {
425     cqr->hnode = NULL;
426     delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
427     if (delay.rel_value_us > 0)
428     {
429       cqr->hnode =
430           GNUNET_CONTAINER_heap_insert (retry_heap,
431                                         cqr,
432                                         cqr->retry_time.abs_value_us);
433       retry_task =
434           GNUNET_SCHEDULER_add_delayed (delay,
435                                         &transmit_next_request_task,
436                                         NULL);
437       return;
438     }
439     transmit_request (cqr);
440     cqr->hnode
441       = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
442                                       cqr->retry_time.abs_value_us);
443   }
444 }
445
446
447 /**
448  * Check DHT PUT messages from the client.
449  *
450  * @param cls the client we received this message from
451  * @param dht_msg the actual message received
452  * @return #GNUNET_OK (always)
453  */
454 static int
455 check_dht_local_put (void *cls,
456                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
457 {
458   /* always well-formed */
459   return GNUNET_OK;
460 }
461
462
463 /**
464  * Handler for PUT messages.
465  *
466  * @param cls the client we received this message from
467  * @param dht_msg the actual message received
468  */
469 static void
470 handle_dht_local_put (void *cls,
471                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
472 {
473   struct ClientHandle *ch = cls;
474   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
475   uint16_t size;
476   struct GNUNET_MQ_Envelope *env;
477   struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
478
479   size = ntohs (dht_msg->header.size);
480   GNUNET_STATISTICS_update (GDS_stats,
481                             gettext_noop ("# PUT requests received from clients"),
482                             1,
483                             GNUNET_NO);
484   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
485                "CLIENT-PUT %s\n",
486                GNUNET_h2s_full (&dht_msg->key));
487   /* give to local clients */
488   LOG (GNUNET_ERROR_TYPE_DEBUG,
489        "Handling local PUT of %u-bytes for query %s\n",
490        size - sizeof (struct GNUNET_DHT_ClientPutMessage),
491        GNUNET_h2s (&dht_msg->key));
492   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
493                             &dht_msg->key,
494                             0,
495                             NULL,
496                             0,
497                             NULL,
498                             ntohl (dht_msg->type),
499                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
500                             &dht_msg[1]);
501   /* store locally */
502   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
503                             &dht_msg->key,
504                             0,
505                             NULL,
506                             ntohl (dht_msg->type),
507                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
508                             &dht_msg[1]);
509   /* route to other peers */
510   peer_bf
511     = GNUNET_CONTAINER_bloomfilter_init (NULL,
512                                          DHT_BLOOM_SIZE,
513                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
514   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
515                              ntohl (dht_msg->options),
516                              ntohl (dht_msg->desired_replication_level),
517                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
518                              0 /* hop count */,
519                              peer_bf,
520                              &dht_msg->key,
521                              0,
522                              NULL,
523                              &dht_msg[1],
524                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
525   GDS_CLIENTS_process_put (ntohl (dht_msg->options),
526                            ntohl (dht_msg->type),
527                            0,
528                            ntohl (dht_msg->desired_replication_level),
529                            1,
530                            GDS_NEIGHBOURS_get_id(),
531                            GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
532                            &dht_msg->key,
533                            &dht_msg[1],
534                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
535   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
536   env = GNUNET_MQ_msg (conf,
537                        GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
538   conf->reserved = htonl (0);
539   conf->unique_id = dht_msg->unique_id;
540   GNUNET_MQ_send (ch->mq,
541                   env);
542   GNUNET_SERVICE_client_continue (ch->client);
543 }
544
545
546 /**
547  * Check DHT GET messages from the client.
548  *
549  * @param cls the client we received this message from
550  * @param message the actual message received
551  * @return #GNUNET_OK (always)
552  */
553 static int
554 check_dht_local_get (void *cls,
555                       const struct GNUNET_DHT_ClientGetMessage *get)
556 {
557   /* always well-formed */
558   return GNUNET_OK;
559 }
560
561
562 /**
563  * Handle a result from local datacache for a GET operation.
564  *
565  * @param cls the `struct ClientHandle` of the client doing the query
566  * @param type type of the block
567  * @param expiration_time when does the content expire
568  * @param key key for the content
569  * @param put_path_length number of entries in @a put_path
570  * @param put_path peers the original PUT traversed (if tracked)
571  * @param get_path_length number of entries in @a get_path
572  * @param get_path peers this reply has traversed so far (if tracked)
573  * @param data payload of the reply
574  * @param data_size number of bytes in @a data
575  */
576 static void
577 handle_local_result (void *cls,
578                      enum GNUNET_BLOCK_Type type,
579                      struct GNUNET_TIME_Absolute expiration_time,
580                      const struct GNUNET_HashCode *key,
581                      unsigned int put_path_length,
582                      const struct GNUNET_PeerIdentity *put_path,
583                      unsigned int get_path_length,
584                      const struct GNUNET_PeerIdentity *get_path,
585                      const void *data,
586                      size_t data_size)
587 {
588   // FIXME: this needs some clean up: inline the function,
589   // possibly avoid even looking up the client!
590   GDS_CLIENTS_handle_reply (expiration_time,
591                             key,
592                             0, NULL,
593                             put_path_length, put_path,
594                             type,
595                             data_size, data);
596 }
597
598
599 /**
600  * Handler for DHT GET messages from the client.
601  *
602  * @param cls the client we received this message from
603  * @param message the actual message received
604  */
605 static void
606 handle_dht_local_get (void *cls,
607                       const struct GNUNET_DHT_ClientGetMessage *get)
608 {
609   struct ClientHandle *ch = cls;
610   struct ClientQueryRecord *cqr;
611   size_t xquery_size;
612   const char *xquery;
613   uint16_t size;
614
615   size = ntohs (get->header.size);
616   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
617   xquery = (const char *) &get[1];
618   GNUNET_STATISTICS_update (GDS_stats,
619                             gettext_noop
620                             ("# GET requests received from clients"), 1,
621                             GNUNET_NO);
622   LOG (GNUNET_ERROR_TYPE_DEBUG,
623        "Received GET request for %s from local client %p, xq: %.*s\n",
624        GNUNET_h2s (&get->key),
625        ch->client,
626        xquery_size,
627        xquery);
628   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
629                "CLIENT-GET %s\n",
630                GNUNET_h2s_full (&get->key));
631
632   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
633   cqr->key = get->key;
634   cqr->ch = ch;
635   cqr->xquery = (void *) &cqr[1];
636   GNUNET_memcpy (&cqr[1], xquery, xquery_size);
637   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
638   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
639   cqr->retry_time = GNUNET_TIME_absolute_get ();
640   cqr->unique_id = get->unique_id;
641   cqr->xquery_size = xquery_size;
642   cqr->replication = ntohl (get->desired_replication_level);
643   cqr->msg_options = ntohl (get->options);
644   cqr->type = ntohl (get->type);
645   GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
646                                ch->cqr_tail,
647                                cqr);
648   GNUNET_CONTAINER_multihashmap_put (forward_map,
649                                      &cqr->key,
650                                      cqr,
651                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
652   GDS_CLIENTS_process_get (ntohl (get->options),
653                            ntohl (get->type),
654                            0,
655                            ntohl (get->desired_replication_level),
656                            1,
657                            GDS_NEIGHBOURS_get_id(),
658                            &get->key);
659   /* start remote requests */
660   if (NULL != retry_task)
661     GNUNET_SCHEDULER_cancel (retry_task);
662   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
663                                          NULL);
664   /* perform local lookup */
665   GDS_DATACACHE_handle_get (&get->key,
666                             cqr->type,
667                             cqr->xquery,
668                             xquery_size,
669                             NULL,
670                             0,
671                             &handle_local_result,
672                             ch);
673   GNUNET_SERVICE_client_continue (ch->client);
674 }
675
676
677 /**
678  * Closure for #find_by_unique_id().
679  */
680 struct FindByUniqueIdContext
681 {
682   /**
683    * Where to store the result, if found.
684    */
685   struct ClientQueryRecord *cqr;
686
687   uint64_t unique_id;
688 };
689
690
691 /**
692  * Function called for each existing DHT record for the given
693  * query.  Checks if it matches the UID given in the closure
694  * and if so returns the entry as a result.
695  *
696  * @param cls the search context
697  * @param key query for the lookup (not used)
698  * @param value the `struct ClientQueryRecord`
699  * @return #GNUNET_YES to continue iteration (result not yet found)
700  */
701 static int
702 find_by_unique_id (void *cls,
703                    const struct GNUNET_HashCode *key,
704                    void *value)
705 {
706   struct FindByUniqueIdContext *fui_ctx = cls;
707   struct ClientQueryRecord *cqr = value;
708
709   if (cqr->unique_id != fui_ctx->unique_id)
710     return GNUNET_YES;
711   fui_ctx->cqr = cqr;
712   return GNUNET_NO;
713 }
714
715
716 /**
717  * Check "GET result seen" messages from the client.
718  *
719  * @param cls the client we received this message from
720  * @param message the actual message received
721  * @return #GNUNET_OK if @a seen is well-formed
722  */
723 static int
724 check_dht_local_get_result_seen (void *cls,
725                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
726 {
727   uint16_t size;
728   unsigned int hash_count;
729
730   size = ntohs (seen->header.size);
731   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
732   if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
733   {
734     GNUNET_break (0);
735     return GNUNET_SYSERR;
736   }
737   return GNUNET_OK;
738 }
739
740
741 /**
742  * Handler for "GET result seen" messages from the client.
743  *
744  * @param cls the client we received this message from
745  * @param message the actual message received
746  */
747 static void
748 handle_dht_local_get_result_seen (void *cls,
749                                   const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
750 {
751   struct ClientHandle *ch = cls;
752   uint16_t size;
753   unsigned int hash_count;
754   unsigned int old_count;
755   const struct GNUNET_HashCode *hc;
756   struct FindByUniqueIdContext fui_ctx;
757   struct ClientQueryRecord *cqr;
758
759   size = ntohs (seen->header.size);
760   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
761   hc = (const struct GNUNET_HashCode*) &seen[1];
762   fui_ctx.unique_id = seen->unique_id;
763   fui_ctx.cqr = NULL;
764   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
765                                               &seen->key,
766                                               &find_by_unique_id,
767                                               &fui_ctx);
768   if (NULL == (cqr = fui_ctx.cqr))
769   {
770     GNUNET_break (0);
771     GNUNET_SERVICE_client_drop (ch->client);
772     return;
773   }
774   /* finally, update 'seen' list */
775   old_count = cqr->seen_replies_count;
776   GNUNET_array_grow (cqr->seen_replies,
777                      cqr->seen_replies_count,
778                      cqr->seen_replies_count + hash_count);
779   GNUNET_memcpy (&cqr->seen_replies[old_count],
780                  hc,
781                  sizeof (struct GNUNET_HashCode) * hash_count);
782 }
783
784
785 /**
786  * Closure for #remove_by_unique_id().
787  */
788 struct RemoveByUniqueIdContext
789 {
790   /**
791    * Client that issued the removal request.
792    */
793   struct ClientHandle *ch;
794
795   /**
796    * Unique ID of the request.
797    */
798   uint64_t unique_id;
799 };
800
801
802 /**
803  * Iterator over hash map entries that frees all entries
804  * that match the given client and unique ID.
805  *
806  * @param cls unique ID and client to search for in source routes
807  * @param key current key code
808  * @param value value in the hash map, a ClientQueryRecord
809  * @return #GNUNET_YES (we should continue to iterate)
810  */
811 static int
812 remove_by_unique_id (void *cls,
813                      const struct GNUNET_HashCode *key,
814                      void *value)
815 {
816   const struct RemoveByUniqueIdContext *ctx = cls;
817   struct ClientQueryRecord *cqr = value;
818
819   if (cqr->unique_id != ctx->unique_id)
820     return GNUNET_YES;
821   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822               "Removing client %p's record for key %s (by unique id)\n",
823               ctx->ch->client,
824               GNUNET_h2s (key));
825   remove_client_record (cqr);
826   return GNUNET_YES;
827 }
828
829
830 /**
831  * Handler for any generic DHT stop messages, calls the appropriate handler
832  * depending on message type (if processed locally)
833  *
834  * @param cls client we received this message from
835  * @param message the actual message received
836  *
837  */
838 static void
839 handle_dht_local_get_stop (void *cls,
840                            const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
841 {
842   struct ClientHandle *ch = cls;
843   struct RemoveByUniqueIdContext ctx;
844
845   GNUNET_STATISTICS_update (GDS_stats,
846                             gettext_noop
847                             ("# GET STOP requests received from clients"), 1,
848                             GNUNET_NO);
849   LOG (GNUNET_ERROR_TYPE_DEBUG,
850        "Received GET STOP request for %s from local client %p\n",
851        GNUNET_h2s (&dht_stop_msg->key),
852        ch->client);
853   ctx.ch = ch;
854   ctx.unique_id = dht_stop_msg->unique_id;
855   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
856                                               &dht_stop_msg->key,
857                                               &remove_by_unique_id,
858                                               &ctx);
859   GNUNET_SERVICE_client_continue (ch->client);
860 }
861
862
863 /**
864  * Handler for monitor start messages
865  *
866  * @param cls the client we received this message from
867  * @param msg the actual message received
868  *
869  */
870 static void
871 handle_dht_local_monitor (void *cls,
872                           const struct GNUNET_DHT_MonitorStartStopMessage *msg)
873 {
874   struct ClientHandle *ch = cls;
875   struct ClientMonitorRecord *r;
876
877   r = GNUNET_new (struct ClientMonitorRecord);
878   r->ch = ch;
879   r->type = ntohl (msg->type);
880   r->get = ntohs (msg->get);
881   r->get_resp = ntohs (msg->get_resp);
882   r->put = ntohs (msg->put);
883   if (0 == ntohs (msg->filter_key))
884   {
885     r->key = NULL;
886   }
887   else
888   {
889     r->key = GNUNET_new (struct GNUNET_HashCode);
890     GNUNET_memcpy (r->key,
891                    &msg->key,
892                    sizeof (struct GNUNET_HashCode));
893   }
894   GNUNET_CONTAINER_DLL_insert (monitor_head,
895                                monitor_tail,
896                                r);
897   GNUNET_SERVICE_client_continue (ch->client);
898 }
899
900
901 /**
902  * Handler for monitor stop messages
903  *
904  * @param cls the client we received this message from
905  * @param msg the actual message received
906  */
907 static void
908 handle_dht_local_monitor_stop (void *cls,
909                                const struct GNUNET_DHT_MonitorStartStopMessage *msg)
910 {
911   struct ClientHandle *ch = cls;
912   struct ClientMonitorRecord *r;
913   int keys_match;
914
915   GNUNET_SERVICE_client_continue (ch->client);
916   for (r = monitor_head; NULL != r; r = r->next)
917   {
918     if (NULL == r->key)
919     {
920       keys_match = (0 == ntohs(msg->filter_key));
921     }
922     else
923     {
924       keys_match = ( (0 != ntohs(msg->filter_key)) &&
925                      (! memcmp (r->key,
926                                 &msg->key,
927                                 sizeof(struct GNUNET_HashCode))) );
928     }
929     if ( (ch == r->ch) &&
930          (ntohl(msg->type) == r->type) &&
931          (r->get == msg->get) &&
932          (r->get_resp == msg->get_resp) &&
933          (r->put == msg->put) &&
934          keys_match )
935     {
936       GNUNET_CONTAINER_DLL_remove (monitor_head,
937                                    monitor_tail,
938                                    r);
939       GNUNET_free_non_null (r->key);
940       GNUNET_free (r);
941       return; /* Delete only ONE entry */
942     }
943   }
944 }
945
946
947 /**
948  * Closure for #forward_reply()
949  */
950 struct ForwardReplyContext
951 {
952
953   /**
954    * Expiration time of the reply.
955    */
956   struct GNUNET_TIME_Absolute expiration;
957
958   /**
959    * GET path taken.
960    */
961   const struct GNUNET_PeerIdentity *get_path;
962
963   /**
964    * PUT path taken.
965    */
966   const struct GNUNET_PeerIdentity *put_path;
967
968   /**
969    * Embedded payload.
970    */
971   const void *data;
972
973   /**
974    * Number of bytes in data.
975    */
976   size_t data_size;
977
978   /**
979    * Number of entries in @e get_path.
980    */
981   unsigned int get_path_length;
982
983   /**
984    * Number of entries in @e put_path.
985    */
986   unsigned int put_path_length;
987
988   /**
989    * Type of the data.
990    */
991   enum GNUNET_BLOCK_Type type;
992
993 };
994
995
996 /**
997  * Iterator over hash map entries that send a given reply to
998  * each of the matching clients.  With some tricky recycling
999  * of the buffer.
1000  *
1001  * @param cls the 'struct ForwardReplyContext'
1002  * @param key current key
1003  * @param value value in the hash map, a ClientQueryRecord
1004  * @return #GNUNET_YES (we should continue to iterate),
1005  *         if the result is mal-formed, #GNUNET_NO
1006  */
1007 static int
1008 forward_reply (void *cls,
1009                const struct GNUNET_HashCode *key,
1010                void *value)
1011 {
1012   struct ForwardReplyContext *frc = cls;
1013   struct ClientQueryRecord *record = value;
1014   struct GNUNET_MQ_Envelope *env;
1015   struct GNUNET_DHT_ClientResultMessage *reply;
1016   enum GNUNET_BLOCK_EvaluationResult eval;
1017   int do_free;
1018   struct GNUNET_HashCode ch;
1019   struct GNUNET_PeerIdentity *paths;
1020
1021   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1022                "CLIENT-RESULT %s\n",
1023                GNUNET_h2s_full (key));
1024   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1025        (record->type != frc->type))
1026   {
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Record type missmatch, not passing request for key %s to local client\n",
1029          GNUNET_h2s (key));
1030     GNUNET_STATISTICS_update (GDS_stats,
1031                               gettext_noop
1032                               ("# Key match, type mismatches in REPLY to CLIENT"),
1033                               1, GNUNET_NO);
1034     return GNUNET_YES;          /* type mismatch */
1035   }
1036   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1037   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1038     if (0 == memcmp (&record->seen_replies[i],
1039                      &ch,
1040                      sizeof (struct GNUNET_HashCode)))
1041     {
1042       LOG (GNUNET_ERROR_TYPE_DEBUG,
1043            "Duplicate reply, not passing request for key %s to local client\n",
1044            GNUNET_h2s (key));
1045       GNUNET_STATISTICS_update (GDS_stats,
1046                                 gettext_noop
1047                                 ("# Duplicate REPLIES to CLIENT request dropped"),
1048                                 1, GNUNET_NO);
1049       return GNUNET_YES;        /* duplicate */
1050     }
1051   eval
1052     = GNUNET_BLOCK_evaluate (GDS_block_context,
1053                              record->type,
1054                              GNUNET_BLOCK_EO_NONE,
1055                              key,
1056                              NULL,
1057                              0,
1058                              record->xquery,
1059                              record->xquery_size,
1060                              frc->data,
1061                              frc->data_size);
1062   LOG (GNUNET_ERROR_TYPE_DEBUG,
1063        "Evaluation result is %d for key %s for local client's query\n",
1064        (int) eval,
1065        GNUNET_h2s (key));
1066   switch (eval)
1067   {
1068   case GNUNET_BLOCK_EVALUATION_OK_LAST:
1069     do_free = GNUNET_YES;
1070     break;
1071   case GNUNET_BLOCK_EVALUATION_OK_MORE:
1072     GNUNET_array_append (record->seen_replies,
1073                          record->seen_replies_count,
1074                          ch);
1075     do_free = GNUNET_NO;
1076     break;
1077   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1078     /* should be impossible to encounter here */
1079     GNUNET_break (0);
1080     return GNUNET_YES;
1081   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1082     GNUNET_break_op (0);
1083     return GNUNET_NO;
1084   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1085     GNUNET_break (0);
1086     return GNUNET_NO;
1087   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1088     GNUNET_break (0);
1089     return GNUNET_NO;
1090   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1091     return GNUNET_YES;
1092   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1093     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1094                 _("Unsupported block type (%u) in request!\n"), record->type);
1095     return GNUNET_NO;
1096   default:
1097     GNUNET_break (0);
1098     return GNUNET_NO;
1099   }
1100   GNUNET_STATISTICS_update (GDS_stats,
1101                             gettext_noop ("# RESULTS queued for clients"),
1102                             1,
1103                             GNUNET_NO);
1104   env = GNUNET_MQ_msg_extra (reply,
1105                              frc->data_size +
1106                              (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1107                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1108   reply->type = htonl (frc->type);
1109   reply->get_path_length = htonl (frc->get_path_length);
1110   reply->put_path_length = htonl (frc->put_path_length);
1111   reply->unique_id = record->unique_id;
1112   reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1113   reply->key = *key;
1114   paths = (struct GNUNET_PeerIdentity *) &reply[1];
1115   GNUNET_memcpy (paths,
1116                  frc->put_path,
1117                  sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1118   GNUNET_memcpy (&paths[frc->put_path_length],
1119                  frc->get_path,
1120                  sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1121   GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1122                  frc->data,
1123                  frc->data_size);
1124   LOG (GNUNET_ERROR_TYPE_DEBUG,
1125        "Sending reply to query %s for client %p\n",
1126        GNUNET_h2s (key),
1127        record->ch->client);
1128   GNUNET_MQ_send (record->ch->mq,
1129                   env);
1130   if (GNUNET_YES == do_free)
1131     remove_client_record (record);
1132   return GNUNET_YES;
1133 }
1134
1135
1136 /**
1137  * Handle a reply we've received from another peer.  If the reply
1138  * matches any of our pending queries, forward it to the respective
1139  * client(s).
1140  *
1141  * @param expiration when will the reply expire
1142  * @param key the query this reply is for
1143  * @param get_path_length number of peers in @a get_path
1144  * @param get_path path the reply took on get
1145  * @param put_path_length number of peers in @a put_path
1146  * @param put_path path the reply took on put
1147  * @param type type of the reply
1148  * @param data_size number of bytes in @a data
1149  * @param data application payload data
1150  */
1151 void
1152 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1153                           const struct GNUNET_HashCode *key,
1154                           unsigned int get_path_length,
1155                           const struct GNUNET_PeerIdentity *get_path,
1156                           unsigned int put_path_length,
1157                           const struct GNUNET_PeerIdentity *put_path,
1158                           enum GNUNET_BLOCK_Type type,
1159                           size_t data_size,
1160                           const void *data)
1161 {
1162   struct ForwardReplyContext frc;
1163   size_t msize;
1164
1165   msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1166     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1167   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1168   {
1169     GNUNET_break (0);
1170     return;
1171   }
1172   if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1173                                                  key))
1174   {
1175     LOG (GNUNET_ERROR_TYPE_DEBUG,
1176          "No matching client for reply for key %s\n",
1177          GNUNET_h2s (key));
1178     GNUNET_STATISTICS_update (GDS_stats,
1179                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1180                               1,
1181                               GNUNET_NO);
1182     return;                     /* no matching request, fast exit! */
1183   }
1184   frc.expiration = expiration;
1185   frc.get_path = get_path;
1186   frc.put_path = put_path;
1187   frc.data = data;
1188   frc.data_size = data_size;
1189   frc.get_path_length = get_path_length;
1190   frc.put_path_length = put_path_length;
1191   frc.type = type;
1192   LOG (GNUNET_ERROR_TYPE_DEBUG,
1193        "Forwarding reply for key %s to client\n",
1194        GNUNET_h2s (key));
1195   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1196                                               key,
1197                                               &forward_reply,
1198                                               &frc);
1199
1200 }
1201
1202
1203 /**
1204  * Check if some client is monitoring GET messages and notify
1205  * them in that case.
1206  *
1207  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1208  * @param type The type of data in the request.
1209  * @param hop_count Hop count so far.
1210  * @param path_length number of entries in path (or 0 if not recorded).
1211  * @param path peers on the GET path (or NULL if not recorded).
1212  * @param desired_replication_level Desired replication level.
1213  * @param key Key of the requested data.
1214  */
1215 void
1216 GDS_CLIENTS_process_get (uint32_t options,
1217                          enum GNUNET_BLOCK_Type type,
1218                          uint32_t hop_count,
1219                          uint32_t desired_replication_level,
1220                          unsigned int path_length,
1221                          const struct GNUNET_PeerIdentity *path,
1222                          const struct GNUNET_HashCode * key)
1223 {
1224   struct ClientMonitorRecord *m;
1225   struct ClientHandle **cl;
1226   unsigned int cl_size;
1227
1228   cl = NULL;
1229   cl_size = 0;
1230   for (m = monitor_head; NULL != m; m = m->next)
1231   {
1232     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1233            (m->type == type) ) &&
1234          ( (NULL == m->key) ||
1235            (0 == memcmp (key,
1236                          m->key,
1237                          sizeof(struct GNUNET_HashCode))) ) )
1238     {
1239       struct GNUNET_MQ_Envelope *env;
1240       struct GNUNET_DHT_MonitorGetMessage *mmsg;
1241       struct GNUNET_PeerIdentity *msg_path;
1242       size_t msize;
1243       unsigned int i;
1244
1245       /* Don't send duplicates */
1246       for (i = 0; i < cl_size; i++)
1247         if (cl[i] == m->ch)
1248           break;
1249       if (i < cl_size)
1250         continue;
1251       GNUNET_array_append (cl,
1252                            cl_size,
1253                            m->ch);
1254
1255       msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1256       env = GNUNET_MQ_msg_extra (mmsg,
1257                                  msize,
1258                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1259       mmsg->options = htonl(options);
1260       mmsg->type = htonl(type);
1261       mmsg->hop_count = htonl(hop_count);
1262       mmsg->desired_replication_level = htonl(desired_replication_level);
1263       mmsg->get_path_length = htonl(path_length);
1264       mmsg->key = *key;
1265       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1266       GNUNET_memcpy (msg_path,
1267                      path,
1268                      path_length * sizeof (struct GNUNET_PeerIdentity));
1269       GNUNET_MQ_send (m->ch->mq,
1270                       env);
1271     }
1272   }
1273   GNUNET_free_non_null (cl);
1274 }
1275
1276
1277 /**
1278  * Check if some client is monitoring GET RESP messages and notify
1279  * them in that case.
1280  *
1281  * @param type The type of data in the result.
1282  * @param get_path Peers on GET path (or NULL if not recorded).
1283  * @param get_path_length number of entries in get_path.
1284  * @param put_path peers on the PUT path (or NULL if not recorded).
1285  * @param put_path_length number of entries in get_path.
1286  * @param exp Expiration time of the data.
1287  * @param key Key of the data.
1288  * @param data Pointer to the result data.
1289  * @param size Number of bytes in @a data.
1290  */
1291 void
1292 GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1293                               const struct GNUNET_PeerIdentity *get_path,
1294                               unsigned int get_path_length,
1295                               const struct GNUNET_PeerIdentity *put_path,
1296                               unsigned int put_path_length,
1297                               struct GNUNET_TIME_Absolute exp,
1298                               const struct GNUNET_HashCode * key,
1299                               const void *data,
1300                               size_t size)
1301 {
1302   struct ClientMonitorRecord *m;
1303   struct ClientHandle **cl;
1304   unsigned int cl_size;
1305
1306   cl = NULL;
1307   cl_size = 0;
1308   for (m = monitor_head; NULL != m; m = m->next)
1309   {
1310     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1311         (NULL == m->key ||
1312          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1313     {
1314       struct GNUNET_MQ_Envelope *env;
1315       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1316       struct GNUNET_PeerIdentity *path;
1317       size_t msize;
1318       unsigned int i;
1319
1320       /* Don't send duplicates */
1321       for (i = 0; i < cl_size; i++)
1322         if (cl[i] == m->ch)
1323           break;
1324       if (i < cl_size)
1325         continue;
1326       GNUNET_array_append (cl,
1327                            cl_size,
1328                            m->ch);
1329
1330       msize = size;
1331       msize += (get_path_length + put_path_length)
1332                * sizeof (struct GNUNET_PeerIdentity);
1333       env = GNUNET_MQ_msg_extra (mmsg,
1334                                  msize,
1335                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1336       mmsg->type = htonl(type);
1337       mmsg->put_path_length = htonl(put_path_length);
1338       mmsg->get_path_length = htonl(get_path_length);
1339       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1340       mmsg->key = *key;
1341       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1342       GNUNET_memcpy (path,
1343                      put_path,
1344                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
1345       GNUNET_memcpy (path,
1346                      get_path,
1347                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
1348       GNUNET_memcpy (&path[get_path_length],
1349                      data,
1350                      size);
1351       GNUNET_MQ_send (m->ch->mq,
1352                       env);
1353     }
1354   }
1355   GNUNET_free_non_null (cl);
1356 }
1357
1358
1359 /**
1360  * Check if some client is monitoring PUT messages and notify
1361  * them in that case.
1362  *
1363  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1364  * @param type The type of data in the request.
1365  * @param hop_count Hop count so far.
1366  * @param path_length number of entries in path (or 0 if not recorded).
1367  * @param path peers on the PUT path (or NULL if not recorded).
1368  * @param desired_replication_level Desired replication level.
1369  * @param exp Expiration time of the data.
1370  * @param key Key under which data is to be stored.
1371  * @param data Pointer to the data carried.
1372  * @param size Number of bytes in data.
1373  */
1374 void
1375 GDS_CLIENTS_process_put (uint32_t options,
1376                          enum GNUNET_BLOCK_Type type,
1377                          uint32_t hop_count,
1378                          uint32_t desired_replication_level,
1379                          unsigned int path_length,
1380                          const struct GNUNET_PeerIdentity *path,
1381                          struct GNUNET_TIME_Absolute exp,
1382                          const struct GNUNET_HashCode *key,
1383                          const void *data,
1384                          size_t size)
1385 {
1386   struct ClientMonitorRecord *m;
1387   struct ClientHandle **cl;
1388   unsigned int cl_size;
1389
1390   cl = NULL;
1391   cl_size = 0;
1392   for (m = monitor_head; NULL != m; m = m->next)
1393   {
1394     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1395         (NULL == m->key ||
1396          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1397     {
1398       struct GNUNET_MQ_Envelope *env;
1399       struct GNUNET_DHT_MonitorPutMessage *mmsg;
1400       struct GNUNET_PeerIdentity *msg_path;
1401       size_t msize;
1402       unsigned int i;
1403
1404       /* Don't send duplicates */
1405       for (i = 0; i < cl_size; i++)
1406         if (cl[i] == m->ch)
1407           break;
1408       if (i < cl_size)
1409         continue;
1410       GNUNET_array_append (cl,
1411                            cl_size,
1412                            m->ch);
1413
1414       msize = size;
1415       msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1416       env = GNUNET_MQ_msg_extra (mmsg,
1417                                  msize,
1418                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1419       mmsg->options = htonl(options);
1420       mmsg->type = htonl(type);
1421       mmsg->hop_count = htonl(hop_count);
1422       mmsg->desired_replication_level = htonl (desired_replication_level);
1423       mmsg->put_path_length = htonl (path_length);
1424       mmsg->key = *key;
1425       mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1426       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1427       GNUNET_memcpy (msg_path,
1428                      path,
1429                      path_length * sizeof (struct GNUNET_PeerIdentity));
1430       GNUNET_memcpy (&msg_path[path_length],
1431                      data,
1432                      size);
1433       GNUNET_MQ_send (m->ch->mq,
1434                       env);
1435     }
1436   }
1437   GNUNET_free_non_null (cl);
1438 }
1439
1440
1441 /**
1442  * Initialize client subsystem.
1443  *
1444  * @param server the initialized server
1445  */
1446 static void
1447 GDS_CLIENTS_init ()
1448 {
1449   forward_map
1450     = GNUNET_CONTAINER_multihashmap_create (1024,
1451                                             GNUNET_YES);
1452   retry_heap
1453     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1454 }
1455
1456
1457 /**
1458  * Shutdown client subsystem.
1459  */
1460 static void
1461 GDS_CLIENTS_stop ()
1462 {
1463   if (NULL != retry_task)
1464   {
1465     GNUNET_SCHEDULER_cancel (retry_task);
1466     retry_task = NULL;
1467   }
1468 }
1469
1470
1471 /**
1472  * Define "main" method using service macro.
1473  *
1474  * @param name name of the service, i.e. "dht" or "xdht"
1475  * @param run name of the initializaton method for the service
1476  */
1477 #define GDS_DHT_SERVICE_INIT(name,run)          \
1478  GNUNET_SERVICE_MAIN \
1479   (name, \
1480   GNUNET_SERVICE_OPTION_NONE, \
1481   run, \
1482   &client_connect_cb, \
1483   &client_disconnect_cb, \
1484   NULL, \
1485   GNUNET_MQ_hd_var_size (dht_local_put, \
1486                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1487                          struct GNUNET_DHT_ClientPutMessage, \
1488                          NULL), \
1489   GNUNET_MQ_hd_var_size (dht_local_get, \
1490                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1491                          struct GNUNET_DHT_ClientGetMessage, \
1492                          NULL), \
1493   GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1494                           GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1495                           struct GNUNET_DHT_ClientGetStopMessage, \
1496                           NULL), \
1497   GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1498                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1499                            struct GNUNET_DHT_MonitorStartStopMessage, \
1500                            NULL), \
1501   GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1502                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1503                            struct GNUNET_DHT_MonitorStartStopMessage, \
1504                            NULL), \
1505   GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1506                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1507                          struct GNUNET_DHT_ClientGetResultSeenMessage , \
1508                          NULL), \
1509   GNUNET_MQ_handler_end ())
1510
1511
1512 /**
1513  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1514  */
1515 void __attribute__ ((destructor))
1516 GDS_CLIENTS_done ()
1517 {
1518   if (NULL != retry_heap)
1519   {
1520     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1521     GNUNET_CONTAINER_heap_destroy (retry_heap);
1522     retry_heap = NULL;
1523   }
1524   if (NULL != forward_map)
1525   {
1526     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1527     GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1528     forward_map = NULL;
1529   }
1530 }
1531
1532 /* end of gnunet-service-dht_clients.c */