ebbde14b8c9a1e6a32637ed6ddc137992606f503
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-service-dht.h"
32 #include "gnunet-service-dht_clients.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht_new.h"
36
37
38 /**
39  * Linked list of messages to send to clients.
40  */
41 struct PendingMessage
42 {
43   /**
44    * Pointer to next item in the list
45    */
46   struct PendingMessage *next;
47
48   /**
49    * Pointer to previous item in the list
50    */
51   struct PendingMessage *prev;
52
53   /**
54    * Actual message to be sent, allocated at the end of the struct:
55    * // msg = (cast) &pm[1]; 
56    * // memcpy (&pm[1], data, len);
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60 };
61
62
63 /**
64  * Struct containing information about a client,
65  * handle to connect to it, and any pending messages
66  * that need to be sent to it.
67  */
68 struct ClientList
69 {
70   /**
71    * Linked list of active clients
72    */
73   struct ClientList *next;
74
75   /**
76    * Linked list of active clients
77    */
78   struct ClientList *prev;
79
80   /**
81    * The handle to this client
82    */
83   struct GNUNET_SERVER_Client *client_handle;
84
85   /**
86    * Handle to the current transmission request, NULL
87    * if none pending.
88    */
89   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
90
91   /**
92    * Linked list of pending messages for this client
93    */
94   struct PendingMessage *pending_head;
95
96   /**
97    * Tail of linked list of pending messages for this client
98    */
99   struct PendingMessage *pending_tail;
100
101 };
102
103
104 /**
105  * Entry in the DHT routing table for a client's GET request.
106  */
107 struct ClientQueryRecord
108 {
109
110   /**
111    * The key this request was about
112    */
113   GNUNET_HashCode key;
114
115   /**
116    * Client responsible for the request.
117    */
118   struct ClientList *client;
119
120   /**
121    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
122    */
123   const void *xquery;
124
125   /**
126    * Replies we have already seen for this request.
127    */
128   GNUNET_HashCode *seen_replies;
129
130   /**
131    * Pointer to this nodes heap location in the retry-heap (for fast removal)
132    */
133   struct GNUNET_CONTAINER_HeapNode *hnode;
134
135   /**
136    * What's the delay between re-try operations that we currently use for this
137    * request?
138    */
139   struct GNUNET_TIME_Relative retry_frequency;
140
141   /**
142    * What's the next time we should re-try this request?
143    */
144   struct GNUNET_TIME_Absolute retry_time;
145
146   /**
147    * The unique identifier of this request
148    */
149   uint64_t unique_id;
150
151   /**
152    * Number of bytes in xquery.
153    */
154   size_t xquery_size;
155
156   /**
157    * Number of entries in 'seen_replies'.
158    */
159   unsigned int seen_replies_count;
160
161   /**
162    * Desired replication level
163    */
164   uint32_t replication;
165
166   /**
167    * Any message options for this request
168    */
169   uint32_t msg_options;
170
171   /**
172    * The type for the data for the GET request.
173    */
174   enum GNUNET_BLOCK_Type type;
175
176 };
177
178
179 /**
180  * List of active clients.
181  */
182 static struct ClientList *client_head;
183
184 /**
185  * List of active clients.
186  */
187 static struct ClientList *client_tail;
188
189 /**
190  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
191  */
192 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
193
194 /**
195  * Heap with all of our client's request, sorted by retry time (earliest on top).
196  */
197 static struct GNUNET_CONTAINER_Heap *retry_heap;
198
199 /**
200  * Task that re-transmits requests (using retry_heap).
201  */
202 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
203
204
205 /**
206  * Find a client if it exists, add it otherwise.
207  *
208  * @param client the server handle to the client
209  *
210  * @return the client if found, a new client otherwise
211  */
212 static struct ClientList *
213 find_active_client (struct GNUNET_SERVER_Client *client)
214 {
215   struct ClientList *pos = client_head;
216   struct ClientList *ret;
217
218   while (pos != NULL)
219   {
220     if (pos->client_handle == client)
221       return pos;
222     pos = pos->next;
223   }
224   ret = GNUNET_malloc (sizeof (struct ClientList));
225   ret->client_handle = client;
226   GNUNET_CONTAINER_DLL_insert (client_head,
227                                client_tail,
228                                ret);
229   return ret;
230 }
231
232
233 /**
234  * Iterator over hash map entries that frees all entries 
235  * associated with the given client.
236  *
237  * @param cls client to search for in source routes
238  * @param key current key code (ignored)
239  * @param value value in the hash map, a ClientQueryRecord
240  * @return GNUNET_YES (we should continue to iterate)
241  */
242 static int
243 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
244 {
245   struct ClientList *client = cls;
246   struct ClientQueryRecord *record = value;
247
248   if (record->client != client)
249     return GNUNET_YES;
250   GNUNET_assert (GNUNET_YES ==
251                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
252                                                        key, record));
253   if (NULL != record->hnode)
254     GNUNET_CONTAINER_heap_remove_node (record->hnode);
255   GNUNET_array_grow (record->seen_replies,
256                      record->seen_replies_count,
257                      0);
258   GNUNET_free (record);
259   return GNUNET_YES;
260 }
261
262
263 /**
264  * Functions with this signature are called whenever a client
265  * is disconnected on the network level.
266  *
267  * @param cls closure (NULL for dht)
268  * @param client identification of the client; NULL
269  *        for the last call when the server is destroyed
270  */
271 static void
272 handle_client_disconnect (void *cls, 
273                           struct GNUNET_SERVER_Client *client)
274 {
275   struct ClientList *pos;
276   struct PendingMessage *reply;
277
278   pos = find_active_client (client);
279   GNUNET_CONTAINER_DLL_remove (client_head,
280                                client_tail,
281                                pos);
282   if (pos->transmit_handle != NULL)
283     GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
284   while (NULL != (reply = pos->pending_head))
285     {
286       GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail,
287                                    reply);
288       GNUNET_free (reply);
289     }
290   GNUNET_CONTAINER_multihashmap_iterate (forward_map,
291                                          &remove_client_records, pos);
292   GNUNET_free (pos);
293 }
294
295
296 /**
297  * Route the given request via the DHT.  This includes updating 
298  * the bloom filter and retransmission times, building the P2P
299  * message and initiating the routing operation.
300  */
301 static void
302 transmit_request (struct ClientQueryRecord *cqr)
303 {
304   int32_t reply_bf_mutator;
305   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
306
307   GNUNET_STATISTICS_update (GDS_stats,
308                             gettext_noop ("# GET requests from clients injected"), 1,
309                             GNUNET_NO);
310   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
311                                                          UINT32_MAX);
312   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
313                                                  cqr->seen_replies,
314                                                  cqr->seen_replies_count);
315   GDS_NEIGHBOURS_handle_get (cqr->type,
316                              cqr->msg_options,
317                              cqr->replication,
318                              0 /* hop count */,
319                              &cqr->key,
320                              cqr->xquery,
321                              cqr->xquery_size,
322                              reply_bf,
323                              reply_bf_mutator,
324                              NULL /* no peers blocked initially */);
325   GNUNET_CONTAINER_bloomfilter_free (reply_bf);
326
327   /* exponential back-off for retries, max 1h */
328   cqr->retry_frequency = 
329     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
330                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
331   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
332 }
333
334
335 /**
336  * Task that looks at the 'retry_heap' and transmits all of the requests
337  * on the heap that are ready for transmission.  Then re-schedules
338  * itself (unless the heap is empty).
339  *
340  * @param cls unused
341  * @param tc scheduler context
342  */
343 static void
344 transmit_next_request_task (void *cls,
345                             const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct ClientQueryRecord *cqr;
348   struct GNUNET_TIME_Relative delay;
349
350   retry_task = GNUNET_SCHEDULER_NO_TASK;
351   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
352     return;
353   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
354     {
355       cqr->hnode = NULL;
356       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
357       if (delay.rel_value > 0)
358         {
359           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
360                                                      cqr->retry_time.abs_value);
361           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
362                                                      &transmit_next_request_task,
363                                                      NULL);
364           return;
365         }
366       transmit_request (cqr);
367     }
368 }
369
370
371 /**
372  * Handler for PUT messages.
373  *
374  * @param cls closure for the service
375  * @param client the client we received this message from
376  * @param message the actual message received
377  */
378 static void
379 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
380                       const struct GNUNET_MessageHeader *message)
381 {
382   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
383   uint16_t size;
384   
385   size = ntohs (message->size);
386   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
387     {
388       GNUNET_break (0);
389       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
390       return;
391     }
392   GNUNET_STATISTICS_update (GDS_stats,
393                             gettext_noop ("# PUT requests received from clients"), 1,
394                             GNUNET_NO);
395   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
396   /* give to local clients */
397   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
398                            &dht_msg->key,
399                            0, NULL,
400                            0, NULL,
401                            ntohl (dht_msg->type),
402                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
403                            &dht_msg[1]);
404   /* store locally */
405   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
406                             &dht_msg->key,
407                             0, NULL,
408                             ntohl (dht_msg->type),
409                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
410                             &dht_msg[1]);
411   /* route to other peers */
412   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
413                              ntohl (dht_msg->options),
414                              ntohl (dht_msg->desired_replication_level),
415                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
416                              0 /* hop count */,
417                              NULL /* peer bloom filter */,
418                              &dht_msg->key,
419                              0, NULL,
420                              &dht_msg[1],
421                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
422   GNUNET_SERVER_receive_done (client, GNUNET_OK);
423 }
424
425
426 /**
427  * Handler for any generic DHT messages, calls the appropriate handler
428  * depending on message type, sends confirmation if responses aren't otherwise
429  * expected.
430  *
431  * @param cls closure for the service
432  * @param client the client we received this message from
433  * @param message the actual message received
434  */
435 static void
436 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
437                       const struct GNUNET_MessageHeader *message)
438 {
439   const struct GNUNET_DHT_ClientGetMessage *get;
440   struct ClientQueryRecord *cqr;
441   size_t xquery_size;
442   const char* xquery;
443   uint16_t size;
444
445   size = ntohs (message->size);
446   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
447     {
448       GNUNET_break (0);
449       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
450       return;
451     }
452   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
453   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
454   xquery = (const char*) &get[1];
455   GNUNET_STATISTICS_update (GDS_stats,
456                             gettext_noop ("# GET requests received from clients"), 1,
457                             GNUNET_NO);
458   
459   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
460   cqr->key = get->key;
461   cqr->client = find_active_client (client);
462   cqr->xquery = (void*) &cqr[1];
463   memcpy (&cqr[1], xquery, xquery_size);
464   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
465   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
466   cqr->retry_time = GNUNET_TIME_absolute_get ();
467   cqr->unique_id = get->unique_id;
468   cqr->xquery_size = xquery_size;
469   cqr->replication = ntohl (get->desired_replication_level);
470   cqr->msg_options = ntohl (get->options);
471   cqr->type = ntohl (get->type);  
472   GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
473                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
474   /* start remote requests */
475   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
476     GNUNET_SCHEDULER_cancel (retry_task);
477   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
478   /* perform local lookup */
479   GDS_DATACACHE_handle_get (&get->key,
480                             cqr->type,
481                             cqr->xquery,
482                             xquery_size,
483                             NULL, 0);
484   GNUNET_SERVER_receive_done (client, GNUNET_OK);
485 }
486
487
488 /**
489  * Closure for 'remove_by_unique_id'.
490  */
491 struct RemoveByUniqueIdContext
492 {
493   /**
494    * Client that issued the removal request.
495    */
496   struct ClientList *client;
497
498   /**
499    * Unique ID of the request.
500    */
501   uint64_t unique_id;
502 };
503
504
505 /**
506  * Iterator over hash map entries that frees all entries 
507  * that match the given client and unique ID.
508  *
509  * @param cls unique ID and client to search for in source routes
510  * @param key current key code
511  * @param value value in the hash map, a ClientQueryRecord
512  * @return GNUNET_YES (we should continue to iterate)
513  */
514 static int
515 remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
516 {
517   const struct RemoveByUniqueIdContext *ctx = cls;
518   struct ClientQueryRecord *record = value;
519
520   if (record->unique_id != ctx->unique_id)
521     return GNUNET_YES;
522   return remove_client_records (ctx->client, key, record);
523 }
524
525
526 /**
527  * Handler for any generic DHT stop messages, calls the appropriate handler
528  * depending on message type (if processed locally)
529  *
530  * @param cls closure for the service
531  * @param client the client we received this message from
532  * @param message the actual message received
533  *
534  */
535 static void
536 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
537                            const struct GNUNET_MessageHeader *message)
538 {
539   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
540     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
541   struct RemoveByUniqueIdContext ctx;
542   
543   GNUNET_STATISTICS_update (GDS_stats,
544                             gettext_noop ("# GET STOP requests received from clients"), 1,
545                             GNUNET_NO);
546   ctx.client = find_active_client (client);
547   ctx.unique_id = dht_stop_msg->unique_id;
548   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
549                                               &dht_stop_msg->key,
550                                               &remove_by_unique_id,
551                                               &ctx);
552   GNUNET_SERVER_receive_done (client, GNUNET_OK);
553 }
554
555
556 /**
557  * Task run to check for messages that need to be sent to a client.
558  *
559  * @param client a ClientList, containing the client and any messages to be sent to it
560  */
561 static void
562 process_pending_messages (struct ClientList *client);
563
564
565 /**
566  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
567  * request.  A ClientList is passed as closure, take the head of the list
568  * and copy it into buf, which has the result of sending the message to the
569  * client.
570  *
571  * @param cls closure to this call
572  * @param size maximum number of bytes available to send
573  * @param buf where to copy the actual message to
574  *
575  * @return the number of bytes actually copied, 0 indicates failure
576  */
577 static size_t
578 send_reply_to_client (void *cls, size_t size, void *buf)
579 {
580   struct ClientList *client = cls;
581   char *cbuf = buf;
582   struct PendingMessage *reply;
583   size_t off;
584   size_t msize;
585
586   client->transmit_handle = NULL;
587   if (buf == NULL)
588   {
589     /* client disconnected */
590     return 0;
591   }
592   off = 0;
593   while ((NULL != (reply = client->pending_head)) &&
594          (size >= off + (msize = ntohs (reply->msg->size))))
595   {
596     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
597                                  reply);
598     memcpy (&cbuf[off], reply->msg, msize);
599     GNUNET_free (reply);
600     off += msize;
601   }
602   process_pending_messages (client);
603   return off;
604 }
605
606
607 /**
608  * Task run to check for messages that need to be sent to a client.
609  *
610  * @param client a ClientList, containing the client and any messages to be sent to it
611  */
612 static void
613 process_pending_messages (struct ClientList *client)
614 {
615   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
616     return;
617   client->transmit_handle =
618       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
619                                            ntohs (client->pending_head->
620                                                   msg->size),
621                                            GNUNET_TIME_UNIT_FOREVER_REL,
622                                            &send_reply_to_client, client);
623 }
624
625
626 /**
627  * Add a PendingMessage to the clients list of messages to be sent
628  *
629  * @param client the active client to send the message to
630  * @param pending_message the actual message to send
631  */
632 static void
633 add_pending_message (struct ClientList *client,
634                      struct PendingMessage *pending_message)
635 {
636   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
637                                     pending_message);
638   process_pending_messages (client);
639 }
640
641
642 /**
643  * Closure for 'forward_reply'
644  */
645 struct ForwardReplyContext
646 {
647
648   /**
649    * Actual message to send to matching clients. 
650    */
651   struct PendingMessage *pm;
652
653   /**
654    * Embedded payload.
655    */
656   const void *data;
657
658   /**
659    * Type of the data.
660    */
661   enum GNUNET_BLOCK_Type type;
662
663   /**
664    * Number of bytes in data.
665    */
666   size_t data_size;
667
668   /**
669    * Do we need to copy 'pm' because it was already used?
670    */
671   int do_copy;
672
673 };
674
675
676 /**
677  * Iterator over hash map entries that send a given reply to
678  * each of the matching clients.  With some tricky recycling
679  * of the buffer.
680  *
681  * @param cls the 'struct ForwardReplyContext'
682  * @param key current key
683  * @param value value in the hash map, a ClientQueryRecord
684  * @return GNUNET_YES (we should continue to iterate),
685  *         if the result is mal-formed, GNUNET_NO
686  */
687 static int
688 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
689 {
690   struct ForwardReplyContext *frc = cls;
691   struct ClientQueryRecord *record = value;
692   struct PendingMessage *pm;
693   struct GNUNET_DHT_ClientResultMessage *reply;
694   enum GNUNET_BLOCK_EvaluationResult eval;
695   int do_free;
696   GNUNET_HashCode ch;
697   unsigned int i;
698   
699   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
700        (record->type != frc->type) )
701     return GNUNET_YES; /* type mismatch */
702   GNUNET_CRYPTO_hash (frc->data,
703                       frc->data_size,
704                       &ch);
705   for (i=0;i<record->seen_replies_count;i++)
706     if (0 == memcmp (&record->seen_replies[i],
707                      &ch,
708                      sizeof (GNUNET_HashCode)))
709       return GNUNET_YES; /* duplicate */             
710   eval =
711     GNUNET_BLOCK_evaluate (GDS_block_context, 
712                            record->type, key, 
713                            NULL, 0,
714                            record->xquery,
715                            record->xquery_size, 
716                            frc->data,
717                            frc->data_size);
718   switch (eval)
719   {
720   case GNUNET_BLOCK_EVALUATION_OK_LAST:
721     do_free = GNUNET_YES;
722     break;
723   case GNUNET_BLOCK_EVALUATION_OK_MORE:
724     GNUNET_array_append (record->seen_replies,
725                          record->seen_replies_count,
726                          ch);
727     do_free = GNUNET_NO;
728     break;
729   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
730     /* should be impossible to encounter here */
731     GNUNET_break (0);
732     return GNUNET_YES;
733   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
734     GNUNET_break_op (0);
735     return GNUNET_NO;
736   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
737     GNUNET_break (0);
738     return GNUNET_NO;
739   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
740     GNUNET_break (0);
741     return GNUNET_NO;
742   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
743     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
744                 "Unsupported block type (%u) in request!\n",
745                 record->type);
746     return GNUNET_NO;
747   }
748   if (GNUNET_NO == frc->do_copy)
749     {
750       /* first time, we can use the original data */
751       pm = frc->pm; 
752       frc->do_copy = GNUNET_YES;
753     }
754   else
755     {
756       /* two clients waiting for same reply, must copy for queueing */
757       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
758                           ntohs (frc->pm->msg->size));
759       memcpy (pm, frc->pm, 
760               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
761       pm->next = pm->prev = NULL;
762     }
763   GNUNET_STATISTICS_update (GDS_stats,
764                             gettext_noop ("# RESULTS queued for clients"), 1,
765                             GNUNET_NO);
766   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];  
767   reply->unique_id = record->unique_id;
768   add_pending_message (record->client, pm);
769   if (GNUNET_YES == do_free)
770     remove_client_records (record->client, key, record);
771   return GNUNET_YES;
772 }
773
774
775 /**
776  * Handle a reply we've received from another peer.  If the reply
777  * matches any of our pending queries, forward it to the respective
778  * client(s).
779  *
780  * @param expiration when will the reply expire
781  * @param key the query this reply is for
782  * @param get_path_length number of peers in 'get_path'
783  * @param get_path path the reply took on get
784  * @param put_path_length number of peers in 'put_path'
785  * @param put_path path the reply took on put
786  * @param type type of the reply
787  * @param data_size number of bytes in 'data'
788  * @param data application payload data
789  */
790 void
791 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
792                          const GNUNET_HashCode *key,
793                          unsigned int get_path_length,
794                          const struct GNUNET_PeerIdentity *get_path,
795                          unsigned int put_path_length,
796                          const struct GNUNET_PeerIdentity *put_path,
797                          enum GNUNET_BLOCK_Type type,
798                          size_t data_size,
799                          const void *data)
800 {
801   struct ForwardReplyContext frc;
802   struct PendingMessage *pm;
803   struct GNUNET_DHT_ClientResultMessage *reply;
804   struct GNUNET_PeerIdentity *paths;
805   size_t msize;
806
807   if (NULL ==
808       GNUNET_CONTAINER_multihashmap_get (forward_map, key))
809     return; /* no matching request, fast exit! */
810   msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size + 
811     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
812   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
813     {
814       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
815                   _("Could not pass reply to client, message too big!\n"));
816       return;
817     }
818   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
819   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
820   pm->msg = &reply->header;
821   reply->header.size = htons ((uint16_t) msize);
822   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
823   reply->type = htonl (type);
824   reply->get_path_length = htonl (get_path_length);
825   reply->put_path_length = htonl (put_path_length);
826   reply->unique_id = 0; /* filled in later */
827   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
828   reply->key = *key;
829   paths = (struct GNUNET_PeerIdentity*) &reply[1];
830   memcpy (paths, get_path, 
831           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
832   memcpy (&paths[get_path_length], 
833           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
834   memcpy (&paths[get_path_length + put_path_length],
835           data, 
836           data_size);
837   frc.do_copy = GNUNET_NO;
838   frc.pm = pm;
839   frc.data = data;
840   frc.data_size = data_size;
841   frc.type = type;
842   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key,
843                                               &forward_reply,
844                                               &frc);
845   if (GNUNET_NO == frc.do_copy)
846     {
847       /* did not match any of the requests, free! */
848       GNUNET_free (pm);
849     }
850 }
851
852
853 /**
854  * Initialize client subsystem.
855  *
856  * @param server the initialized server
857  */
858 void 
859 GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
860 {
861   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
862     {&handle_dht_local_put, NULL, 
863      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
864     {&handle_dht_local_get, NULL, 
865      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
866     {&handle_dht_local_get_stop, NULL,
867      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
868      sizeof (struct GNUNET_DHT_ClientGetStopMessage) },
869     {NULL, NULL, 0, 0}
870   };
871   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
872   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
873   GNUNET_SERVER_add_handlers (server, plugin_handlers);
874   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
875 }
876
877
878 /**
879  * Shutdown client subsystem.
880  */
881 void
882 GDS_CLIENTS_done ()
883 {
884   GNUNET_assert (client_head == NULL);
885   GNUNET_assert (client_tail == NULL);
886   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
887     {
888       GNUNET_SCHEDULER_cancel (retry_task);
889       retry_task = GNUNET_SCHEDULER_NO_TASK;
890     }
891   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
892   GNUNET_CONTAINER_heap_destroy (retry_heap);
893   retry_heap = NULL;
894   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
895   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
896   forward_map = NULL;
897 }
898
899 /* end of gnunet-service-dht_clients.c */
900