95a0d68d0a0018410f26c1fe0f7b3f0d53213c40
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht_new.h"
40 #include <fenv.h>
41 #include "gnunet-service-dht_clients.h"
42 #include "gnunet-service-dht_datacache.h"
43 #include "gnunet-service-dht_neighbours.h"
44
45
46 /**
47  * Linked list of messages to send to clients.
48  */
49 struct PendingMessage
50 {
51   /**
52    * Pointer to next item in the list
53    */
54   struct PendingMessage *next;
55
56   /**
57    * Pointer to previous item in the list
58    */
59   struct PendingMessage *prev;
60
61   /**
62    * Actual message to be sent, allocated at the end of the struct:
63    * // msg = (cast) &pm[1]; 
64    * // memcpy (&pm[1], data, len);
65    */
66   const struct GNUNET_MessageHeader *msg;
67
68 };
69
70
71 /**
72  * Struct containing information about a client,
73  * handle to connect to it, and any pending messages
74  * that need to be sent to it.
75  */
76 struct ClientList
77 {
78   /**
79    * Linked list of active clients
80    */
81   struct ClientList *next;
82
83   /**
84    * Linked list of active clients
85    */
86   struct ClientList *prev;
87
88   /**
89    * The handle to this client
90    */
91   struct GNUNET_SERVER_Client *client_handle;
92
93   /**
94    * Handle to the current transmission request, NULL
95    * if none pending.
96    */
97   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
98
99   /**
100    * Linked list of pending messages for this client
101    */
102   struct PendingMessage *pending_head;
103
104   /**
105    * Tail of linked list of pending messages for this client
106    */
107   struct PendingMessage *pending_tail;
108
109 };
110
111
112 /**
113  * Entry in the DHT routing table for a client's GET request.
114  */
115 struct ClientQueryRecord
116 {
117
118   /**
119    * The key this request was about
120    */
121   GNUNET_HashCode key;
122
123   /**
124    * Client responsible for the request.
125    */
126   struct ClientList *client;
127
128   /**
129    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
130    */
131   const void *xquery;
132
133   /**
134    * Replies we have already seen for this request.
135    */
136   GNUNET_HashCode *seen_replies;
137
138   /**
139    * Pointer to this nodes heap location in the retry-heap (for fast removal)
140    */
141   struct GNUNET_CONTAINER_HeapNode *hnode;
142
143   /**
144    * What's the delay between re-try operations that we currently use for this
145    * request?
146    */
147   struct GNUNET_TIME_Relative retry_frequency;
148
149   /**
150    * What's the next time we should re-try this request?
151    */
152   struct GNUNET_TIME_Absolute retry_time;
153
154   /**
155    * The unique identifier of this request
156    */
157   uint64_t unique_id;
158
159   /**
160    * Number of bytes in xquery.
161    */
162   size_t xquery_size;
163
164   /**
165    * Number of entries in 'seen_replies'.
166    */
167   unsigned int seen_replies_count;
168
169   /**
170    * Desired replication level
171    */
172   uint32_t replication;
173
174   /**
175    * Any message options for this request
176    */
177   uint32_t msg_options;
178
179   /**
180    * The type for the data for the GET request; actually an 'enum
181    * GNUNET_BLOCK_Type'.
182    */
183   uint32_t msg_type;
184
185 };
186
187
188 /**
189  * List of active clients.
190  */
191 static struct ClientList *client_head;
192
193 /**
194  * List of active clients.
195  */
196 static struct ClientList *client_tail;
197
198 /**
199  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
200  */
201 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
202
203 /**
204  * Heap with all of our client's request, sorted by retry time (earliest on top).
205  */
206 static struct GNUNET_CONTAINER_Heap *retry_heap;
207
208 /**
209  * Task that re-transmits requests (using retry_heap).
210  */
211 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
212
213
214 /**
215  * Find a client if it exists, add it otherwise.
216  *
217  * @param client the server handle to the client
218  *
219  * @return the client if found, a new client otherwise
220  */
221 static struct ClientList *
222 find_active_client (struct GNUNET_SERVER_Client *client)
223 {
224   struct ClientList *pos = client_list;
225   struct ClientList *ret;
226
227   while (pos != NULL)
228   {
229     if (pos->client_handle == client)
230       return pos;
231     pos = pos->next;
232   }
233   ret = GNUNET_malloc (sizeof (struct ClientList));
234   ret->client_handle = client;
235   GNUNET_CONTAINER_DLL_insert (client_head,
236                                client_tail,
237                                ret);
238   return ret;
239 }
240
241
242 /**
243  * Iterator over hash map entries that frees all entries 
244  * associated with the given client.
245  *
246  * @param cls client to search for in source routes
247  * @param key current key code (ignored)
248  * @param value value in the hash map, a ClientQueryRecord
249  * @return GNUNET_YES (we should continue to iterate)
250  */
251 static int
252 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
253 {
254   struct ClientList *client = cls;
255   struct ClientQueryRecord *record = value;
256
257   if (record->client != client)
258     return GNUNET_YES;
259   GNUNET_assert (GNUNET_YES ==
260                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
261                                                        key, record));
262   GNUNET_CONTAINER_heap_remove_node (record->hnode);
263   GNUNET_ARRAY_append (record->seen_replies,
264                        record->seen_replies_count,
265                        0);
266   GNUNET_free (record);
267   return GNUNET_YES;
268 }
269
270
271 /**
272  * Functions with this signature are called whenever a client
273  * is disconnected on the network level.
274  *
275  * @param cls closure (NULL for dht)
276  * @param client identification of the client; NULL
277  *        for the last call when the server is destroyed
278  */
279 static void
280 handle_client_disconnect (void *cls, 
281                           struct GNUNET_SERVER_Client *client)
282 {
283   struct ClientList *pos = client_list;
284   struct ClientList *found;
285   struct PendingMessage *reply;
286
287   found = NULL;
288   while (pos != NULL)
289   {
290     if (pos->client_handle == client)
291     {
292       GNUNET_CONTAINER_DLL_remove (client_head,
293                                    client_tail,
294                                    pos);
295       found = pos;
296       break;
297     }
298     pos = pos->next;
299   }
300   if (found == NULL)
301     return;
302   if (found->transmit_handle != NULL)
303     GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
304   while (NULL != (reply = found->pending_head))
305     {
306       GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
307                                    reply);
308       GNUNET_free (reply);
309     }
310   GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
311                                          &remove_client_records, found);
312   GNUNET_free (found);
313 }
314
315
316 /**
317  * Route the given request via the DHT.  This includes updating 
318  * the bloom filter and retransmission times, building the P2P
319  * message and initiating the routing operation.
320  */
321 static void
322 transmit_request (struct ClientQueryRecord *cqr)
323 {
324   int32_t reply_bf_mutator;
325
326   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
327                                                          UINT32_MAX);
328   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
329                                                  cqr->seen_replies,
330                                                  cqr->seen_replies_count);
331   GST_NEIGHBOURS_handle_get (cqr->msg_type,
332                              cqr->msg_options,
333                              cqr->replication,
334                              &cqr->key,
335                              cqr->xquery,
336                              cqr->xquery_size,
337                              reply_bf,
338                              reply_bf_mutator,
339                              NULL /* no peers blocked initially */);
340   GNUNET_CONTAINER_bloomfilter_destroy (reply_bf);
341
342   /* exponential back-off for retries, max 1h */
343   cqr->retry_frequency = 
344     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
345                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
346   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
347 }
348
349
350 /**
351  * Task that looks at the 'retry_heap' and transmits all of the requests
352  * on the heap that are ready for transmission.  Then re-schedules
353  * itself (unless the heap is empty).
354  *
355  * @param cls unused
356  * @param tc scheduler context
357  */
358 static void
359 transmit_next_request_task (void *cls,
360                             const struct GNUNET_SCHEDULER_TaskContext *tc)
361 {
362   struct ClientQueryRecord *cqr;
363   struct GNUNET_TIME_Relative delay;
364
365   retry_task = GNUNET_SCHEDULER_NO_TASK;
366   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
367     return;
368   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
369     {
370       cqr->hnode = NULL;
371       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
372       if (delay.value > 0)
373         {
374           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
375                                                      cqr->retry_time.abs_value);
376           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
377                                                      &transmit_next_request_task,
378                                                      NULL);
379           return;
380         }
381       transmit_request (cqr);
382     }
383 }
384
385
386 /**
387  * Handler for PUT messages.
388  *
389  * @param cls closure for the service
390  * @param client the client we received this message from
391  * @param message the actual message received
392  */
393 static void
394 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
395                       const struct GNUNET_MessageHeader *message)
396 {
397   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
398   uint16_t size;
399   
400   size = ntohs (message->size);
401   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
402     {
403       GNUNET_break (0);
404       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
405     }
406   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
407   /* give to local clients */
408   GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
409                            &dht_msg->key,
410                            0, NULL,
411                            0, NULL,
412                            ntohl (dht_msg->type),
413                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
414                            &dht_msg[1]);
415   /* store locally */
416   GST_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
417                             &dht_msg->key,
418                             0, NULL,
419                             ntohl (dht_msg->type),
420                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
421                             &dht_msg[1]);
422   /* route to other peers */
423   GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
424                              ntohl (dht_msg->options),
425                              ntohl (dht_msg->desired_replication_level),
426                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
427                              &dht_msg->key,
428                              &dht_msg[1],
429                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
430   GNUNET_SERVER_receive_done (client, GNUNET_OK);
431 }
432
433
434 /**
435  * Handler for any generic DHT messages, calls the appropriate handler
436  * depending on message type, sends confirmation if responses aren't otherwise
437  * expected.
438  *
439  * @param cls closure for the service
440  * @param client the client we received this message from
441  * @param message the actual message received
442  */
443 static void
444 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
445                       const struct GNUNET_MessageHeader *message)
446 {
447   const struct GNUNET_DHT_ClientGetMessage *get;
448   const struct GNUNET_MessageHeader *enc_msg;
449
450   struct ClientQueryRecord *cqr;
451   size_t xquery_size;
452   const char* xquery;
453   uint16_t size;
454
455   size = ntohs (message->size);
456   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
457     {
458       GNUNET_break (0);
459       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
460       return;
461     }
462   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
463   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
464   xquery = (const char*) &get[1];
465
466   
467   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
468   cqr->key = get->key;
469   cqr->client = find_active_client (client);
470   cqr->xquery = (void*) &cqr[1];
471   memcpy (&cqr[1], xquery, xquery_size);
472   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
473   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
474   cqr->retry_time = GNUNET_TIME_absolute_get ();
475   cqr->unique_id = get->unique_id;
476   cqr->xquery_size = xquery_size;
477   cqr->replication = ntohl (get->desired_replication_level);
478   cqr->msg_options = ntohl (get->options);
479   cqr->msg_type = ntohl (get->type);  
480   GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
481                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
482   /* start remote requests */
483   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
484     GNUNET_SCHEDULER_cancel (retry_task);
485   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
486   /* perform local lookup */
487   GDS_DATACACHE_handle_get (&get->key,
488                             cqr->msg_type,
489                             cqr->xquery,
490                             xquery_size,
491                             NULL, 0);
492   GNUNET_SERVER_receive_done (client, GNUNET_OK);
493 }
494
495
496 /**
497  * Closure for 'remove_by_uid'.
498  */
499 struct RemoveByUidContext
500 {
501   /**
502    * Client that issued the removal request.
503    */
504   struct ClientList *client;
505
506   /**
507    * Unique ID of the request.
508    */
509   uint64_t uid;
510 };
511
512
513 /**
514  * Iterator over hash map entries that frees all entries 
515  * that match the given client and UID.
516  *
517  * @param cls UID and client to search for in source routes
518  * @param key current key code
519  * @param value value in the hash map, a ClientQueryRecord
520  * @return GNUNET_YES (we should continue to iterate)
521  */
522 static int
523 remove_by_uid (void *cls, const GNUNET_HashCode * key, void *value)
524 {
525   const struct RemoveByUidContext *ctx = cls;
526   struct ClientQueryRecord *record = value;
527
528   if (record->uid != ctx->uid)
529     return GNUNET_YES;
530   return remove_client_records (ctx->client, key, record);
531 }
532
533
534 /**
535  * Handler for any generic DHT stop messages, calls the appropriate handler
536  * depending on message type (if processed locally)
537  *
538  * @param cls closure for the service
539  * @param client the client we received this message from
540  * @param message the actual message received
541  *
542  */
543 static void
544 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
545                            const struct GNUNET_MessageHeader *message)
546 {
547   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
548     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
549   
550   ctx.client = find_active_client (client);
551   ctx.uid = &dht_stop_msg.unique_id);
552   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
553                                               &dht_stop_msg->key,
554                                               &remove_by_uid,
555                                               &ctx);
556   GNUNET_SERVER_receive_done (client, GNUNET_OK);
557 }
558
559
560 /**
561  * Task run to check for messages that need to be sent to a client.
562  *
563  * @param client a ClientList, containing the client and any messages to be sent to it
564  */
565 static void
566 process_pending_messages (struct ClientList *client);
567
568
569 /**
570  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
571  * request.  A ClientList is passed as closure, take the head of the list
572  * and copy it into buf, which has the result of sending the message to the
573  * client.
574  *
575  * @param cls closure to this call
576  * @param size maximum number of bytes available to send
577  * @param buf where to copy the actual message to
578  *
579  * @return the number of bytes actually copied, 0 indicates failure
580  */
581 static size_t
582 send_reply_to_client (void *cls, size_t size, void *buf)
583 {
584   struct ClientList *client = cls;
585   char *cbuf = buf;
586   struct PendingMessage *reply;
587   size_t off;
588   size_t msize;
589
590   client->transmit_handle = NULL;
591   if (buf == NULL)
592   {
593     /* client disconnected */
594     return 0;
595   }
596   off = 0;
597   while ((NULL != (reply = client->pending_head)) &&
598          (size >= off + (msize = ntohs (reply->msg->size))))
599   {
600     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
601                                  reply);
602     memcpy (&cbuf[off], reply->msg, msize);
603     GNUNET_free (reply);
604     off += msize;
605   }
606   process_pending_messages (client);
607   return off;
608 }
609
610
611 /**
612  * Task run to check for messages that need to be sent to a client.
613  *
614  * @param client a ClientList, containing the client and any messages to be sent to it
615  */
616 static void
617 process_pending_messages (struct ClientList *client)
618 {
619   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
620     return;
621   client->transmit_handle =
622       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
623                                            ntohs (client->pending_head->
624                                                   msg->size),
625                                            GNUNET_TIME_UNIT_FOREVER_REL,
626                                            &send_reply_to_client, client);
627 }
628
629
630 /**
631  * Add a PendingMessage to the clients list of messages to be sent
632  *
633  * @param client the active client to send the message to
634  * @param pending_message the actual message to send
635  */
636 static void
637 add_pending_message (struct ClientList *client,
638                      struct PendingMessage *pending_message)
639 {
640   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
641                                     pending_message);
642   process_pending_messages (client);
643 }
644
645
646 /**
647  * Closure for 'forward_reply'
648  */
649 struct ForwardReplyContext
650 {
651
652   /**
653    * Actual message to send to matching clients. 
654    */
655   struct PendingMessage *pm;
656
657   /**
658    * Embedded payload.
659    */
660   const void *data;
661
662   /**
663    * Type of the data.
664    */
665   uint32_t type;
666
667   /**
668    * Number of bytes in data.
669    */
670   size_t data_size;
671
672   /**
673    * Do we need to copy 'pm' because it was already used?
674    */
675   int do_copy;
676
677 };
678
679
680 /**
681  * Iterator over hash map entries that send a given reply to
682  * each of the matching clients.  With some tricky recycling
683  * of the buffer.
684  *
685  * @param cls the 'struct ForwardReplyContext'
686  * @param key current key
687  * @param value value in the hash map, a ClientQueryRecord
688  * @return GNUNET_YES (we should continue to iterate),
689  *         if the result is mal-formed, GNUNET_NO
690  */
691 static int
692 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
693 {
694   struct ForwardReplyContext *frc = cls;
695   struct ClientQueryRecord *record = value;
696   struct PendingMessage *pm;
697   struct ReplyMessage *reply;
698   enum GNUNET_BLOCK_EvaluationResult eval;
699   int do_free;
700   GNUNET_HashCode ch;
701   unsigned int i;
702   
703   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
704        (record->type != frc->type) )
705     return GNUNET_YES; /* type mismatch */
706   GNUNET_CRYPTO_hash (frc->data,
707                       frc->data_size,
708                       &ch);
709   for (i=0;i<record->seen_replies_count;i++)
710     if (0 == memcmp (&record->seen_replies[i],
711                      &ch,
712                      sizeof (GNUNET_HashCode)))
713       return GNUNET_YES; /* duplicate */             
714   eval =
715     GNUNET_BLOCK_evaluate (GDS_block_context, 
716                            record->type, key, 
717                            NULL, 0,
718                            record->xquery,
719                            record->xquery_size, 
720                            frc->data,
721                            frc->data_size);
722   switch (eval)
723   {
724   case GNUNET_BLOCK_EVALUATION_OK_LAST:
725     do_free = GNUNET_YES;
726     break;
727   case GNUNET_BLOCK_EVALUATION_OK_MORE:
728     GNUNET_ARRAY_append (record->seen_replies,
729                          record->seen_replies_count,
730                          ch);
731     do_free = GNUNET_NO;
732     break;
733   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
734     /* should be impossible to encounter here */
735     GNUNET_break (0);
736     return GNUNET_YES;
737   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
738     GNUNET_break_op (0);
739     return GNUNET_NO;
740   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
741     GNUNET_break (0);
742     return GNUNET_NO;
743   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
744     GNUNET_break (0);
745     return GNUNET_NO;
746   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
747     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
748                 "Unsupported block type (%u) in request!\n",
749                 record->type);
750     return GNUNET_NO;
751   }
752   if (GNUNET_NO == frc->do_copy)
753     {
754       /* first time, we can use the original data */
755       pm = frc->pm; 
756       frc->do_copy = GNUNET_YES;
757     }
758   else
759     {
760       /* two clients waiting for same reply, must copy for queueing */
761       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
762                           ntohs (frc->pm->msg->size));
763       memcpy (pm, frc->pm, 
764               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
765       pm->next = pm->prev = NULL;
766     }
767   reply = (struct ReplyMessage*) &pm[1];  
768   reply->unique_id = record->unique_id;
769   add_pending_message (record->client, pm);
770   if (GNUNET_YES == do_free)
771     remove_client_records (record->client, key, record);
772   return GNUNET_YES;
773 }
774
775
776 /**
777  * Handle a reply we've received from another peer.  If the reply
778  * matches any of our pending queries, forward it to the respective
779  * client(s).
780  *
781  * @param expiration when will the reply expire
782  * @param key the query this reply is for
783  * @param get_path_length number of peers in 'get_path'
784  * @param get_path path the reply took on get
785  * @param put_path_length number of peers in 'put_path'
786  * @param put_path path the reply took on put
787  * @param type type of the reply
788  * @param data_size number of bytes in 'data'
789  * @param data application payload data
790  */
791 void
792 GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
793                          const GNUNET_HashCode *key,
794                          unsigned int get_path_length,
795                          const struct GNUNET_PeerIdentity *get_path,
796                          unsigned int put_path_length,
797                          const struct GNUNET_PeerIdentity *put_path,
798                          uint32_t type,
799                          size_t data_size,
800                          const void *data)
801 {
802   struct ForwardReplyContext frc;
803   struct PendingMessage *pm;
804   struct ReplyMessage *reply;
805   struct GNUNET_PeerIdentity *paths;
806   size_t msize;
807
808   if (NULL ==
809       GNUNET_CONTAINER_multihashmap_get (foward_map, key))
810     return; /* no matching request, fast exit! */
811   msize = sizeof(struct ReplyMessage) + data_size + 
812     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
813   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
814     {
815       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
816                   _("Could not pass reply to client, message too big!\n"));
817       return;
818     }
819   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
820   reply = (struct ReplyMessage*) &pm[1];
821   pm->msg = &reply->header;
822   reply->header.size = htons ((uint16_t) msize);
823   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
824   reply->type = htonl (type);
825   reply->get_path_length = htonl (get_path_length);
826   reply->put_path_length = htonl (put_path_length);
827   reply->unique_id = 0; /* filled in later */
828   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
829   reply->key = *key;
830   paths = (struct GNUNET_PeerIdentity*) &reply[1];
831   mempcy (paths, get_path, 
832           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
833   mempcy (&paths[get_path_length], 
834           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
835   memcpy (&paths[get_path_length + put_path_length],
836           data, 
837           data_size);
838   frc.do_copy = GNUNET_NO;
839   frc.pm = pm;
840   frc.data = data;
841   frc.data_size = data_size;
842   frc.type = type;
843   GNUNET_CONTAINER_multihashmap_get_multiple (foward_map, key,
844                                               &forward_reply,
845                                               &frc);
846   if (GNUNET_NO == frc.do_copy)
847     {
848       /* did not match any of the requests, free! */
849       GNUNET_free (buf);
850     }
851 }
852
853
854 /**
855  * Initialize client subsystem.
856  *
857  * @param server the initialized server
858  */
859 void 
860 GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
861 {
862   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
863     {&handle_dht_local_put, NULL, 
864      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
865     {&handle_dht_local_get, NULL, 
866      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
867     {&handle_dht_local_get_stop, NULL,
868      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
869      sizeof (struct GNUNET_DHT_StopMessage) },
870     {NULL, NULL, 0, 0}
871   };
872   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
873   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
874   GNUNET_SERVER_add_handlers (server, plugin_handlers);
875   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
876 }
877
878
879 /**
880  * Shutdown client subsystem.
881  */
882 void
883 GDS_CLIENT_done ()
884 {
885   GNUNET_assert (client_head == NULL);
886   GNUNET_assert (client_tail == NULL);
887   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
888     {
889       GNUNET_SCHEDULER_cancel (retry_task);
890       retry_task = GNUNET_SCHEDULER_NO_TASK;
891     }
892   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
893   GNUNET_CONTAINER_heap_destroy (retry_heap);
894   retry_heap = NULL;
895   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
896   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
897   forward_map = NULL;
898 }
899
900 /* end of gnunet-service-dht_clients.c */
901