fcfdfc7fd290164c666416f56e8ab07047e85d67
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-service-dht.h"
32 #include "gnunet-service-dht_clients.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht_new.h"
36
37
38 /**
39  * Linked list of messages to send to clients.
40  */
41 struct PendingMessage
42 {
43   /**
44    * Pointer to next item in the list
45    */
46   struct PendingMessage *next;
47
48   /**
49    * Pointer to previous item in the list
50    */
51   struct PendingMessage *prev;
52
53   /**
54    * Actual message to be sent, allocated at the end of the struct:
55    * // msg = (cast) &pm[1]; 
56    * // memcpy (&pm[1], data, len);
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60 };
61
62
63 /**
64  * Struct containing information about a client,
65  * handle to connect to it, and any pending messages
66  * that need to be sent to it.
67  */
68 struct ClientList
69 {
70   /**
71    * Linked list of active clients
72    */
73   struct ClientList *next;
74
75   /**
76    * Linked list of active clients
77    */
78   struct ClientList *prev;
79
80   /**
81    * The handle to this client
82    */
83   struct GNUNET_SERVER_Client *client_handle;
84
85   /**
86    * Handle to the current transmission request, NULL
87    * if none pending.
88    */
89   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
90
91   /**
92    * Linked list of pending messages for this client
93    */
94   struct PendingMessage *pending_head;
95
96   /**
97    * Tail of linked list of pending messages for this client
98    */
99   struct PendingMessage *pending_tail;
100
101 };
102
103
104 /**
105  * Entry in the DHT routing table for a client's GET request.
106  */
107 struct ClientQueryRecord
108 {
109
110   /**
111    * The key this request was about
112    */
113   GNUNET_HashCode key;
114
115   /**
116    * Client responsible for the request.
117    */
118   struct ClientList *client;
119
120   /**
121    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
122    */
123   const void *xquery;
124
125   /**
126    * Replies we have already seen for this request.
127    */
128   GNUNET_HashCode *seen_replies;
129
130   /**
131    * Pointer to this nodes heap location in the retry-heap (for fast removal)
132    */
133   struct GNUNET_CONTAINER_HeapNode *hnode;
134
135   /**
136    * What's the delay between re-try operations that we currently use for this
137    * request?
138    */
139   struct GNUNET_TIME_Relative retry_frequency;
140
141   /**
142    * What's the next time we should re-try this request?
143    */
144   struct GNUNET_TIME_Absolute retry_time;
145
146   /**
147    * The unique identifier of this request
148    */
149   uint64_t unique_id;
150
151   /**
152    * Number of bytes in xquery.
153    */
154   size_t xquery_size;
155
156   /**
157    * Number of entries in 'seen_replies'.
158    */
159   unsigned int seen_replies_count;
160
161   /**
162    * Desired replication level
163    */
164   uint32_t replication;
165
166   /**
167    * Any message options for this request
168    */
169   uint32_t msg_options;
170
171   /**
172    * The type for the data for the GET request.
173    */
174   enum GNUNET_BLOCK_Type type;
175
176 };
177
178
179 /**
180  * List of active clients.
181  */
182 static struct ClientList *client_head;
183
184 /**
185  * List of active clients.
186  */
187 static struct ClientList *client_tail;
188
189 /**
190  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
191  */
192 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
193
194 /**
195  * Heap with all of our client's request, sorted by retry time (earliest on top).
196  */
197 static struct GNUNET_CONTAINER_Heap *retry_heap;
198
199 /**
200  * Task that re-transmits requests (using retry_heap).
201  */
202 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
203
204
205 /**
206  * Find a client if it exists, add it otherwise.
207  *
208  * @param client the server handle to the client
209  *
210  * @return the client if found, a new client otherwise
211  */
212 static struct ClientList *
213 find_active_client (struct GNUNET_SERVER_Client *client)
214 {
215   struct ClientList *pos = client_head;
216   struct ClientList *ret;
217
218   while (pos != NULL)
219   {
220     if (pos->client_handle == client)
221       return pos;
222     pos = pos->next;
223   }
224   ret = GNUNET_malloc (sizeof (struct ClientList));
225   ret->client_handle = client;
226   GNUNET_CONTAINER_DLL_insert (client_head,
227                                client_tail,
228                                ret);
229   return ret;
230 }
231
232
233 /**
234  * Iterator over hash map entries that frees all entries 
235  * associated with the given client.
236  *
237  * @param cls client to search for in source routes
238  * @param key current key code (ignored)
239  * @param value value in the hash map, a ClientQueryRecord
240  * @return GNUNET_YES (we should continue to iterate)
241  */
242 static int
243 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
244 {
245   struct ClientList *client = cls;
246   struct ClientQueryRecord *record = value;
247
248   if (record->client != client)
249     return GNUNET_YES;
250   GNUNET_assert (GNUNET_YES ==
251                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
252                                                        key, record));
253   if (NULL != record->hnode)
254     GNUNET_CONTAINER_heap_remove_node (record->hnode);
255   GNUNET_array_grow (record->seen_replies,
256                      record->seen_replies_count,
257                      0);
258   GNUNET_free (record);
259   return GNUNET_YES;
260 }
261
262
263 /**
264  * Functions with this signature are called whenever a client
265  * is disconnected on the network level.
266  *
267  * @param cls closure (NULL for dht)
268  * @param client identification of the client; NULL
269  *        for the last call when the server is destroyed
270  */
271 static void
272 handle_client_disconnect (void *cls, 
273                           struct GNUNET_SERVER_Client *client)
274 {
275   struct ClientList *pos;
276   struct PendingMessage *reply;
277
278   pos = find_active_client (client);
279   GNUNET_CONTAINER_DLL_remove (client_head,
280                                client_tail,
281                                pos);
282   if (pos->transmit_handle != NULL)
283     GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
284   while (NULL != (reply = pos->pending_head))
285     {
286       GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail,
287                                    reply);
288       GNUNET_free (reply);
289     }
290   GNUNET_CONTAINER_multihashmap_iterate (forward_map,
291                                          &remove_client_records, pos);
292   GNUNET_free (pos);
293 }
294
295
296 /**
297  * Route the given request via the DHT.  This includes updating 
298  * the bloom filter and retransmission times, building the P2P
299  * message and initiating the routing operation.
300  */
301 static void
302 transmit_request (struct ClientQueryRecord *cqr)
303 {
304   int32_t reply_bf_mutator;
305   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
306
307   GNUNET_STATISTICS_update (GDS_stats,
308                             gettext_noop ("# GET requests from clients injected into P2P network"), 1,
309                             GNUNET_NO);
310   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
311                                                          UINT32_MAX);
312   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
313                                                  cqr->seen_replies,
314                                                  cqr->seen_replies_count);
315   GDS_NEIGHBOURS_handle_get (cqr->type,
316                              cqr->msg_options,
317                              cqr->replication,
318                              0 /* hop count */,
319                              &cqr->key,
320                              cqr->xquery,
321                              cqr->xquery_size,
322                              reply_bf,
323                              reply_bf_mutator,
324                              NULL /* no peers blocked initially */);
325   GNUNET_CONTAINER_bloomfilter_free (reply_bf);
326
327   /* exponential back-off for retries, max 1h */
328   cqr->retry_frequency = 
329     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
330                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
331   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
332 }
333
334
335 /**
336  * Task that looks at the 'retry_heap' and transmits all of the requests
337  * on the heap that are ready for transmission.  Then re-schedules
338  * itself (unless the heap is empty).
339  *
340  * @param cls unused
341  * @param tc scheduler context
342  */
343 static void
344 transmit_next_request_task (void *cls,
345                             const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct ClientQueryRecord *cqr;
348   struct GNUNET_TIME_Relative delay;
349
350   retry_task = GNUNET_SCHEDULER_NO_TASK;
351   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
352     return;
353   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
354     {
355       cqr->hnode = NULL;
356       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
357       if (delay.rel_value > 0)
358         {
359           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
360                                                      cqr->retry_time.abs_value);
361           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
362                                                      &transmit_next_request_task,
363                                                      NULL);
364           return;
365         }
366       transmit_request (cqr);
367     }
368 }
369
370
371 /**
372  * Handler for PUT messages.
373  *
374  * @param cls closure for the service
375  * @param client the client we received this message from
376  * @param message the actual message received
377  */
378 static void
379 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
380                       const struct GNUNET_MessageHeader *message)
381 {
382   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
383   uint16_t size;
384   
385   size = ntohs (message->size);
386   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
387     {
388       GNUNET_break (0);
389       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
390     }
391   GNUNET_STATISTICS_update (GDS_stats,
392                             gettext_noop ("# PUT requests received from clients"), 1,
393                             GNUNET_NO);
394   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
395   /* give to local clients */
396   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
397                            &dht_msg->key,
398                            0, NULL,
399                            0, NULL,
400                            ntohl (dht_msg->type),
401                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
402                            &dht_msg[1]);
403   /* store locally */
404   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
405                             &dht_msg->key,
406                             0, NULL,
407                             ntohl (dht_msg->type),
408                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
409                             &dht_msg[1]);
410   /* route to other peers */
411   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
412                              ntohl (dht_msg->options),
413                              ntohl (dht_msg->desired_replication_level),
414                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
415                              0 /* hop count */,
416                              NULL /* peer bloom filter */,
417                              &dht_msg->key,
418                              0, NULL,
419                              &dht_msg[1],
420                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
421   GNUNET_SERVER_receive_done (client, GNUNET_OK);
422 }
423
424
425 /**
426  * Handler for any generic DHT messages, calls the appropriate handler
427  * depending on message type, sends confirmation if responses aren't otherwise
428  * expected.
429  *
430  * @param cls closure for the service
431  * @param client the client we received this message from
432  * @param message the actual message received
433  */
434 static void
435 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
436                       const struct GNUNET_MessageHeader *message)
437 {
438   const struct GNUNET_DHT_ClientGetMessage *get;
439   struct ClientQueryRecord *cqr;
440   size_t xquery_size;
441   const char* xquery;
442   uint16_t size;
443
444   size = ntohs (message->size);
445   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
446     {
447       GNUNET_break (0);
448       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
449       return;
450     }
451   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
452   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
453   xquery = (const char*) &get[1];
454   GNUNET_STATISTICS_update (GDS_stats,
455                             gettext_noop ("# GET requests received from clients"), 1,
456                             GNUNET_NO);
457   
458   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
459   cqr->key = get->key;
460   cqr->client = find_active_client (client);
461   cqr->xquery = (void*) &cqr[1];
462   memcpy (&cqr[1], xquery, xquery_size);
463   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
464   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
465   cqr->retry_time = GNUNET_TIME_absolute_get ();
466   cqr->unique_id = get->unique_id;
467   cqr->xquery_size = xquery_size;
468   cqr->replication = ntohl (get->desired_replication_level);
469   cqr->msg_options = ntohl (get->options);
470   cqr->type = ntohl (get->type);  
471   GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
472                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
473   /* start remote requests */
474   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
475     GNUNET_SCHEDULER_cancel (retry_task);
476   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
477   /* perform local lookup */
478   GDS_DATACACHE_handle_get (&get->key,
479                             cqr->type,
480                             cqr->xquery,
481                             xquery_size,
482                             NULL, 0);
483   GNUNET_SERVER_receive_done (client, GNUNET_OK);
484 }
485
486
487 /**
488  * Closure for 'remove_by_unique_id'.
489  */
490 struct RemoveByUniqueIdContext
491 {
492   /**
493    * Client that issued the removal request.
494    */
495   struct ClientList *client;
496
497   /**
498    * Unique ID of the request.
499    */
500   uint64_t unique_id;
501 };
502
503
504 /**
505  * Iterator over hash map entries that frees all entries 
506  * that match the given client and unique ID.
507  *
508  * @param cls unique ID and client to search for in source routes
509  * @param key current key code
510  * @param value value in the hash map, a ClientQueryRecord
511  * @return GNUNET_YES (we should continue to iterate)
512  */
513 static int
514 remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
515 {
516   const struct RemoveByUniqueIdContext *ctx = cls;
517   struct ClientQueryRecord *record = value;
518
519   if (record->unique_id != ctx->unique_id)
520     return GNUNET_YES;
521   return remove_client_records (ctx->client, key, record);
522 }
523
524
525 /**
526  * Handler for any generic DHT stop messages, calls the appropriate handler
527  * depending on message type (if processed locally)
528  *
529  * @param cls closure for the service
530  * @param client the client we received this message from
531  * @param message the actual message received
532  *
533  */
534 static void
535 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
536                            const struct GNUNET_MessageHeader *message)
537 {
538   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
539     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
540   struct RemoveByUniqueIdContext ctx;
541   
542   GNUNET_STATISTICS_update (GDS_stats,
543                             gettext_noop ("# GET STOP requests received from clients"), 1,
544                             GNUNET_NO);
545   ctx.client = find_active_client (client);
546   ctx.unique_id = dht_stop_msg->unique_id;
547   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
548                                               &dht_stop_msg->key,
549                                               &remove_by_unique_id,
550                                               &ctx);
551   GNUNET_SERVER_receive_done (client, GNUNET_OK);
552 }
553
554
555 /**
556  * Task run to check for messages that need to be sent to a client.
557  *
558  * @param client a ClientList, containing the client and any messages to be sent to it
559  */
560 static void
561 process_pending_messages (struct ClientList *client);
562
563
564 /**
565  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
566  * request.  A ClientList is passed as closure, take the head of the list
567  * and copy it into buf, which has the result of sending the message to the
568  * client.
569  *
570  * @param cls closure to this call
571  * @param size maximum number of bytes available to send
572  * @param buf where to copy the actual message to
573  *
574  * @return the number of bytes actually copied, 0 indicates failure
575  */
576 static size_t
577 send_reply_to_client (void *cls, size_t size, void *buf)
578 {
579   struct ClientList *client = cls;
580   char *cbuf = buf;
581   struct PendingMessage *reply;
582   size_t off;
583   size_t msize;
584
585   client->transmit_handle = NULL;
586   if (buf == NULL)
587   {
588     /* client disconnected */
589     return 0;
590   }
591   off = 0;
592   while ((NULL != (reply = client->pending_head)) &&
593          (size >= off + (msize = ntohs (reply->msg->size))))
594   {
595     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
596                                  reply);
597     memcpy (&cbuf[off], reply->msg, msize);
598     GNUNET_free (reply);
599     off += msize;
600   }
601   process_pending_messages (client);
602   return off;
603 }
604
605
606 /**
607  * Task run to check for messages that need to be sent to a client.
608  *
609  * @param client a ClientList, containing the client and any messages to be sent to it
610  */
611 static void
612 process_pending_messages (struct ClientList *client)
613 {
614   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
615     return;
616   client->transmit_handle =
617       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
618                                            ntohs (client->pending_head->
619                                                   msg->size),
620                                            GNUNET_TIME_UNIT_FOREVER_REL,
621                                            &send_reply_to_client, client);
622 }
623
624
625 /**
626  * Add a PendingMessage to the clients list of messages to be sent
627  *
628  * @param client the active client to send the message to
629  * @param pending_message the actual message to send
630  */
631 static void
632 add_pending_message (struct ClientList *client,
633                      struct PendingMessage *pending_message)
634 {
635   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
636                                     pending_message);
637   process_pending_messages (client);
638 }
639
640
641 /**
642  * Closure for 'forward_reply'
643  */
644 struct ForwardReplyContext
645 {
646
647   /**
648    * Actual message to send to matching clients. 
649    */
650   struct PendingMessage *pm;
651
652   /**
653    * Embedded payload.
654    */
655   const void *data;
656
657   /**
658    * Type of the data.
659    */
660   enum GNUNET_BLOCK_Type type;
661
662   /**
663    * Number of bytes in data.
664    */
665   size_t data_size;
666
667   /**
668    * Do we need to copy 'pm' because it was already used?
669    */
670   int do_copy;
671
672 };
673
674
675 /**
676  * Iterator over hash map entries that send a given reply to
677  * each of the matching clients.  With some tricky recycling
678  * of the buffer.
679  *
680  * @param cls the 'struct ForwardReplyContext'
681  * @param key current key
682  * @param value value in the hash map, a ClientQueryRecord
683  * @return GNUNET_YES (we should continue to iterate),
684  *         if the result is mal-formed, GNUNET_NO
685  */
686 static int
687 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
688 {
689   struct ForwardReplyContext *frc = cls;
690   struct ClientQueryRecord *record = value;
691   struct PendingMessage *pm;
692   struct GNUNET_DHT_ClientResultMessage *reply;
693   enum GNUNET_BLOCK_EvaluationResult eval;
694   int do_free;
695   GNUNET_HashCode ch;
696   unsigned int i;
697   
698   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
699        (record->type != frc->type) )
700     return GNUNET_YES; /* type mismatch */
701   GNUNET_CRYPTO_hash (frc->data,
702                       frc->data_size,
703                       &ch);
704   for (i=0;i<record->seen_replies_count;i++)
705     if (0 == memcmp (&record->seen_replies[i],
706                      &ch,
707                      sizeof (GNUNET_HashCode)))
708       return GNUNET_YES; /* duplicate */             
709   eval =
710     GNUNET_BLOCK_evaluate (GDS_block_context, 
711                            record->type, key, 
712                            NULL, 0,
713                            record->xquery,
714                            record->xquery_size, 
715                            frc->data,
716                            frc->data_size);
717   switch (eval)
718   {
719   case GNUNET_BLOCK_EVALUATION_OK_LAST:
720     do_free = GNUNET_YES;
721     break;
722   case GNUNET_BLOCK_EVALUATION_OK_MORE:
723     GNUNET_array_append (record->seen_replies,
724                          record->seen_replies_count,
725                          ch);
726     do_free = GNUNET_NO;
727     break;
728   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
729     /* should be impossible to encounter here */
730     GNUNET_break (0);
731     return GNUNET_YES;
732   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
733     GNUNET_break_op (0);
734     return GNUNET_NO;
735   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
736     GNUNET_break (0);
737     return GNUNET_NO;
738   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
739     GNUNET_break (0);
740     return GNUNET_NO;
741   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
742     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
743                 "Unsupported block type (%u) in request!\n",
744                 record->type);
745     return GNUNET_NO;
746   }
747   if (GNUNET_NO == frc->do_copy)
748     {
749       /* first time, we can use the original data */
750       pm = frc->pm; 
751       frc->do_copy = GNUNET_YES;
752     }
753   else
754     {
755       /* two clients waiting for same reply, must copy for queueing */
756       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
757                           ntohs (frc->pm->msg->size));
758       memcpy (pm, frc->pm, 
759               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
760       pm->next = pm->prev = NULL;
761     }
762   GNUNET_STATISTICS_update (GDS_stats,
763                             gettext_noop ("# RESULTS queued for clients"), 1,
764                             GNUNET_NO);
765   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];  
766   reply->unique_id = record->unique_id;
767   add_pending_message (record->client, pm);
768   if (GNUNET_YES == do_free)
769     remove_client_records (record->client, key, record);
770   return GNUNET_YES;
771 }
772
773
774 /**
775  * Handle a reply we've received from another peer.  If the reply
776  * matches any of our pending queries, forward it to the respective
777  * client(s).
778  *
779  * @param expiration when will the reply expire
780  * @param key the query this reply is for
781  * @param get_path_length number of peers in 'get_path'
782  * @param get_path path the reply took on get
783  * @param put_path_length number of peers in 'put_path'
784  * @param put_path path the reply took on put
785  * @param type type of the reply
786  * @param data_size number of bytes in 'data'
787  * @param data application payload data
788  */
789 void
790 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
791                          const GNUNET_HashCode *key,
792                          unsigned int get_path_length,
793                          const struct GNUNET_PeerIdentity *get_path,
794                          unsigned int put_path_length,
795                          const struct GNUNET_PeerIdentity *put_path,
796                          enum GNUNET_BLOCK_Type type,
797                          size_t data_size,
798                          const void *data)
799 {
800   struct ForwardReplyContext frc;
801   struct PendingMessage *pm;
802   struct GNUNET_DHT_ClientResultMessage *reply;
803   struct GNUNET_PeerIdentity *paths;
804   size_t msize;
805
806   if (NULL ==
807       GNUNET_CONTAINER_multihashmap_get (forward_map, key))
808     return; /* no matching request, fast exit! */
809   msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size + 
810     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
811   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
812     {
813       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
814                   _("Could not pass reply to client, message too big!\n"));
815       return;
816     }
817   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
818   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
819   pm->msg = &reply->header;
820   reply->header.size = htons ((uint16_t) msize);
821   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
822   reply->type = htonl (type);
823   reply->get_path_length = htonl (get_path_length);
824   reply->put_path_length = htonl (put_path_length);
825   reply->unique_id = 0; /* filled in later */
826   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
827   reply->key = *key;
828   paths = (struct GNUNET_PeerIdentity*) &reply[1];
829   memcpy (paths, get_path, 
830           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
831   memcpy (&paths[get_path_length], 
832           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
833   memcpy (&paths[get_path_length + put_path_length],
834           data, 
835           data_size);
836   frc.do_copy = GNUNET_NO;
837   frc.pm = pm;
838   frc.data = data;
839   frc.data_size = data_size;
840   frc.type = type;
841   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key,
842                                               &forward_reply,
843                                               &frc);
844   if (GNUNET_NO == frc.do_copy)
845     {
846       /* did not match any of the requests, free! */
847       GNUNET_free (pm);
848     }
849 }
850
851
852 /**
853  * Initialize client subsystem.
854  *
855  * @param server the initialized server
856  */
857 void 
858 GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
859 {
860   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
861     {&handle_dht_local_put, NULL, 
862      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
863     {&handle_dht_local_get, NULL, 
864      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
865     {&handle_dht_local_get_stop, NULL,
866      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
867      sizeof (struct GNUNET_DHT_ClientGetStopMessage) },
868     {NULL, NULL, 0, 0}
869   };
870   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
871   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
872   GNUNET_SERVER_add_handlers (server, plugin_handlers);
873   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
874 }
875
876
877 /**
878  * Shutdown client subsystem.
879  */
880 void
881 GDS_CLIENTS_done ()
882 {
883   GNUNET_assert (client_head == NULL);
884   GNUNET_assert (client_tail == NULL);
885   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
886     {
887       GNUNET_SCHEDULER_cancel (retry_task);
888       retry_task = GNUNET_SCHEDULER_NO_TASK;
889     }
890   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
891   GNUNET_CONTAINER_heap_destroy (retry_heap);
892   retry_heap = NULL;
893   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
894   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
895   forward_map = NULL;
896 }
897
898 /* end of gnunet-service-dht_clients.c */
899