commented out wrong message type
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-service-dht.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht.h"
36
37
38 /**
39  * Should routing details be logged to stderr (for debugging)?
40  */
41 #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
44
45
46 /**
47  * Struct containing information about a client,
48  * handle to connect to it, and any pending messages
49  * that need to be sent to it.
50  */
51 struct ClientHandle;
52
53
54 /**
55  * Entry in the local forwarding map for a client's GET request.
56  */
57 struct ClientQueryRecord
58 {
59
60   /**
61    * The key this request was about
62    */
63   struct GNUNET_HashCode key;
64
65   /**
66    * Kept in a DLL with @e client.
67    */
68   struct ClientQueryRecord *next;
69
70   /**
71    * Kept in a DLL with @e client.
72    */
73   struct ClientQueryRecord *prev;
74
75   /**
76    * Client responsible for the request.
77    */
78   struct ClientHandle *ch;
79
80   /**
81    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
82    */
83   const void *xquery;
84
85   /**
86    * Replies we have already seen for this request.
87    */
88   struct GNUNET_HashCode *seen_replies;
89
90   /**
91    * Pointer to this nodes heap location in the retry-heap (for fast removal)
92    */
93   struct GNUNET_CONTAINER_HeapNode *hnode;
94
95   /**
96    * What's the delay between re-try operations that we currently use for this
97    * request?
98    */
99   struct GNUNET_TIME_Relative retry_frequency;
100
101   /**
102    * What's the next time we should re-try this request?
103    */
104   struct GNUNET_TIME_Absolute retry_time;
105
106   /**
107    * The unique identifier of this request
108    */
109   uint64_t unique_id;
110
111   /**
112    * Number of bytes in xquery.
113    */
114   size_t xquery_size;
115
116   /**
117    * Number of entries in 'seen_replies'.
118    */
119   unsigned int seen_replies_count;
120
121   /**
122    * Desired replication level
123    */
124   uint32_t replication;
125
126   /**
127    * Any message options for this request
128    */
129   uint32_t msg_options;
130
131   /**
132    * The type for the data for the GET request.
133    */
134   enum GNUNET_BLOCK_Type type;
135
136 };
137
138
139 /**
140  * Struct containing paremeters of monitoring requests.
141  */
142 struct ClientMonitorRecord
143 {
144
145   /**
146    * Next element in DLL.
147    */
148   struct ClientMonitorRecord *next;
149
150   /**
151    * Previous element in DLL.
152    */
153   struct ClientMonitorRecord *prev;
154
155   /**
156    * Type of blocks that are of interest
157    */
158   enum GNUNET_BLOCK_Type type;
159
160   /**
161    * Key of data of interest, NULL for all.
162    */
163   struct GNUNET_HashCode *key;
164
165   /**
166    * Flag whether to notify about GET messages.
167    */
168   int16_t get;
169
170   /**
171    * Flag whether to notify about GET_REPONSE messages.
172    */
173   int16_t get_resp;
174
175   /**
176    * Flag whether to notify about PUT messages.
177    */
178   uint16_t put;
179
180   /**
181    * Client to notify of these requests.
182    */
183   struct ClientHandle *ch;
184 };
185
186
187 /**
188  * Struct containing information about a client,
189  * handle to connect to it, and any pending messages
190  * that need to be sent to it.
191  */
192 struct ClientHandle
193 {
194   /**
195    * Linked list of active queries of this client.
196    */
197   struct ClientQueryRecord *cqr_head;
198
199   /**
200    * Linked list of active queries of this client.
201    */
202   struct ClientQueryRecord *cqr_tail;
203
204   /**
205    * The handle to this client
206    */
207   struct GNUNET_SERVICE_Client *client;
208
209   /**
210    * The message queue to this client
211    */
212   struct GNUNET_MQ_Handle *mq;
213
214 };
215
216 /**
217  * Our handle to the BLOCK library.
218  */
219 struct GNUNET_BLOCK_Context *GDS_block_context;
220
221 /**
222  * Handle for the statistics service.
223  */
224 struct GNUNET_STATISTICS_Handle *GDS_stats;
225
226 /**
227  * Handle for the service.
228  */
229 struct GNUNET_SERVICE_Handle *GDS_service;
230
231 /**
232  * The configuration the DHT service is running with
233  */
234 const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
235
236 /**
237  * List of active monitoring requests.
238  */
239 static struct ClientMonitorRecord *monitor_head;
240
241 /**
242  * List of active monitoring requests.
243  */
244 static struct ClientMonitorRecord *monitor_tail;
245
246 /**
247  * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
248  */
249 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
250
251 /**
252  * Heap with all of our client's request, sorted by retry time (earliest on top).
253  */
254 static struct GNUNET_CONTAINER_Heap *retry_heap;
255
256 /**
257  * Task that re-transmits requests (using retry_heap).
258  */
259 static struct GNUNET_SCHEDULER_Task *retry_task;
260
261
262 /**
263  * Free data structures associated with the given query.
264  *
265  * @param record record to remove
266  */
267 static void
268 remove_client_record (struct ClientQueryRecord *record)
269 {
270   struct ClientHandle *ch = record->ch;
271
272   GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
273                                ch->cqr_tail,
274                                record);
275   GNUNET_assert (GNUNET_YES ==
276                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
277                                                        &record->key,
278                                                        record));
279   if (NULL != record->hnode)
280     GNUNET_CONTAINER_heap_remove_node (record->hnode);
281   GNUNET_array_grow (record->seen_replies,
282                      record->seen_replies_count,
283                      0);
284   GNUNET_free (record);
285 }
286
287
288 /**
289  * Functions with this signature are called whenever a local client is
290  * connects to us.
291  *
292  * @param cls closure (NULL for dht)
293  * @param client identification of the client
294  * @param mq message queue for talking to @a client
295  * @return our `struct ClientHandle` for @a client
296  */
297 static void *
298 client_connect_cb (void *cls,
299                    struct GNUNET_SERVICE_Client *client,
300                    struct GNUNET_MQ_Handle *mq)
301 {
302   struct ClientHandle *ch;
303
304   ch = GNUNET_new (struct ClientHandle);
305   ch->client = client;
306   ch->mq = mq;
307   return ch;
308 }
309
310
311 /**
312  * Functions with this signature are called whenever a client
313  * is disconnected on the network level.
314  *
315  * @param cls closure (NULL for dht)
316  * @param client identification of the client
317  * @param app_ctx our `struct ClientHandle` for @a client
318  */
319 static void
320 client_disconnect_cb (void *cls,
321                       struct GNUNET_SERVICE_Client *client,
322                       void *app_ctx)
323 {
324   struct ClientHandle *ch = app_ctx;
325   struct ClientQueryRecord *cqr;
326   struct ClientMonitorRecord *monitor;
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Local client %p disconnects\n",
330               ch);
331   monitor = monitor_head;
332   while (NULL != monitor)
333   {
334     if (monitor->ch == ch)
335     {
336       struct ClientMonitorRecord *next;
337
338       next = monitor->next;
339       GNUNET_free_non_null (monitor->key);
340       GNUNET_CONTAINER_DLL_remove (monitor_head,
341                                    monitor_tail,
342                                    monitor);
343       GNUNET_free (monitor);
344       monitor = next;
345     }
346     else
347     {
348       monitor = monitor->next;
349     }
350   }
351   while (NULL != (cqr = ch->cqr_head))
352     remove_client_record (cqr);
353   GNUNET_free (ch);
354 }
355
356
357 /**
358  * Route the given request via the DHT.  This includes updating
359  * the bloom filter and retransmission times, building the P2P
360  * message and initiating the routing operation.
361  */
362 static void
363 transmit_request (struct ClientQueryRecord *cqr)
364 {
365   struct GNUNET_BLOCK_Group *bg;
366   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
367
368   GNUNET_STATISTICS_update (GDS_stats,
369                             gettext_noop ("# GET requests from clients injected"),
370                             1,
371                             GNUNET_NO);
372   bg = GNUNET_BLOCK_group_create (GDS_block_context,
373                                   cqr->type,
374                                   GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
375                                                             UINT32_MAX),
376                                   NULL,
377                                   0,
378                                   "seen-set-size",
379                                   cqr->seen_replies_count,
380                                   NULL);
381   GNUNET_BLOCK_group_set_seen (bg,
382                                cqr->seen_replies,
383                                cqr->seen_replies_count);
384   peer_bf
385     = GNUNET_CONTAINER_bloomfilter_init (NULL,
386                                          DHT_BLOOM_SIZE,
387                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
388   LOG (GNUNET_ERROR_TYPE_DEBUG,
389        "Initiating GET for %s, replication %u, already have %u replies\n",
390        GNUNET_h2s (&cqr->key),
391        cqr->replication,
392        cqr->seen_replies_count);
393   GDS_NEIGHBOURS_handle_get (cqr->type,
394                              cqr->msg_options,
395                              cqr->replication,
396                              0 /* hop count */ ,
397                              &cqr->key,
398                              cqr->xquery,
399                              cqr->xquery_size,
400                              bg,
401                              peer_bf);
402   GNUNET_BLOCK_group_destroy (bg);
403   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
404
405   /* exponential back-off for retries.
406    * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
407   cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
408   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
409 }
410
411
412 /**
413  * Task that looks at the #retry_heap and transmits all of the requests
414  * on the heap that are ready for transmission.  Then re-schedules
415  * itself (unless the heap is empty).
416  *
417  * @param cls unused
418  */
419 static void
420 transmit_next_request_task (void *cls)
421 {
422   struct ClientQueryRecord *cqr;
423   struct GNUNET_TIME_Relative delay;
424
425   retry_task = NULL;
426   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
427   {
428     cqr->hnode = NULL;
429     delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
430     if (delay.rel_value_us > 0)
431     {
432       cqr->hnode
433         = GNUNET_CONTAINER_heap_insert (retry_heap,
434                                         cqr,
435                                         cqr->retry_time.abs_value_us);
436       retry_task
437         = GNUNET_SCHEDULER_add_at (cqr->retry_time,
438                                    &transmit_next_request_task,
439                                    NULL);
440       return;
441     }
442     transmit_request (cqr);
443     cqr->hnode
444       = GNUNET_CONTAINER_heap_insert (retry_heap,
445                                       cqr,
446                                       cqr->retry_time.abs_value_us);
447   }
448 }
449
450
451 /**
452  * Check DHT PUT messages from the client.
453  *
454  * @param cls the client we received this message from
455  * @param dht_msg the actual message received
456  * @return #GNUNET_OK (always)
457  */
458 static int
459 check_dht_local_put (void *cls,
460                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
461 {
462   /* always well-formed */
463   return GNUNET_OK;
464 }
465
466
467 /**
468  * Handler for PUT messages.
469  *
470  * @param cls the client we received this message from
471  * @param dht_msg the actual message received
472  */
473 static void
474 handle_dht_local_put (void *cls,
475                       const struct GNUNET_DHT_ClientPutMessage *dht_msg)
476 {
477   struct ClientHandle *ch = cls;
478   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
479   uint16_t size;
480   struct GNUNET_MQ_Envelope *env;
481   struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
482
483   size = ntohs (dht_msg->header.size);
484   GNUNET_STATISTICS_update (GDS_stats,
485                             gettext_noop ("# PUT requests received from clients"),
486                             1,
487                             GNUNET_NO);
488   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
489                "CLIENT-PUT %s\n",
490                GNUNET_h2s_full (&dht_msg->key));
491   /* give to local clients */
492   LOG (GNUNET_ERROR_TYPE_DEBUG,
493        "Handling local PUT of %u-bytes for query %s\n",
494        size - sizeof (struct GNUNET_DHT_ClientPutMessage),
495        GNUNET_h2s (&dht_msg->key));
496   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
497                             &dht_msg->key,
498                             0,
499                             NULL,
500                             0,
501                             NULL,
502                             ntohl (dht_msg->type),
503                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
504                             &dht_msg[1]);
505   /* store locally */
506   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
507                             &dht_msg->key,
508                             0,
509                             NULL,
510                             ntohl (dht_msg->type),
511                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
512                             &dht_msg[1]);
513   /* route to other peers */
514   peer_bf
515     = GNUNET_CONTAINER_bloomfilter_init (NULL,
516                                          DHT_BLOOM_SIZE,
517                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
518   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
519                              ntohl (dht_msg->options),
520                              ntohl (dht_msg->desired_replication_level),
521                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
522                              0 /* hop count */,
523                              peer_bf,
524                              &dht_msg->key,
525                              0,
526                              NULL,
527                              &dht_msg[1],
528                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
529   GDS_CLIENTS_process_put (ntohl (dht_msg->options),
530                            ntohl (dht_msg->type),
531                            0,
532                            ntohl (dht_msg->desired_replication_level),
533                            1,
534                            GDS_NEIGHBOURS_get_id(),
535                            GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
536                            &dht_msg->key,
537                            &dht_msg[1],
538                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
539   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
540   env = GNUNET_MQ_msg (conf,
541                        GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
542   conf->reserved = htonl (0);
543   conf->unique_id = dht_msg->unique_id;
544   GNUNET_MQ_send (ch->mq,
545                   env);
546   GNUNET_SERVICE_client_continue (ch->client);
547 }
548
549
550 /**
551  * Check DHT GET messages from the client.
552  *
553  * @param cls the client we received this message from
554  * @param message the actual message received
555  * @return #GNUNET_OK (always)
556  */
557 static int
558 check_dht_local_get (void *cls,
559                       const struct GNUNET_DHT_ClientGetMessage *get)
560 {
561   /* always well-formed */
562   return GNUNET_OK;
563 }
564
565
566 /**
567  * Handle a result from local datacache for a GET operation.
568  *
569  * @param cls the `struct ClientHandle` of the client doing the query
570  * @param type type of the block
571  * @param expiration_time when does the content expire
572  * @param key key for the content
573  * @param put_path_length number of entries in @a put_path
574  * @param put_path peers the original PUT traversed (if tracked)
575  * @param get_path_length number of entries in @a get_path
576  * @param get_path peers this reply has traversed so far (if tracked)
577  * @param data payload of the reply
578  * @param data_size number of bytes in @a data
579  */
580 static void
581 handle_local_result (void *cls,
582                      enum GNUNET_BLOCK_Type type,
583                      struct GNUNET_TIME_Absolute expiration_time,
584                      const struct GNUNET_HashCode *key,
585                      unsigned int put_path_length,
586                      const struct GNUNET_PeerIdentity *put_path,
587                      unsigned int get_path_length,
588                      const struct GNUNET_PeerIdentity *get_path,
589                      const void *data,
590                      size_t data_size)
591 {
592   // FIXME: this needs some clean up: inline the function,
593   // possibly avoid even looking up the client!
594   GDS_CLIENTS_handle_reply (expiration_time,
595                             key,
596                             0, NULL,
597                             put_path_length, put_path,
598                             type,
599                             data_size, data);
600 }
601
602
603 /**
604  * Handler for DHT GET messages from the client.
605  *
606  * @param cls the client we received this message from
607  * @param message the actual message received
608  */
609 static void
610 handle_dht_local_get (void *cls,
611                       const struct GNUNET_DHT_ClientGetMessage *get)
612 {
613   struct ClientHandle *ch = cls;
614   struct ClientQueryRecord *cqr;
615   size_t xquery_size;
616   const char *xquery;
617   uint16_t size;
618
619   size = ntohs (get->header.size);
620   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
621   xquery = (const char *) &get[1];
622   GNUNET_STATISTICS_update (GDS_stats,
623                             gettext_noop
624                             ("# GET requests received from clients"), 1,
625                             GNUNET_NO);
626   LOG (GNUNET_ERROR_TYPE_DEBUG,
627        "Received GET request for %s from local client %p, xq: %.*s\n",
628        GNUNET_h2s (&get->key),
629        ch->client,
630        xquery_size,
631        xquery);
632   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
633                "CLIENT-GET %s\n",
634                GNUNET_h2s_full (&get->key));
635
636   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
637   cqr->key = get->key;
638   cqr->ch = ch;
639   cqr->xquery = (void *) &cqr[1];
640   GNUNET_memcpy (&cqr[1], xquery, xquery_size);
641   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
642   cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
643   cqr->retry_time = GNUNET_TIME_absolute_get ();
644   cqr->unique_id = get->unique_id;
645   cqr->xquery_size = xquery_size;
646   cqr->replication = ntohl (get->desired_replication_level);
647   cqr->msg_options = ntohl (get->options);
648   cqr->type = ntohl (get->type);
649   GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
650                                ch->cqr_tail,
651                                cqr);
652   GNUNET_CONTAINER_multihashmap_put (forward_map,
653                                      &cqr->key,
654                                      cqr,
655                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
656   GDS_CLIENTS_process_get (ntohl (get->options),
657                            ntohl (get->type),
658                            0,
659                            ntohl (get->desired_replication_level),
660                            1,
661                            GDS_NEIGHBOURS_get_id(),
662                            &get->key);
663   /* start remote requests */
664   if (NULL != retry_task)
665     GNUNET_SCHEDULER_cancel (retry_task);
666   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
667                                          NULL);
668   /* perform local lookup */
669   GDS_DATACACHE_handle_get (&get->key,
670                             cqr->type,
671                             cqr->xquery,
672                             xquery_size,
673                             NULL,
674                             &handle_local_result,
675                             ch);
676   GNUNET_SERVICE_client_continue (ch->client);
677 }
678
679
680 /**
681  * Closure for #find_by_unique_id().
682  */
683 struct FindByUniqueIdContext
684 {
685   /**
686    * Where to store the result, if found.
687    */
688   struct ClientQueryRecord *cqr;
689
690   uint64_t unique_id;
691 };
692
693
694 /**
695  * Function called for each existing DHT record for the given
696  * query.  Checks if it matches the UID given in the closure
697  * and if so returns the entry as a result.
698  *
699  * @param cls the search context
700  * @param key query for the lookup (not used)
701  * @param value the `struct ClientQueryRecord`
702  * @return #GNUNET_YES to continue iteration (result not yet found)
703  */
704 static int
705 find_by_unique_id (void *cls,
706                    const struct GNUNET_HashCode *key,
707                    void *value)
708 {
709   struct FindByUniqueIdContext *fui_ctx = cls;
710   struct ClientQueryRecord *cqr = value;
711
712   if (cqr->unique_id != fui_ctx->unique_id)
713     return GNUNET_YES;
714   fui_ctx->cqr = cqr;
715   return GNUNET_NO;
716 }
717
718
719 /**
720  * Check "GET result seen" messages from the client.
721  *
722  * @param cls the client we received this message from
723  * @param message the actual message received
724  * @return #GNUNET_OK if @a seen is well-formed
725  */
726 static int
727 check_dht_local_get_result_seen (void *cls,
728                                  const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
729 {
730   uint16_t size;
731   unsigned int hash_count;
732
733   size = ntohs (seen->header.size);
734   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
735   if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
736   {
737     GNUNET_break (0);
738     return GNUNET_SYSERR;
739   }
740   return GNUNET_OK;
741 }
742
743
744 /**
745  * Handler for "GET result seen" messages from the client.
746  *
747  * @param cls the client we received this message from
748  * @param message the actual message received
749  */
750 static void
751 handle_dht_local_get_result_seen (void *cls,
752                                   const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
753 {
754   struct ClientHandle *ch = cls;
755   uint16_t size;
756   unsigned int hash_count;
757   unsigned int old_count;
758   const struct GNUNET_HashCode *hc;
759   struct FindByUniqueIdContext fui_ctx;
760   struct ClientQueryRecord *cqr;
761
762   size = ntohs (seen->header.size);
763   hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
764   hc = (const struct GNUNET_HashCode*) &seen[1];
765   fui_ctx.unique_id = seen->unique_id;
766   fui_ctx.cqr = NULL;
767   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
768                                               &seen->key,
769                                               &find_by_unique_id,
770                                               &fui_ctx);
771   if (NULL == (cqr = fui_ctx.cqr))
772   {
773     GNUNET_break (0);
774     GNUNET_SERVICE_client_drop (ch->client);
775     return;
776   }
777   /* finally, update 'seen' list */
778   old_count = cqr->seen_replies_count;
779   GNUNET_array_grow (cqr->seen_replies,
780                      cqr->seen_replies_count,
781                      cqr->seen_replies_count + hash_count);
782   GNUNET_memcpy (&cqr->seen_replies[old_count],
783                  hc,
784                  sizeof (struct GNUNET_HashCode) * hash_count);
785 }
786
787
788 /**
789  * Closure for #remove_by_unique_id().
790  */
791 struct RemoveByUniqueIdContext
792 {
793   /**
794    * Client that issued the removal request.
795    */
796   struct ClientHandle *ch;
797
798   /**
799    * Unique ID of the request.
800    */
801   uint64_t unique_id;
802 };
803
804
805 /**
806  * Iterator over hash map entries that frees all entries
807  * that match the given client and unique ID.
808  *
809  * @param cls unique ID and client to search for in source routes
810  * @param key current key code
811  * @param value value in the hash map, a ClientQueryRecord
812  * @return #GNUNET_YES (we should continue to iterate)
813  */
814 static int
815 remove_by_unique_id (void *cls,
816                      const struct GNUNET_HashCode *key,
817                      void *value)
818 {
819   const struct RemoveByUniqueIdContext *ctx = cls;
820   struct ClientQueryRecord *cqr = value;
821
822   if (cqr->unique_id != ctx->unique_id)
823     return GNUNET_YES;
824   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
825               "Removing client %p's record for key %s (by unique id)\n",
826               ctx->ch->client,
827               GNUNET_h2s (key));
828   remove_client_record (cqr);
829   return GNUNET_YES;
830 }
831
832
833 /**
834  * Handler for any generic DHT stop messages, calls the appropriate handler
835  * depending on message type (if processed locally)
836  *
837  * @param cls client we received this message from
838  * @param message the actual message received
839  *
840  */
841 static void
842 handle_dht_local_get_stop (void *cls,
843                            const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
844 {
845   struct ClientHandle *ch = cls;
846   struct RemoveByUniqueIdContext ctx;
847
848   GNUNET_STATISTICS_update (GDS_stats,
849                             gettext_noop
850                             ("# GET STOP requests received from clients"), 1,
851                             GNUNET_NO);
852   LOG (GNUNET_ERROR_TYPE_DEBUG,
853        "Received GET STOP request for %s from local client %p\n",
854        GNUNET_h2s (&dht_stop_msg->key),
855        ch->client);
856   ctx.ch = ch;
857   ctx.unique_id = dht_stop_msg->unique_id;
858   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
859                                               &dht_stop_msg->key,
860                                               &remove_by_unique_id,
861                                               &ctx);
862   GNUNET_SERVICE_client_continue (ch->client);
863 }
864
865
866 /**
867  * Handler for monitor start messages
868  *
869  * @param cls the client we received this message from
870  * @param msg the actual message received
871  *
872  */
873 static void
874 handle_dht_local_monitor (void *cls,
875                           const struct GNUNET_DHT_MonitorStartStopMessage *msg)
876 {
877   struct ClientHandle *ch = cls;
878   struct ClientMonitorRecord *r;
879
880   r = GNUNET_new (struct ClientMonitorRecord);
881   r->ch = ch;
882   r->type = ntohl (msg->type);
883   r->get = ntohs (msg->get);
884   r->get_resp = ntohs (msg->get_resp);
885   r->put = ntohs (msg->put);
886   if (0 == ntohs (msg->filter_key))
887   {
888     r->key = NULL;
889   }
890   else
891   {
892     r->key = GNUNET_new (struct GNUNET_HashCode);
893     GNUNET_memcpy (r->key,
894                    &msg->key,
895                    sizeof (struct GNUNET_HashCode));
896   }
897   GNUNET_CONTAINER_DLL_insert (monitor_head,
898                                monitor_tail,
899                                r);
900   GNUNET_SERVICE_client_continue (ch->client);
901 }
902
903
904 /**
905  * Handler for monitor stop messages
906  *
907  * @param cls the client we received this message from
908  * @param msg the actual message received
909  */
910 static void
911 handle_dht_local_monitor_stop (void *cls,
912                                const struct GNUNET_DHT_MonitorStartStopMessage *msg)
913 {
914   struct ClientHandle *ch = cls;
915   struct ClientMonitorRecord *r;
916   int keys_match;
917
918   GNUNET_SERVICE_client_continue (ch->client);
919   for (r = monitor_head; NULL != r; r = r->next)
920   {
921     if (NULL == r->key)
922     {
923       keys_match = (0 == ntohs(msg->filter_key));
924     }
925     else
926     {
927       keys_match = ( (0 != ntohs(msg->filter_key)) &&
928                      (! memcmp (r->key,
929                                 &msg->key,
930                                 sizeof(struct GNUNET_HashCode))) );
931     }
932     if ( (ch == r->ch) &&
933          (ntohl(msg->type) == r->type) &&
934          (r->get == msg->get) &&
935          (r->get_resp == msg->get_resp) &&
936          (r->put == msg->put) &&
937          keys_match )
938     {
939       GNUNET_CONTAINER_DLL_remove (monitor_head,
940                                    monitor_tail,
941                                    r);
942       GNUNET_free_non_null (r->key);
943       GNUNET_free (r);
944       return; /* Delete only ONE entry */
945     }
946   }
947 }
948
949
950 /**
951  * Closure for #forward_reply()
952  */
953 struct ForwardReplyContext
954 {
955
956   /**
957    * Expiration time of the reply.
958    */
959   struct GNUNET_TIME_Absolute expiration;
960
961   /**
962    * GET path taken.
963    */
964   const struct GNUNET_PeerIdentity *get_path;
965
966   /**
967    * PUT path taken.
968    */
969   const struct GNUNET_PeerIdentity *put_path;
970
971   /**
972    * Embedded payload.
973    */
974   const void *data;
975
976   /**
977    * Number of bytes in data.
978    */
979   size_t data_size;
980
981   /**
982    * Number of entries in @e get_path.
983    */
984   unsigned int get_path_length;
985
986   /**
987    * Number of entries in @e put_path.
988    */
989   unsigned int put_path_length;
990
991   /**
992    * Type of the data.
993    */
994   enum GNUNET_BLOCK_Type type;
995
996 };
997
998
999 /**
1000  * Iterator over hash map entries that send a given reply to
1001  * each of the matching clients.  With some tricky recycling
1002  * of the buffer.
1003  *
1004  * @param cls the 'struct ForwardReplyContext'
1005  * @param key current key
1006  * @param value value in the hash map, a ClientQueryRecord
1007  * @return #GNUNET_YES (we should continue to iterate),
1008  *         if the result is mal-formed, #GNUNET_NO
1009  */
1010 static int
1011 forward_reply (void *cls,
1012                const struct GNUNET_HashCode *key,
1013                void *value)
1014 {
1015   struct ForwardReplyContext *frc = cls;
1016   struct ClientQueryRecord *record = value;
1017   struct GNUNET_MQ_Envelope *env;
1018   struct GNUNET_DHT_ClientResultMessage *reply;
1019   enum GNUNET_BLOCK_EvaluationResult eval;
1020   int do_free;
1021   struct GNUNET_HashCode ch;
1022   struct GNUNET_PeerIdentity *paths;
1023
1024   LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1025                "CLIENT-RESULT %s\n",
1026                GNUNET_h2s_full (key));
1027   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1028        (record->type != frc->type))
1029   {
1030     LOG (GNUNET_ERROR_TYPE_DEBUG,
1031          "Record type missmatch, not passing request for key %s to local client\n",
1032          GNUNET_h2s (key));
1033     GNUNET_STATISTICS_update (GDS_stats,
1034                               gettext_noop
1035                               ("# Key match, type mismatches in REPLY to CLIENT"),
1036                               1, GNUNET_NO);
1037     return GNUNET_YES;          /* type mismatch */
1038   }
1039   GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1040   for (unsigned int i = 0; i < record->seen_replies_count; i++)
1041     if (0 == memcmp (&record->seen_replies[i],
1042                      &ch,
1043                      sizeof (struct GNUNET_HashCode)))
1044     {
1045       LOG (GNUNET_ERROR_TYPE_DEBUG,
1046            "Duplicate reply, not passing request for key %s to local client\n",
1047            GNUNET_h2s (key));
1048       GNUNET_STATISTICS_update (GDS_stats,
1049                                 gettext_noop
1050                                 ("# Duplicate REPLIES to CLIENT request dropped"),
1051                                 1, GNUNET_NO);
1052       return GNUNET_YES;        /* duplicate */
1053     }
1054   eval
1055     = GNUNET_BLOCK_evaluate (GDS_block_context,
1056                              record->type,
1057                              NULL,
1058                              GNUNET_BLOCK_EO_NONE,
1059                              key,
1060                              record->xquery,
1061                              record->xquery_size,
1062                              frc->data,
1063                              frc->data_size);
1064   LOG (GNUNET_ERROR_TYPE_DEBUG,
1065        "Evaluation result is %d for key %s for local client's query\n",
1066        (int) eval,
1067        GNUNET_h2s (key));
1068   switch (eval)
1069   {
1070   case GNUNET_BLOCK_EVALUATION_OK_LAST:
1071     do_free = GNUNET_YES;
1072     break;
1073   case GNUNET_BLOCK_EVALUATION_OK_MORE:
1074     GNUNET_array_append (record->seen_replies,
1075                          record->seen_replies_count,
1076                          ch);
1077     do_free = GNUNET_NO;
1078     break;
1079   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1080     /* should be impossible to encounter here */
1081     GNUNET_break (0);
1082     return GNUNET_YES;
1083   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1084     GNUNET_break_op (0);
1085     return GNUNET_NO;
1086   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1087     GNUNET_break (0);
1088     return GNUNET_NO;
1089   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1090     GNUNET_break (0);
1091     return GNUNET_NO;
1092   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1093     return GNUNET_YES;
1094   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1095     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1096                 _("Unsupported block type (%u) in request!\n"), record->type);
1097     return GNUNET_NO;
1098   default:
1099     GNUNET_break (0);
1100     return GNUNET_NO;
1101   }
1102   GNUNET_STATISTICS_update (GDS_stats,
1103                             gettext_noop ("# RESULTS queued for clients"),
1104                             1,
1105                             GNUNET_NO);
1106   env = GNUNET_MQ_msg_extra (reply,
1107                              frc->data_size +
1108                              (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1109                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1110   reply->type = htonl (frc->type);
1111   reply->get_path_length = htonl (frc->get_path_length);
1112   reply->put_path_length = htonl (frc->put_path_length);
1113   reply->unique_id = record->unique_id;
1114   reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1115   reply->key = *key;
1116   paths = (struct GNUNET_PeerIdentity *) &reply[1];
1117   GNUNET_memcpy (paths,
1118                  frc->put_path,
1119                  sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1120   GNUNET_memcpy (&paths[frc->put_path_length],
1121                  frc->get_path,
1122                  sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1123   GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1124                  frc->data,
1125                  frc->data_size);
1126   LOG (GNUNET_ERROR_TYPE_DEBUG,
1127        "Sending reply to query %s for client %p\n",
1128        GNUNET_h2s (key),
1129        record->ch->client);
1130   GNUNET_MQ_send (record->ch->mq,
1131                   env);
1132   if (GNUNET_YES == do_free)
1133     remove_client_record (record);
1134   return GNUNET_YES;
1135 }
1136
1137
1138 /**
1139  * Handle a reply we've received from another peer.  If the reply
1140  * matches any of our pending queries, forward it to the respective
1141  * client(s).
1142  *
1143  * @param expiration when will the reply expire
1144  * @param key the query this reply is for
1145  * @param get_path_length number of peers in @a get_path
1146  * @param get_path path the reply took on get
1147  * @param put_path_length number of peers in @a put_path
1148  * @param put_path path the reply took on put
1149  * @param type type of the reply
1150  * @param data_size number of bytes in @a data
1151  * @param data application payload data
1152  */
1153 void
1154 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1155                           const struct GNUNET_HashCode *key,
1156                           unsigned int get_path_length,
1157                           const struct GNUNET_PeerIdentity *get_path,
1158                           unsigned int put_path_length,
1159                           const struct GNUNET_PeerIdentity *put_path,
1160                           enum GNUNET_BLOCK_Type type,
1161                           size_t data_size,
1162                           const void *data)
1163 {
1164   struct ForwardReplyContext frc;
1165   size_t msize;
1166
1167   msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1168     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1169   if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1170   {
1171     GNUNET_break (0);
1172     return;
1173   }
1174   if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1175                                                  key))
1176   {
1177     LOG (GNUNET_ERROR_TYPE_DEBUG,
1178          "No matching client for reply for key %s\n",
1179          GNUNET_h2s (key));
1180     GNUNET_STATISTICS_update (GDS_stats,
1181                               gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1182                               1,
1183                               GNUNET_NO);
1184     return;                     /* no matching request, fast exit! */
1185   }
1186   frc.expiration = expiration;
1187   frc.get_path = get_path;
1188   frc.put_path = put_path;
1189   frc.data = data;
1190   frc.data_size = data_size;
1191   frc.get_path_length = get_path_length;
1192   frc.put_path_length = put_path_length;
1193   frc.type = type;
1194   LOG (GNUNET_ERROR_TYPE_DEBUG,
1195        "Forwarding reply for key %s to client\n",
1196        GNUNET_h2s (key));
1197   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1198                                               key,
1199                                               &forward_reply,
1200                                               &frc);
1201
1202 }
1203
1204
1205 /**
1206  * Check if some client is monitoring GET messages and notify
1207  * them in that case.
1208  *
1209  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1210  * @param type The type of data in the request.
1211  * @param hop_count Hop count so far.
1212  * @param path_length number of entries in path (or 0 if not recorded).
1213  * @param path peers on the GET path (or NULL if not recorded).
1214  * @param desired_replication_level Desired replication level.
1215  * @param key Key of the requested data.
1216  */
1217 void
1218 GDS_CLIENTS_process_get (uint32_t options,
1219                          enum GNUNET_BLOCK_Type type,
1220                          uint32_t hop_count,
1221                          uint32_t desired_replication_level,
1222                          unsigned int path_length,
1223                          const struct GNUNET_PeerIdentity *path,
1224                          const struct GNUNET_HashCode * key)
1225 {
1226   struct ClientMonitorRecord *m;
1227   struct ClientHandle **cl;
1228   unsigned int cl_size;
1229
1230   cl = NULL;
1231   cl_size = 0;
1232   for (m = monitor_head; NULL != m; m = m->next)
1233   {
1234     if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1235            (m->type == type) ) &&
1236          ( (NULL == m->key) ||
1237            (0 == memcmp (key,
1238                          m->key,
1239                          sizeof(struct GNUNET_HashCode))) ) )
1240     {
1241       struct GNUNET_MQ_Envelope *env;
1242       struct GNUNET_DHT_MonitorGetMessage *mmsg;
1243       struct GNUNET_PeerIdentity *msg_path;
1244       size_t msize;
1245       unsigned int i;
1246
1247       /* Don't send duplicates */
1248       for (i = 0; i < cl_size; i++)
1249         if (cl[i] == m->ch)
1250           break;
1251       if (i < cl_size)
1252         continue;
1253       GNUNET_array_append (cl,
1254                            cl_size,
1255                            m->ch);
1256
1257       msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1258       env = GNUNET_MQ_msg_extra (mmsg,
1259                                  msize,
1260                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1261       mmsg->options = htonl(options);
1262       mmsg->type = htonl(type);
1263       mmsg->hop_count = htonl(hop_count);
1264       mmsg->desired_replication_level = htonl(desired_replication_level);
1265       mmsg->get_path_length = htonl(path_length);
1266       mmsg->key = *key;
1267       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1268       GNUNET_memcpy (msg_path,
1269                      path,
1270                      path_length * sizeof (struct GNUNET_PeerIdentity));
1271       GNUNET_MQ_send (m->ch->mq,
1272                       env);
1273     }
1274   }
1275   GNUNET_free_non_null (cl);
1276 }
1277
1278
1279 /**
1280  * Check if some client is monitoring GET RESP messages and notify
1281  * them in that case.
1282  *
1283  * @param type The type of data in the result.
1284  * @param get_path Peers on GET path (or NULL if not recorded).
1285  * @param get_path_length number of entries in get_path.
1286  * @param put_path peers on the PUT path (or NULL if not recorded).
1287  * @param put_path_length number of entries in get_path.
1288  * @param exp Expiration time of the data.
1289  * @param key Key of the data.
1290  * @param data Pointer to the result data.
1291  * @param size Number of bytes in @a data.
1292  */
1293 void
1294 GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1295                               const struct GNUNET_PeerIdentity *get_path,
1296                               unsigned int get_path_length,
1297                               const struct GNUNET_PeerIdentity *put_path,
1298                               unsigned int put_path_length,
1299                               struct GNUNET_TIME_Absolute exp,
1300                               const struct GNUNET_HashCode * key,
1301                               const void *data,
1302                               size_t size)
1303 {
1304   struct ClientMonitorRecord *m;
1305   struct ClientHandle **cl;
1306   unsigned int cl_size;
1307
1308   cl = NULL;
1309   cl_size = 0;
1310   for (m = monitor_head; NULL != m; m = m->next)
1311   {
1312     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1313         (NULL == m->key ||
1314          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1315     {
1316       struct GNUNET_MQ_Envelope *env;
1317       struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1318       struct GNUNET_PeerIdentity *path;
1319       size_t msize;
1320       unsigned int i;
1321
1322       /* Don't send duplicates */
1323       for (i = 0; i < cl_size; i++)
1324         if (cl[i] == m->ch)
1325           break;
1326       if (i < cl_size)
1327         continue;
1328       GNUNET_array_append (cl,
1329                            cl_size,
1330                            m->ch);
1331
1332       msize = size;
1333       msize += (get_path_length + put_path_length)
1334                * sizeof (struct GNUNET_PeerIdentity);
1335       env = GNUNET_MQ_msg_extra (mmsg,
1336                                  msize,
1337                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1338       mmsg->type = htonl(type);
1339       mmsg->put_path_length = htonl(put_path_length);
1340       mmsg->get_path_length = htonl(get_path_length);
1341       mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1342       mmsg->key = *key;
1343       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1344       GNUNET_memcpy (path,
1345                      put_path,
1346                      put_path_length * sizeof (struct GNUNET_PeerIdentity));
1347       GNUNET_memcpy (path,
1348                      get_path,
1349                      get_path_length * sizeof (struct GNUNET_PeerIdentity));
1350       GNUNET_memcpy (&path[get_path_length],
1351                      data,
1352                      size);
1353       GNUNET_MQ_send (m->ch->mq,
1354                       env);
1355     }
1356   }
1357   GNUNET_free_non_null (cl);
1358 }
1359
1360
1361 /**
1362  * Check if some client is monitoring PUT messages and notify
1363  * them in that case.
1364  *
1365  * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1366  * @param type The type of data in the request.
1367  * @param hop_count Hop count so far.
1368  * @param path_length number of entries in path (or 0 if not recorded).
1369  * @param path peers on the PUT path (or NULL if not recorded).
1370  * @param desired_replication_level Desired replication level.
1371  * @param exp Expiration time of the data.
1372  * @param key Key under which data is to be stored.
1373  * @param data Pointer to the data carried.
1374  * @param size Number of bytes in data.
1375  */
1376 void
1377 GDS_CLIENTS_process_put (uint32_t options,
1378                          enum GNUNET_BLOCK_Type type,
1379                          uint32_t hop_count,
1380                          uint32_t desired_replication_level,
1381                          unsigned int path_length,
1382                          const struct GNUNET_PeerIdentity *path,
1383                          struct GNUNET_TIME_Absolute exp,
1384                          const struct GNUNET_HashCode *key,
1385                          const void *data,
1386                          size_t size)
1387 {
1388   struct ClientMonitorRecord *m;
1389   struct ClientHandle **cl;
1390   unsigned int cl_size;
1391
1392   cl = NULL;
1393   cl_size = 0;
1394   for (m = monitor_head; NULL != m; m = m->next)
1395   {
1396     if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1397         (NULL == m->key ||
1398          memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1399     {
1400       struct GNUNET_MQ_Envelope *env;
1401       struct GNUNET_DHT_MonitorPutMessage *mmsg;
1402       struct GNUNET_PeerIdentity *msg_path;
1403       size_t msize;
1404       unsigned int i;
1405
1406       /* Don't send duplicates */
1407       for (i = 0; i < cl_size; i++)
1408         if (cl[i] == m->ch)
1409           break;
1410       if (i < cl_size)
1411         continue;
1412       GNUNET_array_append (cl,
1413                            cl_size,
1414                            m->ch);
1415
1416       msize = size;
1417       msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1418       env = GNUNET_MQ_msg_extra (mmsg,
1419                                  msize,
1420                                  GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1421       mmsg->options = htonl(options);
1422       mmsg->type = htonl(type);
1423       mmsg->hop_count = htonl(hop_count);
1424       mmsg->desired_replication_level = htonl (desired_replication_level);
1425       mmsg->put_path_length = htonl (path_length);
1426       mmsg->key = *key;
1427       mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1428       msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1429       GNUNET_memcpy (msg_path,
1430                      path,
1431                      path_length * sizeof (struct GNUNET_PeerIdentity));
1432       GNUNET_memcpy (&msg_path[path_length],
1433                      data,
1434                      size);
1435       GNUNET_MQ_send (m->ch->mq,
1436                       env);
1437     }
1438   }
1439   GNUNET_free_non_null (cl);
1440 }
1441
1442
1443 /**
1444  * Initialize client subsystem.
1445  *
1446  * @param server the initialized server
1447  */
1448 static void
1449 GDS_CLIENTS_init ()
1450 {
1451   forward_map
1452     = GNUNET_CONTAINER_multihashmap_create (1024,
1453                                             GNUNET_YES);
1454   retry_heap
1455     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1456 }
1457
1458
1459 /**
1460  * Shutdown client subsystem.
1461  */
1462 static void
1463 GDS_CLIENTS_stop ()
1464 {
1465   if (NULL != retry_task)
1466   {
1467     GNUNET_SCHEDULER_cancel (retry_task);
1468     retry_task = NULL;
1469   }
1470 }
1471
1472
1473 /**
1474  * Define "main" method using service macro.
1475  *
1476  * @param name name of the service, i.e. "dht" or "xdht"
1477  * @param run name of the initializaton method for the service
1478  */
1479 #define GDS_DHT_SERVICE_INIT(name,run)          \
1480  GNUNET_SERVICE_MAIN \
1481   (name, \
1482   GNUNET_SERVICE_OPTION_NONE, \
1483   run, \
1484   &client_connect_cb, \
1485   &client_disconnect_cb, \
1486   NULL, \
1487   GNUNET_MQ_hd_var_size (dht_local_put, \
1488                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1489                          struct GNUNET_DHT_ClientPutMessage, \
1490                          NULL), \
1491   GNUNET_MQ_hd_var_size (dht_local_get, \
1492                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1493                          struct GNUNET_DHT_ClientGetMessage, \
1494                          NULL), \
1495   GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1496                           GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1497                           struct GNUNET_DHT_ClientGetStopMessage, \
1498                           NULL), \
1499   GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1500                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1501                            struct GNUNET_DHT_MonitorStartStopMessage, \
1502                            NULL), \
1503   GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1504                            GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1505                            struct GNUNET_DHT_MonitorStartStopMessage, \
1506                            NULL), \
1507   GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1508                          GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1509                          struct GNUNET_DHT_ClientGetResultSeenMessage , \
1510                          NULL), \
1511   GNUNET_MQ_handler_end ())
1512
1513
1514 /**
1515  * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1516  */
1517 void __attribute__ ((destructor))
1518 GDS_CLIENTS_done ()
1519 {
1520   if (NULL != retry_heap)
1521   {
1522     GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1523     GNUNET_CONTAINER_heap_destroy (retry_heap);
1524     retry_heap = NULL;
1525   }
1526   if (NULL != forward_map)
1527   {
1528     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1529     GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1530     forward_map = NULL;
1531   }
1532 }
1533
1534 /* end of gnunet-service-dht_clients.c */