another special case for loopback
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht.h"
36
37
38 /**
39  * Should routing details be logged to stderr (for debugging)?
40  */
41 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
44
45
46 /**
47  * Struct containing information about a client,
48  * handle to connect to it, and any pending messages
49  * that need to be sent to it.
50  */
51 struct ClientHandle;
52
53
54 /**
55  * Entry in the local forwarding map for a client's GET request.
56  */
57 struct ClientQueryRecord
58 {
59
60   /**
61    * The key this request was about
62    */
63   struct GNUNET_HashCode key;
64
65   /**
66    * Kept in a DLL with @e client.
67    */
68   struct ClientQueryRecord *next;
69
70   /**
71    * Kept in a DLL with @e client.
72    */
73   struct ClientQueryRecord *prev;
74
75   /**
76    * Client responsible for the request.
77    */
78   struct ClientHandle *ch;
79
80   /**
81    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
82    */
83   const void *xquery;
84
85   /**
86    * Replies we have already seen for this request.
87    */
88   struct GNUNET_HashCode *seen_replies;
89
90   /**
91    * Pointer to this nodes heap location in the retry-heap (for fast removal)
92    */
93   struct GNUNET_CONTAINER_HeapNode *hnode;
94
95   /**
96    * What's the delay between re-try operations that we currently use for this
97    * request?
98    */
99   struct GNUNET_TIME_Relative retry_frequency;
100
101   /**
102    * What's the next time we should re-try this request?
103    */
104   struct GNUNET_TIME_Absolute retry_time;
105
106   /**
107    * The unique identifier of this request
108    */
109   uint64_t unique_id;
110
111   /**
112    * Number of bytes in xquery.
113    */
114   size_t xquery_size;
115
116   /**
117    * Number of entries in 'seen_replies'.
118    */
119   unsigned int seen_replies_count;
120
121   /**
122    * Desired replication level
123    */
124   uint32_t replication;
125
126   /**
127    * Any message options for this request
128    */
129   uint32_t msg_options;
130
131   /**
132    * The type for the data for the GET request.
133    */
134   enum GNUNET_BLOCK_Type type;
135
136 };
137
138
139 /**
140  * Struct containing paremeters of monitoring requests.
141  */
142 struct ClientMonitorRecord
143 {
144
145   /**
146    * Next element in DLL.
147    */
148   struct ClientMonitorRecord *next;
149
150   /**
151    * Previous element in DLL.
152    */
153   struct ClientMonitorRecord *prev;
154
155   /**
156    * Type of blocks that are of interest
157    */
158   enum GNUNET_BLOCK_Type type;
159
160   /**
161    * Key of data of interest, NULL for all.
162    */
163   struct GNUNET_HashCode *key;
164
165   /**
166    * Flag whether to notify about GET messages.
167    */
168   int16_t get;
169
170   /**
171    * Flag whether to notify about GET_REPONSE messages.
172    */
173   int16_t get_resp;
174
175   /**
176    * Flag whether to notify about PUT messages.
177    */
178   uint16_t put;
179
180   /**
181    * Client to notify of these requests.
182    */
183   struct ClientHandle *ch;
184 };
185
186
187 /**
188  * Struct containing information about a client,
189  * handle to connect to it, and any pending messages
190  * that need to be sent to it.
191  */
192 struct ClientHandle
193 {
194   /**
195    * Linked list of active queries of this client.
196    */
197   struct ClientQueryRecord *cqr_head;
198
199   /**
200    * Linked list of active queries of this client.
201    */
202   struct ClientQueryRecord *cqr_tail;
203
204   /**
205    * The handle to this client
206    */
207   struct GNUNET_SERVICE_Client *client;
208
209   /**
210    * The message queue to this client
211    */
212   struct GNUNET_MQ_Handle *mq;
213
214 };
215
216 /**
217  * Our handle to the BLOCK library.
218  */
219 struct GNUNET_BLOCK_Context *GDS_block_context;
220
221 /**
222  * Handle for the statistics service.
223  */
224 struct GNUNET_STATISTICS_Handle *GDS_stats;
225
226 /**
227  * Handle for the service.
228  */
229 struct GNUNET_SERVICE_Handle *GDS_service;
230
231 /**
232  * The configuration the DHT service is running with
233  */
234 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
235
236 /**
237  * List of active monitoring requests.
238  */
239 static struct ClientMonitorRecord *monitor_head;
240
241 /**
242  * List of active monitoring requests.
243  */
244 static struct ClientMonitorRecord *monitor_tail;
245
246 /**
247  * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
248  */
249 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
250
251 /**
252  * Heap with all of our client's request, sorted by retry time (earliest on top).
253  */
254 static struct GNUNET_CONTAINER_Heap *retry_heap;
255
256 /**
257  * Task that re-transmits requests (using retry_heap).
258  */
259 static struct GNUNET_SCHEDULER_Task *retry_task;
260
261
262 /**
263  * Free data structures associated with the given query.
264  *
265  * @param record record to remove
266  */
267 static void
268 remove_client_record (struct ClientQueryRecord *record)
269 {
270   struct ClientHandle *ch = record->ch;
271
272   GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
273                                ch->cqr_tail,
274                                record);
275   GNUNET_assert (GNUNET_YES ==
276                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
277                                                        &record->key,
278                                                        record));
279   if (NULL != record->hnode)
280     GNUNET_CONTAINER_heap_remove_node (record->hnode);
281   GNUNET_array_grow (record->seen_replies,
282                      record->seen_replies_count,
283                      0);
284   GNUNET_free (record);
285 }
286
287
288 /**
289  * Functions with this signature are called whenever a local client is
290  * connects to us.
291  *
292  * @param cls closure (NULL for dht)
293  * @param client identification of the client
294  * @param mq message queue for talking to @a client
295  * @return our `struct ClientHandle` for @a client
296  */
297 static void *
298 client_connect_cb (void *cls,
299                    struct GNUNET_SERVICE_Client *client,
300                    struct GNUNET_MQ_Handle *mq)
301 {
302   struct ClientHandle *ch;
303
304   ch = GNUNET_new (struct ClientHandle);
305   ch->client = client;
306   ch->mq = mq;
307   return ch;
308 }
309
310
311 /**
312  * Functions with this signature are called whenever a client
313  * is disconnected on the network level.
314  *
315  * @param cls closure (NULL for dht)
316  * @param client identification of the client
317  * @param app_ctx our `struct ClientHandle` for @a client
318  */
319 static void
320 client_disconnect_cb (void *cls,
321                       struct GNUNET_SERVICE_Client *client,
322                       void *app_ctx)
323 {
324   struct ClientHandle *ch = app_ctx;
325   struct ClientQueryRecord *cqr;
326   struct ClientMonitorRecord *monitor;
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Local client %p disconnects\n",
330               ch);
331   monitor = monitor_head;
332   while (NULL != monitor)
333   {
334     if (monitor->ch == ch)
335     {
336       struct ClientMonitorRecord *next;
337
338       next = monitor->next;
339       GNUNET_free_non_null (monitor->key);
340       GNUNET_CONTAINER_DLL_remove (monitor_head,
341                                    monitor_tail,
342                                    monitor);
343       GNUNET_free (monitor);
344       monitor = next;
345     }
346     else
347     {
348       monitor = monitor->next;
349     }
350   }
351   while (NULL != (cqr = ch->cqr_head))
352     remove_client_record (cqr);
353   GNUNET_free (ch);
354 }
355
356
357 /**
358  * Route the given request via the DHT.  This includes updating
359  * the bloom filter and retransmission times, building the P2P
360  * message and initiating the routing operation.
361  */
362 static void
363 transmit_request (struct ClientQueryRecord *cqr)
364 {
365   int32_t reply_bf_mutator;
366   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
367   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
368
369   GNUNET_STATISTICS_update (GDS_stats,
370                             gettext_noop ("# GET requests from clients injected"),
371                             1,
372                             GNUNET_NO);
373   reply_bf_mutator =
374       (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
375                                           UINT32_MAX);
376   reply_bf
377     = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
378                                           cqr->seen_replies,
379                                           cqr->seen_replies_count);
380   peer_bf
381     = GNUNET_CONTAINER_bloomfilter_init (NULL,
382                                          DHT_BLOOM_SIZE,
383                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
384   LOG (GNUNET_ERROR_TYPE_DEBUG,
385        "Initiating GET for %s, replication %u, already have %u replies\n",
386        GNUNET_h2s (&cqr->key),
387        cqr->replication,
388        cqr->seen_replies_count);
389   GDS_NEIGHBOURS_handle_get (cqr->type,
390                              cqr->msg_options,
391                              cqr->replication,
392                              0 /* hop count */ ,
393                              &cqr->key,
394                              cqr->xquery,
395                              cqr->xquery_size,
396                              reply_bf,
397                              reply_bf_mutator,
398                              peer_bf);
399   GNUNET_CONTAINER_bloomfilter_free (reply_bf);
400   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
401
402   /* exponential back-off for retries.
403    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
404   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
405   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
406 }
407
408
409 /**
410  * Task that looks at the #retry_heap and transmits all of the requests
411  * on the heap that are ready for transmission.  Then re-schedules
412  * itself (unless the heap is empty).
413  *
414  * @param cls unused
415  */
416 static void
417 transmit_next_request_task (void *cls)
418 {
419   struct ClientQueryRecord *cqr;
420   struct GNUNET_TIME_Relative delay;
421
422   retry_task = NULL;
423   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
424   {
425     cqr->hnode = NULL;
426     delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
427     if (delay.rel_value_us > 0)
428     {
429       cqr->hnode
430         = GNUNET_CONTAINER_heap_insert (retry_heap,
431                                         cqr,
432                                         cqr->retry_time.abs_value_us);
433       retry_task
434         = GNUNET_SCHEDULER_add_at (cqr->retry_time,
435                                    &transmit_next_request_task,
436                                    NULL);
437       return;
438     }
439     transmit_request (cqr);
440     cqr->hnode
441       = GNUNET_CONTAINER_heap_insert (retry_heap,
442                                       cqr,
443                                       cqr->retry_time.abs_value_us);
444   }
445 }
446
447
448 /**
449  * Check DHT PUT messages from the client.
450  *
451  * @param cls the client we received this message from
452  * @param dht_msg the actual message received
453  * @return #GNUNET_OK (always)
454  */
455 static int
456 check_dht_local_put (void *cls,
457                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
458 {
459   /* always well-formed */
460   return GNUNET_OK;
461 }
462
463
464 /**
465  * Handler for PUT messages.
466  *
467  * @param cls the client we received this message from
468  * @param dht_msg the actual message received
469  */
470 static void
471 handle_dht_local_put (void *cls,
472                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
473 {
474   struct ClientHandle *ch = cls;
475   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
476   uint16_t size;
477   struct GNUNET_MQ_Envelope *env;
478   struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
479
480   size = ntohs (dht_msg->header.size);
481   GNUNET_STATISTICS_update (GDS_stats,
482                             gettext_noop ("# PUT requests received from clients"),
483                             1,
484                             GNUNET_NO);
485   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
486                "CLIENT-PUT %s\n",
487                GNUNET_h2s_full (&dht_msg->key));
488   /* give to local clients */
489   LOG (GNUNET_ERROR_TYPE_DEBUG,
490        "Handling local PUT of %u-bytes for query %s\n",
491        size - sizeof (struct GNUNET_DHT_ClientPutMessage),
492        GNUNET_h2s (&dht_msg->key));
493   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
494                             &dht_msg->key,
495                             0,
496                             NULL,
497                             0,
498                             NULL,
499                             ntohl (dht_msg->type),
500                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
501                             &dht_msg[1]);
502   /* store locally */
503   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
504                             &dht_msg->key,
505                             0,
506                             NULL,
507                             ntohl (dht_msg->type),
508                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
509                             &dht_msg[1]);
510   /* route to other peers */
511   peer_bf
512     = GNUNET_CONTAINER_bloomfilter_init (NULL,
513                                          DHT_BLOOM_SIZE,
514                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
515   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
516                              ntohl (dht_msg->options),
517                              ntohl (dht_msg->desired_replication_level),
518                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
519                              0 /* hop count */,
520                              peer_bf,
521                              &dht_msg->key,
522                              0,
523                              NULL,
524                              &dht_msg[1],
525                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
526   GDS_CLIENTS_process_put (ntohl (dht_msg->options),
527                            ntohl (dht_msg->type),
528                            0,
529                            ntohl (dht_msg->desired_replication_level),
530                            1,
531                            GDS_NEIGHBOURS_get_id(),
532                            GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
533                            &dht_msg->key,
534                            &dht_msg[1],
535                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
536   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
537   env = GNUNET_MQ_msg (conf,
538                        GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
539   conf->reserved = htonl (0);
540   conf->unique_id = dht_msg->unique_id;
541   GNUNET_MQ_send (ch->mq,
542                   env);
543   GNUNET_SERVICE_client_continue (ch->client);
544 }
545
546
547 /**
548  * Check DHT GET messages from the client.
549  *
550  * @param cls the client we received this message from
551  * @param message the actual message received
552  * @return #GNUNET_OK (always)
553  */
554 static int
555 check_dht_local_get (void *cls,
556                       const struct GNUNET_DHT_ClientGetMessage *get)
557 {
558   /* always well-formed */
559   return GNUNET_OK;
560 }
561
562
563 /**
564  * Handle a result from local datacache for a GET operation.
565  *
566  * @param cls the `struct ClientHandle` of the client doing the query
567  * @param type type of the block
568  * @param expiration_time when does the content expire
569  * @param key key for the content
570  * @param put_path_length number of entries in @a put_path
571  * @param put_path peers the original PUT traversed (if tracked)
572  * @param get_path_length number of entries in @a get_path
573  * @param get_path peers this reply has traversed so far (if tracked)
574  * @param data payload of the reply
575  * @param data_size number of bytes in @a data
576  */
577 static void
578 handle_local_result (void *cls,
579                      enum GNUNET_BLOCK_Type type,
580                      struct GNUNET_TIME_Absolute expiration_time,
581                      const struct GNUNET_HashCode *key,
582                      unsigned int put_path_length,
583                      const struct GNUNET_PeerIdentity *put_path,
584                      unsigned int get_path_length,
585                      const struct GNUNET_PeerIdentity *get_path,
586                      const void *data,
587                      size_t data_size)
588 {
589   // FIXME: this needs some clean up: inline the function,
590   // possibly avoid even looking up the client!
591   GDS_CLIENTS_handle_reply (expiration_time,
592                             key,
593                             0, NULL,
594                             put_path_length, put_path,
595                             type,
596                             data_size, data);
597 }
598
599
600 /**
601  * Handler for DHT GET messages from the client.
602  *
603  * @param cls the client we received this message from
604  * @param message the actual message received
605  */
606 static void
607 handle_dht_local_get (void *cls,
608                       const struct GNUNET_DHT_ClientGetMessage *get)
609 {
610   struct ClientHandle *ch = cls;
611   struct ClientQueryRecord *cqr;
612   size_t xquery_size;
613   const char *xquery;
614   uint16_t size;
615
616   size = ntohs (get->header.size);
617   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
618   xquery = (const char *) &get[1];
619   GNUNET_STATISTICS_update (GDS_stats,
620                             gettext_noop
621                             ("# GET requests received from clients"), 1,
622                             GNUNET_NO);
623   LOG (GNUNET_ERROR_TYPE_DEBUG,
624        "Received GET request for %s from local client %p, xq: %.*s\n",
625        GNUNET_h2s (&get->key),
626        ch->client,
627        xquery_size,
628        xquery);
629   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
630                "CLIENT-GET %s\n",
631                GNUNET_h2s_full (&get->key));
632
633   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
634   cqr->key = get->key;
635   cqr->ch = ch;
636   cqr->xquery = (void *) &cqr[1];
637   GNUNET_memcpy (&cqr[1], xquery, xquery_size);
638   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
639   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
640   cqr->retry_time = GNUNET_TIME_absolute_get ();
641   cqr->unique_id = get->unique_id;
642   cqr->xquery_size = xquery_size;
643   cqr->replication = ntohl (get->desired_replication_level);
644   cqr->msg_options = ntohl (get->options);
645   cqr->type = ntohl (get->type);
646   GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
647                                ch->cqr_tail,
648                                cqr);
649   GNUNET_CONTAINER_multihashmap_put (forward_map,
650                                      &cqr->key,
651                                      cqr,
652                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
653   GDS_CLIENTS_process_get (ntohl (get->options),
654                            ntohl (get->type),
655                            0,
656                            ntohl (get->desired_replication_level),
657                            1,
658                            GDS_NEIGHBOURS_get_id(),
659                            &get->key);
660   /* start remote requests */
661   if (NULL != retry_task)
662     GNUNET_SCHEDULER_cancel (retry_task);
663   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
664                                          NULL);
665   /* perform local lookup */
666   GDS_DATACACHE_handle_get (&get->key,
667                             cqr->type,
668                             cqr->xquery,
669                             xquery_size,
670                             NULL,
671                             0,
672                             &handle_local_result,
673                             ch);
674   GNUNET_SERVICE_client_continue (ch->client);
675 }
676
677
678 /**
679  * Closure for #find_by_unique_id().
680  */
681 struct FindByUniqueIdContext
682 {
683   /**
684    * Where to store the result, if found.
685    */
686   struct ClientQueryRecord *cqr;
687
688   uint64_t unique_id;
689 };
690
691
692 /**
693  * Function called for each existing DHT record for the given
694  * query.  Checks if it matches the UID given in the closure
695  * and if so returns the entry as a result.
696  *
697  * @param cls the search context
698  * @param key query for the lookup (not used)
699  * @param value the `struct ClientQueryRecord`
700  * @return #GNUNET_YES to continue iteration (result not yet found)
701  */
702 static int
703 find_by_unique_id (void *cls,
704                    const struct GNUNET_HashCode *key,
705                    void *value)
706 {
707   struct FindByUniqueIdContext *fui_ctx = cls;
708   struct ClientQueryRecord *cqr = value;
709
710   if (cqr->unique_id != fui_ctx->unique_id)
711     return GNUNET_YES;
712   fui_ctx->cqr = cqr;
713   return GNUNET_NO;
714 }
715
716
717 /**
718  * Check "GET result seen" messages from the client.
719  *
720  * @param cls the client we received this message from
721  * @param message the actual message received
722  * @return #GNUNET_OK if @a seen is well-formed
723  */
724 static int
725 check_dht_local_get_result_seen (void *cls,
726                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
727 {
728   uint16_t size;
729   unsigned int hash_count;
730
731   size = ntohs (seen->header.size);
732   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
733   if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
734   {
735     GNUNET_break (0);
736     return GNUNET_SYSERR;
737   }
738   return GNUNET_OK;
739 }
740
741
742 /**
743  * Handler for "GET result seen" messages from the client.
744  *
745  * @param cls the client we received this message from
746  * @param message the actual message received
747  */
748 static void
749 handle_dht_local_get_result_seen (void *cls,
750                                   const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
751 {
752   struct ClientHandle *ch = cls;
753   uint16_t size;
754   unsigned int hash_count;
755   unsigned int old_count;
756   const struct GNUNET_HashCode *hc;
757   struct FindByUniqueIdContext fui_ctx;
758   struct ClientQueryRecord *cqr;
759
760   size = ntohs (seen->header.size);
761   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
762   hc = (const struct GNUNET_HashCode*) &seen[1];
763   fui_ctx.unique_id = seen->unique_id;
764   fui_ctx.cqr = NULL;
765   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
766                                               &seen->key,
767                                               &find_by_unique_id,
768                                               &fui_ctx);
769   if (NULL == (cqr = fui_ctx.cqr))
770   {
771     GNUNET_break (0);
772     GNUNET_SERVICE_client_drop (ch->client);
773     return;
774   }
775   /* finally, update 'seen' list */
776   old_count = cqr->seen_replies_count;
777   GNUNET_array_grow (cqr->seen_replies,
778                      cqr->seen_replies_count,
779                      cqr->seen_replies_count + hash_count);
780   GNUNET_memcpy (&cqr->seen_replies[old_count],
781                  hc,
782                  sizeof (struct GNUNET_HashCode) * hash_count);
783 }
784
785
786 /**
787  * Closure for #remove_by_unique_id().
788  */
789 struct RemoveByUniqueIdContext
790 {
791   /**
792    * Client that issued the removal request.
793    */
794   struct ClientHandle *ch;
795
796   /**
797    * Unique ID of the request.
798    */
799   uint64_t unique_id;
800 };
801
802
803 /**
804  * Iterator over hash map entries that frees all entries
805  * that match the given client and unique ID.
806  *
807  * @param cls unique ID and client to search for in source routes
808  * @param key current key code
809  * @param value value in the hash map, a ClientQueryRecord
810  * @return #GNUNET_YES (we should continue to iterate)
811  */
812 static int
813 remove_by_unique_id (void *cls,
814                      const struct GNUNET_HashCode *key,
815                      void *value)
816 {
817   const struct RemoveByUniqueIdContext *ctx = cls;
818   struct ClientQueryRecord *cqr = value;
819
820   if (cqr->unique_id != ctx->unique_id)
821     return GNUNET_YES;
822   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823               "Removing client %p's record for key %s (by unique id)\n",
824               ctx->ch->client,
825               GNUNET_h2s (key));
826   remove_client_record (cqr);
827   return GNUNET_YES;
828 }
829
830
831 /**
832  * Handler for any generic DHT stop messages, calls the appropriate handler
833  * depending on message type (if processed locally)
834  *
835  * @param cls client we received this message from
836  * @param message the actual message received
837  *
838  */
839 static void
840 handle_dht_local_get_stop (void *cls,
841                            const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
842 {
843   struct ClientHandle *ch = cls;
844   struct RemoveByUniqueIdContext ctx;
845
846   GNUNET_STATISTICS_update (GDS_stats,
847                             gettext_noop
848                             ("# GET STOP requests received from clients"), 1,
849                             GNUNET_NO);
850   LOG (GNUNET_ERROR_TYPE_DEBUG,
851        "Received GET STOP request for %s from local client %p\n",
852        GNUNET_h2s (&dht_stop_msg->key),
853        ch->client);
854   ctx.ch = ch;
855   ctx.unique_id = dht_stop_msg->unique_id;
856   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
857                                               &dht_stop_msg->key,
858                                               &remove_by_unique_id,
859                                               &ctx);
860   GNUNET_SERVICE_client_continue (ch->client);
861 }
862
863
864 /**
865  * Handler for monitor start messages
866  *
867  * @param cls the client we received this message from
868  * @param msg the actual message received
869  *
870  */
871 static void
872 handle_dht_local_monitor (void *cls,
873                           const struct GNUNET_DHT_MonitorStartStopMessage *msg)
874 {
875   struct ClientHandle *ch = cls;
876   struct ClientMonitorRecord *r;
877
878   r = GNUNET_new (struct ClientMonitorRecord);
879   r->ch = ch;
880   r->type = ntohl (msg->type);
881   r->get = ntohs (msg->get);
882   r->get_resp = ntohs (msg->get_resp);
883   r->put = ntohs (msg->put);
884   if (0 == ntohs (msg->filter_key))
885   {
886     r->key = NULL;
887   }
888   else
889   {
890     r->key = GNUNET_new (struct GNUNET_HashCode);
891     GNUNET_memcpy (r->key,
892                    &msg->key,
893                    sizeof (struct GNUNET_HashCode));
894   }
895   GNUNET_CONTAINER_DLL_insert (monitor_head,
896                                monitor_tail,
897                                r);
898   GNUNET_SERVICE_client_continue (ch->client);
899 }
900
901
902 /**
903  * Handler for monitor stop messages
904  *
905  * @param cls the client we received this message from
906  * @param msg the actual message received
907  */
908 static void
909 handle_dht_local_monitor_stop (void *cls,
910                                const struct GNUNET_DHT_MonitorStartStopMessage *msg)
911 {
912   struct ClientHandle *ch = cls;
913   struct ClientMonitorRecord *r;
914   int keys_match;
915
916   GNUNET_SERVICE_client_continue (ch->client);
917   for (r = monitor_head; NULL != r; r = r->next)
918   {
919     if (NULL == r->key)
920     {
921       keys_match = (0 == ntohs(msg->filter_key));
922     }
923     else
924     {
925       keys_match = ( (0 != ntohs(msg->filter_key)) &&
926                      (! memcmp (r->key,
927                                 &msg->key,
928                                 sizeof(struct GNUNET_HashCode))) );
929     }
930     if ( (ch == r->ch) &&
931          (ntohl(msg->type) == r->type) &&
932          (r->get == msg->get) &&
933          (r->get_resp == msg->get_resp) &&
934          (r->put == msg->put) &&
935          keys_match )
936     {
937       GNUNET_CONTAINER_DLL_remove (monitor_head,
938                                    monitor_tail,
939                                    r);
940       GNUNET_free_non_null (r->key);
941       GNUNET_free (r);
942       return; /* Delete only ONE entry */
943     }
944   }
945 }
946
947
948 /**
949  * Closure for #forward_reply()
950  */
951 struct ForwardReplyContext
952 {
953
954   /**
955    * Expiration time of the reply.
956    */
957   struct GNUNET_TIME_Absolute expiration;
958
959   /**
960    * GET path taken.
961    */
962   const struct GNUNET_PeerIdentity *get_path;
963
964   /**
965    * PUT path taken.
966    */
967   const struct GNUNET_PeerIdentity *put_path;
968
969   /**
970    * Embedded payload.
971    */
972   const void *data;
973
974   /**
975    * Number of bytes in data.
976    */
977   size_t data_size;
978
979   /**
980    * Number of entries in @e get_path.
981    */
982   unsigned int get_path_length;
983
984   /**
985    * Number of entries in @e put_path.
986    */
987   unsigned int put_path_length;
988
989   /**
990    * Type of the data.
991    */
992   enum GNUNET_BLOCK_Type type;
993
994 };
995
996
997 /**
998  * Iterator over hash map entries that send a given reply to
999  * each of the matching clients.  With some tricky recycling
1000  * of the buffer.
1001  *
1002  * @param cls the 'struct ForwardReplyContext'
1003  * @param key current key
1004  * @param value value in the hash map, a ClientQueryRecord
1005  * @return #GNUNET_YES (we should continue to iterate),
1006  *         if the result is mal-formed, #GNUNET_NO
1007  */
1008 static int
1009 forward_reply (void *cls,
1010                const struct GNUNET_HashCode *key,
1011                void *value)
1012 {
1013   struct ForwardReplyContext *frc = cls;
1014   struct ClientQueryRecord *record = value;
1015   struct GNUNET_MQ_Envelope *env;
1016   struct GNUNET_DHT_ClientResultMessage *reply;
1017   enum GNUNET_BLOCK_EvaluationResult eval;
1018   int do_free;
1019   struct GNUNET_HashCode ch;
1020   struct GNUNET_PeerIdentity *paths;
1021
1022   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1023                "CLIENT-RESULT %s\n",
1024                GNUNET_h2s_full (key));
1025   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1026        (record->type != frc->type))
1027   {
1028     LOG (GNUNET_ERROR_TYPE_DEBUG,
1029          "Record type missmatch, not passing request for key %s to local client\n",
1030          GNUNET_h2s (key));
1031     GNUNET_STATISTICS_update (GDS_stats,
1032                               gettext_noop
1033                               ("# Key match, type mismatches in REPLY to CLIENT"),
1034                               1, GNUNET_NO);
1035     return GNUNET_YES;          /* type mismatch */
1036   }
1037   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1038   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1039     if (0 == memcmp (&record->seen_replies[i],
1040                      &ch,
1041                      sizeof (struct GNUNET_HashCode)))
1042     {
1043       LOG (GNUNET_ERROR_TYPE_DEBUG,
1044            "Duplicate reply, not passing request for key %s to local client\n",
1045            GNUNET_h2s (key));
1046       GNUNET_STATISTICS_update (GDS_stats,
1047                                 gettext_noop
1048                                 ("# Duplicate REPLIES to CLIENT request dropped"),
1049                                 1, GNUNET_NO);
1050       return GNUNET_YES;        /* duplicate */
1051     }
1052   eval
1053     = GNUNET_BLOCK_evaluate (GDS_block_context,
1054                              record->type,
1055                              GNUNET_BLOCK_EO_NONE,
1056                              key,
1057                              NULL,
1058                              0,
1059                              record->xquery,
1060                              record->xquery_size,
1061                              frc->data,
1062                              frc->data_size);
1063   LOG (GNUNET_ERROR_TYPE_DEBUG,
1064        "Evaluation result is %d for key %s for local client's query\n",
1065        (int) eval,
1066        GNUNET_h2s (key));
1067   switch (eval)
1068   {
1069   case GNUNET_BLOCK_EVALUATION_OK_LAST:
1070     do_free = GNUNET_YES;
1071     break;
1072   case GNUNET_BLOCK_EVALUATION_OK_MORE:
1073     GNUNET_array_append (record->seen_replies,
1074                          record->seen_replies_count,
1075                          ch);
1076     do_free = GNUNET_NO;
1077     break;
1078   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1079     /* should be impossible to encounter here */
1080     GNUNET_break (0);
1081     return GNUNET_YES;
1082   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1083     GNUNET_break_op (0);
1084     return GNUNET_NO;
1085   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1086     GNUNET_break (0);
1087     return GNUNET_NO;
1088   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1089     GNUNET_break (0);
1090     return GNUNET_NO;
1091   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1092     return GNUNET_YES;
1093   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1094     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1095                 _("Unsupported block type (%u) in request!\n"), record->type);
1096     return GNUNET_NO;
1097   default:
1098     GNUNET_break (0);
1099     return GNUNET_NO;
1100   }
1101   GNUNET_STATISTICS_update (GDS_stats,
1102                             gettext_noop ("# RESULTS queued for clients"),
1103                             1,
1104                             GNUNET_NO);
1105   env = GNUNET_MQ_msg_extra (reply,
1106                              frc->data_size +
1107                              (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1108                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1109   reply->type = htonl (frc->type);
1110   reply->get_path_length = htonl (frc->get_path_length);
1111   reply->put_path_length = htonl (frc->put_path_length);
1112   reply->unique_id = record->unique_id;
1113   reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1114   reply->key = *key;
1115   paths = (struct GNUNET_PeerIdentity *) &reply[1];
1116   GNUNET_memcpy (paths,
1117                  frc->put_path,
1118                  sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1119   GNUNET_memcpy (&paths[frc->put_path_length],
1120                  frc->get_path,
1121                  sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1122   GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1123                  frc->data,
1124                  frc->data_size);
1125   LOG (GNUNET_ERROR_TYPE_DEBUG,
1126        "Sending reply to query %s for client %p\n",
1127        GNUNET_h2s (key),
1128        record->ch->client);
1129   GNUNET_MQ_send (record->ch->mq,
1130                   env);
1131   if (GNUNET_YES == do_free)
1132     remove_client_record (record);
1133   return GNUNET_YES;
1134 }
1135
1136
1137 /**
1138  * Handle a reply we've received from another peer.  If the reply
1139  * matches any of our pending queries, forward it to the respective
1140  * client(s).
1141  *
1142  * @param expiration when will the reply expire
1143  * @param key the query this reply is for
1144  * @param get_path_length number of peers in @a get_path
1145  * @param get_path path the reply took on get
1146  * @param put_path_length number of peers in @a put_path
1147  * @param put_path path the reply took on put
1148  * @param type type of the reply
1149  * @param data_size number of bytes in @a data
1150  * @param data application payload data
1151  */
1152 void
1153 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1154                           const struct GNUNET_HashCode *key,
1155                           unsigned int get_path_length,
1156                           const struct GNUNET_PeerIdentity *get_path,
1157                           unsigned int put_path_length,
1158                           const struct GNUNET_PeerIdentity *put_path,
1159                           enum GNUNET_BLOCK_Type type,
1160                           size_t data_size,
1161                           const void *data)
1162 {
1163   struct ForwardReplyContext frc;
1164   size_t msize;
1165
1166   msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1167     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1168   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1169   {
1170     GNUNET_break (0);
1171     return;
1172   }
1173   if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1174                                                  key))
1175   {
1176     LOG (GNUNET_ERROR_TYPE_DEBUG,
1177          "No matching client for reply for key %s\n",
1178          GNUNET_h2s (key));
1179     GNUNET_STATISTICS_update (GDS_stats,
1180                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1181                               1,
1182                               GNUNET_NO);
1183     return;                     /* no matching request, fast exit! */
1184   }
1185   frc.expiration = expiration;
1186   frc.get_path = get_path;
1187   frc.put_path = put_path;
1188   frc.data = data;
1189   frc.data_size = data_size;
1190   frc.get_path_length = get_path_length;
1191   frc.put_path_length = put_path_length;
1192   frc.type = type;
1193   LOG (GNUNET_ERROR_TYPE_DEBUG,
1194        "Forwarding reply for key %s to client\n",
1195        GNUNET_h2s (key));
1196   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1197                                               key,
1198                                               &forward_reply,
1199                                               &frc);
1200
1201 }
1202
1203
1204 /**
1205  * Check if some client is monitoring GET messages and notify
1206  * them in that case.
1207  *
1208  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1209  * @param type The type of data in the request.
1210  * @param hop_count Hop count so far.
1211  * @param path_length number of entries in path (or 0 if not recorded).
1212  * @param path peers on the GET path (or NULL if not recorded).
1213  * @param desired_replication_level Desired replication level.
1214  * @param key Key of the requested data.
1215  */
1216 void
1217 GDS_CLIENTS_process_get (uint32_t options,
1218                          enum GNUNET_BLOCK_Type type,
1219                          uint32_t hop_count,
1220                          uint32_t desired_replication_level,
1221                          unsigned int path_length,
1222                          const struct GNUNET_PeerIdentity *path,
1223                          const struct GNUNET_HashCode * key)
1224 {
1225   struct ClientMonitorRecord *m;
1226   struct ClientHandle **cl;
1227   unsigned int cl_size;
1228
1229   cl = NULL;
1230   cl_size = 0;
1231   for (m = monitor_head; NULL != m; m = m->next)
1232   {
1233     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1234            (m->type == type) ) &&
1235          ( (NULL == m->key) ||
1236            (0 == memcmp (key,
1237                          m->key,
1238                          sizeof(struct GNUNET_HashCode))) ) )
1239     {
1240       struct GNUNET_MQ_Envelope *env;
1241       struct GNUNET_DHT_MonitorGetMessage *mmsg;
1242       struct GNUNET_PeerIdentity *msg_path;
1243       size_t msize;
1244       unsigned int i;
1245
1246       /* Don't send duplicates */
1247       for (i = 0; i < cl_size; i++)
1248         if (cl[i] == m->ch)
1249           break;
1250       if (i < cl_size)
1251         continue;
1252       GNUNET_array_append (cl,
1253                            cl_size,
1254                            m->ch);
1255
1256       msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1257       env = GNUNET_MQ_msg_extra (mmsg,
1258                                  msize,
1259                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1260       mmsg->options = htonl(options);
1261       mmsg->type = htonl(type);
1262       mmsg->hop_count = htonl(hop_count);
1263       mmsg->desired_replication_level = htonl(desired_replication_level);
1264       mmsg->get_path_length = htonl(path_length);
1265       mmsg->key = *key;
1266       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1267       GNUNET_memcpy (msg_path,
1268                      path,
1269                      path_length * sizeof (struct GNUNET_PeerIdentity));
1270       GNUNET_MQ_send (m->ch->mq,
1271                       env);
1272     }
1273   }
1274   GNUNET_free_non_null (cl);
1275 }
1276
1277
1278 /**
1279  * Check if some client is monitoring GET RESP messages and notify
1280  * them in that case.
1281  *
1282  * @param type The type of data in the result.
1283  * @param get_path Peers on GET path (or NULL if not recorded).
1284  * @param get_path_length number of entries in get_path.
1285  * @param put_path peers on the PUT path (or NULL if not recorded).
1286  * @param put_path_length number of entries in get_path.
1287  * @param exp Expiration time of the data.
1288  * @param key Key of the data.
1289  * @param data Pointer to the result data.
1290  * @param size Number of bytes in @a data.
1291  */
1292 void
1293 GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1294                               const struct GNUNET_PeerIdentity *get_path,
1295                               unsigned int get_path_length,
1296                               const struct GNUNET_PeerIdentity *put_path,
1297                               unsigned int put_path_length,
1298                               struct GNUNET_TIME_Absolute exp,
1299                               const struct GNUNET_HashCode * key,
1300                               const void *data,
1301                               size_t size)
1302 {
1303   struct ClientMonitorRecord *m;
1304   struct ClientHandle **cl;
1305   unsigned int cl_size;
1306
1307   cl = NULL;
1308   cl_size = 0;
1309   for (m = monitor_head; NULL != m; m = m->next)
1310   {
1311     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1312         (NULL == m->key ||
1313          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1314     {
1315       struct GNUNET_MQ_Envelope *env;
1316       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1317       struct GNUNET_PeerIdentity *path;
1318       size_t msize;
1319       unsigned int i;
1320
1321       /* Don't send duplicates */
1322       for (i = 0; i < cl_size; i++)
1323         if (cl[i] == m->ch)
1324           break;
1325       if (i < cl_size)
1326         continue;
1327       GNUNET_array_append (cl,
1328                            cl_size,
1329                            m->ch);
1330
1331       msize = size;
1332       msize += (get_path_length + put_path_length)
1333                * sizeof (struct GNUNET_PeerIdentity);
1334       env = GNUNET_MQ_msg_extra (mmsg,
1335                                  msize,
1336                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1337       mmsg->type = htonl(type);
1338       mmsg->put_path_length = htonl(put_path_length);
1339       mmsg->get_path_length = htonl(get_path_length);
1340       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1341       mmsg->key = *key;
1342       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1343       GNUNET_memcpy (path,
1344                      put_path,
1345                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
1346       GNUNET_memcpy (path,
1347                      get_path,
1348                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
1349       GNUNET_memcpy (&path[get_path_length],
1350                      data,
1351                      size);
1352       GNUNET_MQ_send (m->ch->mq,
1353                       env);
1354     }
1355   }
1356   GNUNET_free_non_null (cl);
1357 }
1358
1359
1360 /**
1361  * Check if some client is monitoring PUT messages and notify
1362  * them in that case.
1363  *
1364  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1365  * @param type The type of data in the request.
1366  * @param hop_count Hop count so far.
1367  * @param path_length number of entries in path (or 0 if not recorded).
1368  * @param path peers on the PUT path (or NULL if not recorded).
1369  * @param desired_replication_level Desired replication level.
1370  * @param exp Expiration time of the data.
1371  * @param key Key under which data is to be stored.
1372  * @param data Pointer to the data carried.
1373  * @param size Number of bytes in data.
1374  */
1375 void
1376 GDS_CLIENTS_process_put (uint32_t options,
1377                          enum GNUNET_BLOCK_Type type,
1378                          uint32_t hop_count,
1379                          uint32_t desired_replication_level,
1380                          unsigned int path_length,
1381                          const struct GNUNET_PeerIdentity *path,
1382                          struct GNUNET_TIME_Absolute exp,
1383                          const struct GNUNET_HashCode *key,
1384                          const void *data,
1385                          size_t size)
1386 {
1387   struct ClientMonitorRecord *m;
1388   struct ClientHandle **cl;
1389   unsigned int cl_size;
1390
1391   cl = NULL;
1392   cl_size = 0;
1393   for (m = monitor_head; NULL != m; m = m->next)
1394   {
1395     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1396         (NULL == m->key ||
1397          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1398     {
1399       struct GNUNET_MQ_Envelope *env;
1400       struct GNUNET_DHT_MonitorPutMessage *mmsg;
1401       struct GNUNET_PeerIdentity *msg_path;
1402       size_t msize;
1403       unsigned int i;
1404
1405       /* Don't send duplicates */
1406       for (i = 0; i < cl_size; i++)
1407         if (cl[i] == m->ch)
1408           break;
1409       if (i < cl_size)
1410         continue;
1411       GNUNET_array_append (cl,
1412                            cl_size,
1413                            m->ch);
1414
1415       msize = size;
1416       msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1417       env = GNUNET_MQ_msg_extra (mmsg,
1418                                  msize,
1419                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1420       mmsg->options = htonl(options);
1421       mmsg->type = htonl(type);
1422       mmsg->hop_count = htonl(hop_count);
1423       mmsg->desired_replication_level = htonl (desired_replication_level);
1424       mmsg->put_path_length = htonl (path_length);
1425       mmsg->key = *key;
1426       mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1427       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1428       GNUNET_memcpy (msg_path,
1429                      path,
1430                      path_length * sizeof (struct GNUNET_PeerIdentity));
1431       GNUNET_memcpy (&msg_path[path_length],
1432                      data,
1433                      size);
1434       GNUNET_MQ_send (m->ch->mq,
1435                       env);
1436     }
1437   }
1438   GNUNET_free_non_null (cl);
1439 }
1440
1441
1442 /**
1443  * Initialize client subsystem.
1444  *
1445  * @param server the initialized server
1446  */
1447 static void
1448 GDS_CLIENTS_init ()
1449 {
1450   forward_map
1451     = GNUNET_CONTAINER_multihashmap_create (1024,
1452                                             GNUNET_YES);
1453   retry_heap
1454     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1455 }
1456
1457
1458 /**
1459  * Shutdown client subsystem.
1460  */
1461 static void
1462 GDS_CLIENTS_stop ()
1463 {
1464   if (NULL != retry_task)
1465   {
1466     GNUNET_SCHEDULER_cancel (retry_task);
1467     retry_task = NULL;
1468   }
1469 }
1470
1471
1472 /**
1473  * Define "main" method using service macro.
1474  *
1475  * @param name name of the service, i.e. "dht" or "xdht"
1476  * @param run name of the initializaton method for the service
1477  */
1478 #define GDS_DHT_SERVICE_INIT(name,run)          \
1479  GNUNET_SERVICE_MAIN \
1480   (name, \
1481   GNUNET_SERVICE_OPTION_NONE, \
1482   run, \
1483   &client_connect_cb, \
1484   &client_disconnect_cb, \
1485   NULL, \
1486   GNUNET_MQ_hd_var_size (dht_local_put, \
1487                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1488                          struct GNUNET_DHT_ClientPutMessage, \
1489                          NULL), \
1490   GNUNET_MQ_hd_var_size (dht_local_get, \
1491                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1492                          struct GNUNET_DHT_ClientGetMessage, \
1493                          NULL), \
1494   GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1495                           GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1496                           struct GNUNET_DHT_ClientGetStopMessage, \
1497                           NULL), \
1498   GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1499                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1500                            struct GNUNET_DHT_MonitorStartStopMessage, \
1501                            NULL), \
1502   GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1503                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1504                            struct GNUNET_DHT_MonitorStartStopMessage, \
1505                            NULL), \
1506   GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1507                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1508                          struct GNUNET_DHT_ClientGetResultSeenMessage , \
1509                          NULL), \
1510   GNUNET_MQ_handler_end ())
1511
1512
1513 /**
1514  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1515  */
1516 void __attribute__ ((destructor))
1517 GDS_CLIENTS_done ()
1518 {
1519   if (NULL != retry_heap)
1520   {
1521     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1522     GNUNET_CONTAINER_heap_destroy (retry_heap);
1523     retry_heap = NULL;
1524   }
1525   if (NULL != forward_map)
1526   {
1527     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1528     GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1529     forward_map = NULL;
1530   }
1531 }
1532
1533 /* end of gnunet-service-dht_clients.c */