75506534bbd52f4e7b84cca54eabcb60cb9cf1c1
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht_new.h"
40 #include <fenv.h>
41 #include "gnunet-service-dht_clients.h"
42 #include "gnunet-service-dht_neighbours.h"
43
44
45 /**
46  * Linked list of messages to send to clients.
47  */
48 struct PendingMessage
49 {
50   /**
51    * Pointer to next item in the list
52    */
53   struct PendingMessage *next;
54
55   /**
56    * Pointer to previous item in the list
57    */
58   struct PendingMessage *prev;
59
60   /**
61    * Actual message to be sent, allocated at the end of the struct:
62    * // msg = (cast) &pm[1]; 
63    * // memcpy (&pm[1], data, len);
64    */
65   const struct GNUNET_MessageHeader *msg;
66
67 };
68
69
70 /**
71  * Struct containing information about a client,
72  * handle to connect to it, and any pending messages
73  * that need to be sent to it.
74  */
75 struct ClientList
76 {
77   /**
78    * Linked list of active clients
79    */
80   struct ClientList *next;
81
82   /**
83    * Linked list of active clients
84    */
85   struct ClientList *prev;
86
87   /**
88    * The handle to this client
89    */
90   struct GNUNET_SERVER_Client *client_handle;
91
92   /**
93    * Handle to the current transmission request, NULL
94    * if none pending.
95    */
96   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
97
98   /**
99    * Linked list of pending messages for this client
100    */
101   struct PendingMessage *pending_head;
102
103   /**
104    * Tail of linked list of pending messages for this client
105    */
106   struct PendingMessage *pending_tail;
107
108 };
109
110
111 /**
112  * Entry in the DHT routing table for a client's GET request.
113  */
114 struct ClientQueryRecord
115 {
116
117   /**
118    * The key this request was about
119    */
120   GNUNET_HashCode key;
121
122   /**
123    * Client responsible for the request.
124    */
125   struct ClientList *client;
126
127   /**
128    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
129    */
130   const void *xquery;
131
132   /**
133    * Replies we have already seen for this request.
134    */
135   GNUNET_HashCode *seen_replies;
136
137   /**
138    * Pointer to this nodes heap location in the retry-heap (for fast removal)
139    */
140   struct GNUNET_CONTAINER_HeapNode *hnode;
141
142   /**
143    * What's the delay between re-try operations that we currently use for this
144    * request?
145    */
146   struct GNUNET_TIME_Relative retry_frequency;
147
148   /**
149    * What's the next time we should re-try this request?
150    */
151   struct GNUNET_TIME_Absolute retry_time;
152
153   /**
154    * The unique identifier of this request
155    */
156   uint64_t unique_id;
157
158   /**
159    * Number of bytes in xquery.
160    */
161   size_t xquery_size;
162
163   /**
164    * Number of entries in 'seen_replies'.
165    */
166   unsigned int seen_replies_count;
167
168   /**
169    * Desired replication level
170    */
171   uint32_t replication;
172
173   /**
174    * Any message options for this request
175    */
176   uint32_t msg_options;
177
178   /**
179    * The type for the data for the GET request; actually an 'enum
180    * GNUNET_BLOCK_Type'.
181    */
182   uint32_t msg_type;
183
184 };
185
186
187 /**
188  * List of active clients.
189  */
190 static struct ClientList *client_head;
191
192 /**
193  * List of active clients.
194  */
195 static struct ClientList *client_tail;
196
197 /**
198  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
199  */
200 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
201
202 /**
203  * Heap with all of our client's request, sorted by retry time (earliest on top).
204  */
205 static struct GNUNET_CONTAINER_Heap *retry_heap;
206
207 /**
208  * Task that re-transmits requests (using retry_heap).
209  */
210 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
211
212
213 /**
214  * Find a client if it exists, add it otherwise.
215  *
216  * @param client the server handle to the client
217  *
218  * @return the client if found, a new client otherwise
219  */
220 static struct ClientList *
221 find_active_client (struct GNUNET_SERVER_Client *client)
222 {
223   struct ClientList *pos = client_list;
224   struct ClientList *ret;
225
226   while (pos != NULL)
227   {
228     if (pos->client_handle == client)
229       return pos;
230     pos = pos->next;
231   }
232   ret = GNUNET_malloc (sizeof (struct ClientList));
233   ret->client_handle = client;
234   GNUNET_CONTAINER_DLL_insert (client_head,
235                                client_tail,
236                                ret);
237   return ret;
238 }
239
240
241 /**
242  * Iterator over hash map entries that frees all entries 
243  * associated with the given client.
244  *
245  * @param cls client to search for in source routes
246  * @param key current key code (ignored)
247  * @param value value in the hash map, a ClientQueryRecord
248  * @return GNUNET_YES (we should continue to iterate)
249  */
250 static int
251 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
252 {
253   struct ClientList *client = cls;
254   struct ClientQueryRecord *record = value;
255
256   if (record->client != client)
257     return GNUNET_YES;
258   GNUNET_assert (GNUNET_YES ==
259                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
260                                                        key, record));
261   GNUNET_CONTAINER_heap_remove_node (record->hnode);
262   GNUNET_ARRAY_append (record->seen_replies,
263                        record->seen_replies_count,
264                        0);
265   GNUNET_free (record);
266   return GNUNET_YES;
267 }
268
269
270 /**
271  * Functions with this signature are called whenever a client
272  * is disconnected on the network level.
273  *
274  * @param cls closure (NULL for dht)
275  * @param client identification of the client; NULL
276  *        for the last call when the server is destroyed
277  */
278 static void
279 handle_client_disconnect (void *cls, 
280                           struct GNUNET_SERVER_Client *client)
281 {
282   struct ClientList *pos = client_list;
283   struct ClientList *found;
284   struct PendingMessage *reply;
285
286   found = NULL;
287   while (pos != NULL)
288   {
289     if (pos->client_handle == client)
290     {
291       GNUNET_CONTAINER_DLL_remove (client_head,
292                                    client_tail,
293                                    pos);
294       found = pos;
295       break;
296     }
297     pos = pos->next;
298   }
299   if (found == NULL)
300     return;
301   if (found->transmit_handle != NULL)
302     GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
303   while (NULL != (reply = found->pending_head))
304     {
305       GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
306                                    reply);
307       GNUNET_free (reply);
308     }
309   GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
310                                          &remove_client_records, found);
311   GNUNET_free (found);
312 }
313
314
315 /**
316  * Route the given request via the DHT.  This includes updating 
317  * the bloom filter and retransmission times, building the P2P
318  * message and initiating the routing operation.
319  */
320 static void
321 transmit_request (struct ClientQueryRecord *cqr)
322 {
323   int32_t reply_bf_mutator;
324
325   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
326                                                          UINT32_MAX);
327   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
328                                                  cqr->seen_replies,
329                                                  cqr->seen_replies_count);
330   GST_NEIGHBOURS_handle_get (cqr->msg_type,
331                              cqr->msg_options,
332                              cqr->replication,
333                              &cqr->key,
334                              cqr->xquery,
335                              cqr->xquery_size,
336                              reply_bf,
337                              reply_bf_mutator,
338                              NULL /* no peers blocked initially */);
339   GNUNET_CONTAINER_bloomfilter_destroy (reply_bf);
340
341   /* exponential back-off for retries, max 1h */
342   cqr->retry_frequency = 
343     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
344                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
345   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
346 }
347
348
349 /**
350  * Task that looks at the 'retry_heap' and transmits all of the requests
351  * on the heap that are ready for transmission.  Then re-schedules
352  * itself (unless the heap is empty).
353  *
354  * @param cls unused
355  * @param tc scheduler context
356  */
357 static void
358 transmit_next_request_task (void *cls,
359                             const struct GNUNET_SCHEDULER_TaskContext *tc)
360 {
361   struct ClientQueryRecord *cqr;
362   struct GNUNET_TIME_Relative delay;
363
364   retry_task = GNUNET_SCHEDULER_NO_TASK;
365   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
366     return;
367   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
368     {
369       cqr->hnode = NULL;
370       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
371       if (delay.value > 0)
372         {
373           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
374                                                      cqr->retry_time.abs_value);
375           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
376                                                      &transmit_next_request_task,
377                                                      NULL);
378           return;
379         }
380       transmit_request (cqr);
381     }
382 }
383
384
385 /**
386  * Handler for PUT messages.
387  *
388  * @param cls closure for the service
389  * @param client the client we received this message from
390  * @param message the actual message received
391  */
392 static void
393 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
394                       const struct GNUNET_MessageHeader *message)
395 {
396   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
397   uint16_t size;
398   
399   size = ntohs (message->size);
400   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
401     {
402       GNUNET_break (0);
403       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
404     }
405   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
406   GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
407                              ntohl (dht_msg->options),
408                              ntohl (dht_msg->desired_replication_level),
409                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
410                              &dht_msg->key,
411                              &dht_msg[1],
412                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
413   GNUNET_SERVER_receive_done (client, GNUNET_OK);
414 }
415
416
417 /**
418  * Handler for any generic DHT messages, calls the appropriate handler
419  * depending on message type, sends confirmation if responses aren't otherwise
420  * expected.
421  *
422  * @param cls closure for the service
423  * @param client the client we received this message from
424  * @param message the actual message received
425  */
426 static void
427 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
428                       const struct GNUNET_MessageHeader *message)
429 {
430   const struct GNUNET_DHT_ClientGetMessage *get;
431   const struct GNUNET_MessageHeader *enc_msg;
432
433   struct ClientQueryRecord *cqr;
434   size_t xquery_size;
435   const char* xquery;
436   uint16_t size;
437
438   size = ntohs (message->size);
439   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
440     {
441       GNUNET_break (0);
442       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
443       return;
444     }
445   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
446   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
447   xquery = (const char*) &get[1];
448
449   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
450   cqr->key = get->key;
451   cqr->client = find_active_client (client);
452   cqr->xquery = (void*) &cqr[1];
453   memcpy (&cqr[1], xquery, xquery_size);
454   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
455   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
456   cqr->retry_time = GNUNET_TIME_absolute_get ();
457   cqr->unique_id = get->unique_id;
458   cqr->xquery_size = xquery_size;
459   cqr->replication = ntohl (get->desired_replication_level);
460   cqr->msg_options = ntohl (get->options);
461   cqr->msg_type = ntohl (get->type);
462   GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
463                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
464   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
465     GNUNET_SCHEDULER_cancel (retry_task);
466   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
467   GNUNET_SERVER_receive_done (client, GNUNET_OK);
468 }
469
470
471 /**
472  * Closure for 'remove_by_uid'.
473  */
474 struct RemoveByUidContext
475 {
476   /**
477    * Client that issued the removal request.
478    */
479   struct ClientList *client;
480
481   /**
482    * Unique ID of the request.
483    */
484   uint64_t uid;
485 };
486
487
488 /**
489  * Iterator over hash map entries that frees all entries 
490  * that match the given client and UID.
491  *
492  * @param cls UID and client to search for in source routes
493  * @param key current key code
494  * @param value value in the hash map, a ClientQueryRecord
495  * @return GNUNET_YES (we should continue to iterate)
496  */
497 static int
498 remove_by_uid (void *cls, const GNUNET_HashCode * key, void *value)
499 {
500   const struct RemoveByUidContext *ctx = cls;
501   struct ClientQueryRecord *record = value;
502
503   if (record->uid != ctx->uid)
504     return GNUNET_YES;
505   return remove_client_records (ctx->client, key, record);
506 }
507
508
509 /**
510  * Handler for any generic DHT stop messages, calls the appropriate handler
511  * depending on message type (if processed locally)
512  *
513  * @param cls closure for the service
514  * @param client the client we received this message from
515  * @param message the actual message received
516  *
517  */
518 static void
519 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
520                            const struct GNUNET_MessageHeader *message)
521 {
522   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
523     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
524   
525   ctx.client = find_active_client (client);
526   ctx.uid = &dht_stop_msg.unique_id);
527   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
528                                               &dht_stop_msg->key,
529                                               &remove_by_uid,
530                                               &ctx);
531   GNUNET_SERVER_receive_done (client, GNUNET_OK);
532 }
533
534
535 /**
536  * Task run to check for messages that need to be sent to a client.
537  *
538  * @param client a ClientList, containing the client and any messages to be sent to it
539  */
540 static void
541 process_pending_messages (struct ClientList *client);
542
543
544 /**
545  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
546  * request.  A ClientList is passed as closure, take the head of the list
547  * and copy it into buf, which has the result of sending the message to the
548  * client.
549  *
550  * @param cls closure to this call
551  * @param size maximum number of bytes available to send
552  * @param buf where to copy the actual message to
553  *
554  * @return the number of bytes actually copied, 0 indicates failure
555  */
556 static size_t
557 send_reply_to_client (void *cls, size_t size, void *buf)
558 {
559   struct ClientList *client = cls;
560   char *cbuf = buf;
561   struct PendingMessage *reply;
562   size_t off;
563   size_t msize;
564
565   client->transmit_handle = NULL;
566   if (buf == NULL)
567   {
568     /* client disconnected */
569     return 0;
570   }
571   off = 0;
572   while ((NULL != (reply = client->pending_head)) &&
573          (size >= off + (msize = ntohs (reply->msg->size))))
574   {
575     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
576                                  reply);
577     memcpy (&cbuf[off], reply->msg, msize);
578     GNUNET_free (reply);
579     off += msize;
580   }
581   process_pending_messages (client);
582   return off;
583 }
584
585
586 /**
587  * Task run to check for messages that need to be sent to a client.
588  *
589  * @param client a ClientList, containing the client and any messages to be sent to it
590  */
591 static void
592 process_pending_messages (struct ClientList *client)
593 {
594   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
595     return;
596   client->transmit_handle =
597       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
598                                            ntohs (client->pending_head->
599                                                   msg->size),
600                                            GNUNET_TIME_UNIT_FOREVER_REL,
601                                            &send_reply_to_client, client);
602 }
603
604
605 /**
606  * Add a PendingMessage to the clients list of messages to be sent
607  *
608  * @param client the active client to send the message to
609  * @param pending_message the actual message to send
610  */
611 static void
612 add_pending_message (struct ClientList *client,
613                      struct PendingMessage *pending_message)
614 {
615   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
616                                     pending_message);
617   process_pending_messages (client);
618 }
619
620
621 /**
622  * Closure for 'forward_reply'
623  */
624 struct ForwardReplyContext
625 {
626
627   /**
628    * Actual message to send to matching clients. 
629    */
630   struct PendingMessage *pm;
631
632   /**
633    * Embedded payload.
634    */
635   const void *data;
636
637   /**
638    * Type of the data.
639    */
640   uint32_t type;
641
642   /**
643    * Number of bytes in data.
644    */
645   size_t data_size;
646
647   /**
648    * Do we need to copy 'pm' because it was already used?
649    */
650   int do_copy;
651
652 };
653
654
655 /**
656  * Iterator over hash map entries that send a given reply to
657  * each of the matching clients.  With some tricky recycling
658  * of the buffer.
659  *
660  * @param cls the 'struct ForwardReplyContext'
661  * @param key current key
662  * @param value value in the hash map, a ClientQueryRecord
663  * @return GNUNET_YES (we should continue to iterate),
664  *         if the result is mal-formed, GNUNET_NO
665  */
666 static int
667 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
668 {
669   struct ForwardReplyContext *frc = cls;
670   struct ClientQueryRecord *record = value;
671   struct PendingMessage *pm;
672   struct ReplyMessage *reply;
673   enum GNUNET_BLOCK_EvaluationResult eval;
674   int do_free;
675   GNUNET_HashCode ch;
676   unsigned int i;
677   
678   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
679        (record->type != frc->type) )
680     return GNUNET_YES; /* type mismatch */
681   GNUNET_CRYPTO_hash (frc->data,
682                       frc->data_size,
683                       &ch);
684   for (i=0;i<record->seen_replies_count;i++)
685     if (0 == memcmp (&record->seen_replies[i],
686                      &ch,
687                      sizeof (GNUNET_HashCode)))
688       return GNUNET_YES; /* duplicate */             
689   eval =
690     GNUNET_BLOCK_evaluate (GDS_block_context, 
691                            record->type, key, 
692                            NULL, 0,
693                            record->xquery,
694                            record->xquery_size, 
695                            frc->data,
696                            frc->data_size);
697   switch (eval)
698   {
699   case GNUNET_BLOCK_EVALUATION_OK_LAST:
700     do_free = GNUNET_YES;
701     break;
702   case GNUNET_BLOCK_EVALUATION_OK_MORE:
703     GNUNET_ARRAY_append (record->seen_replies,
704                          record->seen_replies_count,
705                          ch);
706     do_free = GNUNET_NO;
707     break;
708   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
709     /* should be impossible to encounter here */
710     GNUNET_break (0);
711     return GNUNET_YES;
712   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
713     GNUNET_break_op (0);
714     return GNUNET_NO;
715   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
716     GNUNET_break (0);
717     return GNUNET_NO;
718   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
719     GNUNET_break (0);
720     return GNUNET_NO;
721   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
722     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
723                 "Unsupported block type (%u) in request!\n",
724                 record->type);
725     return GNUNET_NO;
726   }
727   if (GNUNET_NO == frc->do_copy)
728     {
729       /* first time, we can use the original data */
730       pm = frc->pm; 
731       frc->do_copy = GNUNET_YES;
732     }
733   else
734     {
735       /* two clients waiting for same reply, must copy for queueing */
736       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
737                           ntohs (frc->pm->msg->size));
738       memcpy (pm, frc->pm, 
739               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
740       pm->next = pm->prev = NULL;
741     }
742   reply = (struct ReplyMessage*) &pm[1];  
743   reply->unique_id = record->unique_id;
744   add_pending_message (record->client, pm);
745   if (GNUNET_YES == do_free)
746     remove_client_records (record->client, key, record);
747   return GNUNET_YES;
748 }
749
750
751 /**
752  * Handle a reply we've received from another peer.  If the reply
753  * matches any of our pending queries, forward it to the respective
754  * client(s).
755  *
756  * @param expiration when will the reply expire
757  * @param key the query this reply is for
758  * @param get_path_length number of peers in 'get_path'
759  * @param get_path path the reply took on get
760  * @param put_path_length number of peers in 'put_path'
761  * @param put_path path the reply took on put
762  * @param type type of the reply
763  * @param data_size number of bytes in 'data'
764  * @param data application payload data
765  */
766 void
767 GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
768                          const GNUNET_HashCode *key,
769                          unsigned int get_path_length,
770                          const struct GNUNET_PeerIdentity *get_path,
771                          unsigned int put_path_length,
772                          const struct GNUNET_PeerIdentity *put_path,
773                          uint32_t type,
774                          size_t data_size,
775                          const void *data)
776 {
777   struct ForwardReplyContext frc;
778   struct PendingMessage *pm;
779   struct ReplyMessage *reply;
780   struct GNUNET_PeerIdentity *paths;
781   size_t msize;
782
783   if (NULL ==
784       GNUNET_CONTAINER_multihashmap_get (foward_map, key))
785     return; /* no matching request, fast exit! */
786   msize = sizeof(struct ReplyMessage) + data_size + 
787     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
788   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
789     {
790       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
791                   _("Could not pass reply to client, message too big!\n"));
792       return;
793     }
794   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
795   reply = (struct ReplyMessage*) &pm[1];
796   pm->msg = &reply->header;
797   reply->header.size = htons ((uint16_t) msize);
798   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
799   reply->type = htonl (type);
800   reply->get_path_length = htonl (get_path_length);
801   reply->put_path_length = htonl (put_path_length);
802   reply->unique_id = 0; /* filled in later */
803   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
804   reply->key = *key;
805   paths = (struct GNUNET_PeerIdentity*) &reply[1];
806   mempcy (paths, get_path, 
807           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
808   mempcy (&paths[get_path_length], 
809           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
810   memcpy (&paths[get_path_length + put_path_length],
811           data, 
812           data_size);
813   frc.do_copy = GNUNET_NO;
814   frc.pm = pm;
815   frc.data = data;
816   frc.data_size = data_size;
817   frc.type = type;
818   GNUNET_CONTAINER_multihashmap_get_multiple (foward_map, key,
819                                               &forward_reply,
820                                               &frc);
821   if (GNUNET_NO == frc.do_copy)
822     {
823       /* did not match any of the requests, free! */
824       GNUNET_free (buf);
825     }
826 }
827
828
829 /**
830  * Initialize client subsystem.
831  *
832  * @param server the initialized server
833  */
834 void 
835 GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
836 {
837   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
838     {&handle_dht_local_put, NULL, 
839      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
840     {&handle_dht_local_get, NULL, 
841      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
842     {&handle_dht_local_get_stop, NULL,
843      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
844      sizeof (struct GNUNET_DHT_StopMessage) },
845     {NULL, NULL, 0, 0}
846   };
847   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
848   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
849   GNUNET_SERVER_add_handlers (server, plugin_handlers);
850   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
851 }
852
853
854 /**
855  * Shutdown client subsystem.
856  */
857 void
858 GDS_CLIENT_done ()
859 {
860   GNUNET_assert (client_head == NULL);
861   GNUNET_assert (client_tail == NULL);
862   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
863     {
864       GNUNET_SCHEDULER_cancel (retry_task);
865       retry_task = GNUNET_SCHEDULER_NO_TASK;
866     }
867   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
868   GNUNET_CONTAINER_heap_destroy (retry_heap);
869   retry_heap = NULL;
870   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
871   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
872   forward_map = NULL;
873 }
874
875 /* end of gnunet-service-dht_clients.c */
876