8790d8fbbbd81b25a1c07e30528c5bc523b1d895
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_datacache_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_statistics_service.h"
39 #include "dht_new.h"
40 #include <fenv.h>
41 #include "gnunet-service-dht_clients.h"
42 #include "gnunet-service-dht_datacache.h"
43 #include "gnunet-service-dht_neighbours.h"
44
45
46 /**
47  * Linked list of messages to send to clients.
48  */
49 struct PendingMessage
50 {
51   /**
52    * Pointer to next item in the list
53    */
54   struct PendingMessage *next;
55
56   /**
57    * Pointer to previous item in the list
58    */
59   struct PendingMessage *prev;
60
61   /**
62    * Actual message to be sent, allocated at the end of the struct:
63    * // msg = (cast) &pm[1]; 
64    * // memcpy (&pm[1], data, len);
65    */
66   const struct GNUNET_MessageHeader *msg;
67
68 };
69
70
71 /**
72  * Struct containing information about a client,
73  * handle to connect to it, and any pending messages
74  * that need to be sent to it.
75  */
76 struct ClientList
77 {
78   /**
79    * Linked list of active clients
80    */
81   struct ClientList *next;
82
83   /**
84    * Linked list of active clients
85    */
86   struct ClientList *prev;
87
88   /**
89    * The handle to this client
90    */
91   struct GNUNET_SERVER_Client *client_handle;
92
93   /**
94    * Handle to the current transmission request, NULL
95    * if none pending.
96    */
97   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
98
99   /**
100    * Linked list of pending messages for this client
101    */
102   struct PendingMessage *pending_head;
103
104   /**
105    * Tail of linked list of pending messages for this client
106    */
107   struct PendingMessage *pending_tail;
108
109 };
110
111
112 /**
113  * Entry in the DHT routing table for a client's GET request.
114  */
115 struct ClientQueryRecord
116 {
117
118   /**
119    * The key this request was about
120    */
121   GNUNET_HashCode key;
122
123   /**
124    * Client responsible for the request.
125    */
126   struct ClientList *client;
127
128   /**
129    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
130    */
131   const void *xquery;
132
133   /**
134    * Replies we have already seen for this request.
135    */
136   GNUNET_HashCode *seen_replies;
137
138   /**
139    * Pointer to this nodes heap location in the retry-heap (for fast removal)
140    */
141   struct GNUNET_CONTAINER_HeapNode *hnode;
142
143   /**
144    * What's the delay between re-try operations that we currently use for this
145    * request?
146    */
147   struct GNUNET_TIME_Relative retry_frequency;
148
149   /**
150    * What's the next time we should re-try this request?
151    */
152   struct GNUNET_TIME_Absolute retry_time;
153
154   /**
155    * The unique identifier of this request
156    */
157   uint64_t unique_id;
158
159   /**
160    * Number of bytes in xquery.
161    */
162   size_t xquery_size;
163
164   /**
165    * Number of entries in 'seen_replies'.
166    */
167   unsigned int seen_replies_count;
168
169   /**
170    * Desired replication level
171    */
172   uint32_t replication;
173
174   /**
175    * Any message options for this request
176    */
177   uint32_t msg_options;
178
179   /**
180    * The type for the data for the GET request.
181    */
182   enum GNUNET_BLOCK_Type msg_type;
183
184 };
185
186
187 /**
188  * List of active clients.
189  */
190 static struct ClientList *client_head;
191
192 /**
193  * List of active clients.
194  */
195 static struct ClientList *client_tail;
196
197 /**
198  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
199  */
200 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
201
202 /**
203  * Heap with all of our client's request, sorted by retry time (earliest on top).
204  */
205 static struct GNUNET_CONTAINER_Heap *retry_heap;
206
207 /**
208  * Task that re-transmits requests (using retry_heap).
209  */
210 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
211
212
213 /**
214  * Find a client if it exists, add it otherwise.
215  *
216  * @param client the server handle to the client
217  *
218  * @return the client if found, a new client otherwise
219  */
220 static struct ClientList *
221 find_active_client (struct GNUNET_SERVER_Client *client)
222 {
223   struct ClientList *pos = client_list;
224   struct ClientList *ret;
225
226   while (pos != NULL)
227   {
228     if (pos->client_handle == client)
229       return pos;
230     pos = pos->next;
231   }
232   ret = GNUNET_malloc (sizeof (struct ClientList));
233   ret->client_handle = client;
234   GNUNET_CONTAINER_DLL_insert (client_head,
235                                client_tail,
236                                ret);
237   return ret;
238 }
239
240
241 /**
242  * Iterator over hash map entries that frees all entries 
243  * associated with the given client.
244  *
245  * @param cls client to search for in source routes
246  * @param key current key code (ignored)
247  * @param value value in the hash map, a ClientQueryRecord
248  * @return GNUNET_YES (we should continue to iterate)
249  */
250 static int
251 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
252 {
253   struct ClientList *client = cls;
254   struct ClientQueryRecord *record = value;
255
256   if (record->client != client)
257     return GNUNET_YES;
258   GNUNET_assert (GNUNET_YES ==
259                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
260                                                        key, record));
261   GNUNET_CONTAINER_heap_remove_node (record->hnode);
262   GNUNET_ARRAY_append (record->seen_replies,
263                        record->seen_replies_count,
264                        0);
265   GNUNET_free (record);
266   return GNUNET_YES;
267 }
268
269
270 /**
271  * Functions with this signature are called whenever a client
272  * is disconnected on the network level.
273  *
274  * @param cls closure (NULL for dht)
275  * @param client identification of the client; NULL
276  *        for the last call when the server is destroyed
277  */
278 static void
279 handle_client_disconnect (void *cls, 
280                           struct GNUNET_SERVER_Client *client)
281 {
282   struct ClientList *pos = client_list;
283   struct ClientList *found;
284   struct PendingMessage *reply;
285
286   found = NULL;
287   while (pos != NULL)
288   {
289     if (pos->client_handle == client)
290     {
291       GNUNET_CONTAINER_DLL_remove (client_head,
292                                    client_tail,
293                                    pos);
294       found = pos;
295       break;
296     }
297     pos = pos->next;
298   }
299   if (found == NULL)
300     return;
301   if (found->transmit_handle != NULL)
302     GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
303   while (NULL != (reply = found->pending_head))
304     {
305       GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
306                                    reply);
307       GNUNET_free (reply);
308     }
309   GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
310                                          &remove_client_records, found);
311   GNUNET_free (found);
312 }
313
314
315 /**
316  * Route the given request via the DHT.  This includes updating 
317  * the bloom filter and retransmission times, building the P2P
318  * message and initiating the routing operation.
319  */
320 static void
321 transmit_request (struct ClientQueryRecord *cqr)
322 {
323   int32_t reply_bf_mutator;
324
325   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
326                                                          UINT32_MAX);
327   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
328                                                  cqr->seen_replies,
329                                                  cqr->seen_replies_count);
330   GST_NEIGHBOURS_handle_get (cqr->msg_type,
331                              cqr->msg_options,
332                              cqr->replication,
333                              &cqr->key,
334                              cqr->xquery,
335                              cqr->xquery_size,
336                              reply_bf,
337                              reply_bf_mutator,
338                              NULL /* no peers blocked initially */);
339   GNUNET_CONTAINER_bloomfilter_destroy (reply_bf);
340
341   /* exponential back-off for retries, max 1h */
342   cqr->retry_frequency = 
343     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
344                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
345   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
346 }
347
348
349 /**
350  * Task that looks at the 'retry_heap' and transmits all of the requests
351  * on the heap that are ready for transmission.  Then re-schedules
352  * itself (unless the heap is empty).
353  *
354  * @param cls unused
355  * @param tc scheduler context
356  */
357 static void
358 transmit_next_request_task (void *cls,
359                             const struct GNUNET_SCHEDULER_TaskContext *tc)
360 {
361   struct ClientQueryRecord *cqr;
362   struct GNUNET_TIME_Relative delay;
363
364   retry_task = GNUNET_SCHEDULER_NO_TASK;
365   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
366     return;
367   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
368     {
369       cqr->hnode = NULL;
370       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
371       if (delay.value > 0)
372         {
373           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
374                                                      cqr->retry_time.abs_value);
375           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
376                                                      &transmit_next_request_task,
377                                                      NULL);
378           return;
379         }
380       transmit_request (cqr);
381     }
382 }
383
384
385 /**
386  * Handler for PUT messages.
387  *
388  * @param cls closure for the service
389  * @param client the client we received this message from
390  * @param message the actual message received
391  */
392 static void
393 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
394                       const struct GNUNET_MessageHeader *message)
395 {
396   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
397   uint16_t size;
398   
399   size = ntohs (message->size);
400   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
401     {
402       GNUNET_break (0);
403       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
404     }
405   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
406   /* give to local clients */
407   GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
408                            &dht_msg->key,
409                            0, NULL,
410                            0, NULL,
411                            ntohl (dht_msg->type),
412                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
413                            &dht_msg[1]);
414   /* store locally */
415   GST_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
416                             &dht_msg->key,
417                             0, NULL,
418                             ntohl (dht_msg->type),
419                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
420                             &dht_msg[1]);
421   /* route to other peers */
422   GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
423                              ntohl (dht_msg->options),
424                              ntohl (dht_msg->desired_replication_level),
425                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
426                              &dht_msg->key,
427                              &dht_msg[1],
428                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
429   GNUNET_SERVER_receive_done (client, GNUNET_OK);
430 }
431
432
433 /**
434  * Handler for any generic DHT messages, calls the appropriate handler
435  * depending on message type, sends confirmation if responses aren't otherwise
436  * expected.
437  *
438  * @param cls closure for the service
439  * @param client the client we received this message from
440  * @param message the actual message received
441  */
442 static void
443 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
444                       const struct GNUNET_MessageHeader *message)
445 {
446   const struct GNUNET_DHT_ClientGetMessage *get;
447   const struct GNUNET_MessageHeader *enc_msg;
448
449   struct ClientQueryRecord *cqr;
450   size_t xquery_size;
451   const char* xquery;
452   uint16_t size;
453
454   size = ntohs (message->size);
455   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
456     {
457       GNUNET_break (0);
458       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
459       return;
460     }
461   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
462   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
463   xquery = (const char*) &get[1];
464
465   
466   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
467   cqr->key = get->key;
468   cqr->client = find_active_client (client);
469   cqr->xquery = (void*) &cqr[1];
470   memcpy (&cqr[1], xquery, xquery_size);
471   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
472   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
473   cqr->retry_time = GNUNET_TIME_absolute_get ();
474   cqr->unique_id = get->unique_id;
475   cqr->xquery_size = xquery_size;
476   cqr->replication = ntohl (get->desired_replication_level);
477   cqr->msg_options = ntohl (get->options);
478   cqr->msg_type = ntohl (get->type);  
479   GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
480                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
481   /* start remote requests */
482   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
483     GNUNET_SCHEDULER_cancel (retry_task);
484   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
485   /* perform local lookup */
486   GDS_DATACACHE_handle_get (&get->key,
487                             cqr->msg_type,
488                             cqr->xquery,
489                             xquery_size,
490                             NULL, 0);
491   GNUNET_SERVER_receive_done (client, GNUNET_OK);
492 }
493
494
495 /**
496  * Closure for 'remove_by_uid'.
497  */
498 struct RemoveByUidContext
499 {
500   /**
501    * Client that issued the removal request.
502    */
503   struct ClientList *client;
504
505   /**
506    * Unique ID of the request.
507    */
508   uint64_t uid;
509 };
510
511
512 /**
513  * Iterator over hash map entries that frees all entries 
514  * that match the given client and UID.
515  *
516  * @param cls UID and client to search for in source routes
517  * @param key current key code
518  * @param value value in the hash map, a ClientQueryRecord
519  * @return GNUNET_YES (we should continue to iterate)
520  */
521 static int
522 remove_by_uid (void *cls, const GNUNET_HashCode * key, void *value)
523 {
524   const struct RemoveByUidContext *ctx = cls;
525   struct ClientQueryRecord *record = value;
526
527   if (record->uid != ctx->uid)
528     return GNUNET_YES;
529   return remove_client_records (ctx->client, key, record);
530 }
531
532
533 /**
534  * Handler for any generic DHT stop messages, calls the appropriate handler
535  * depending on message type (if processed locally)
536  *
537  * @param cls closure for the service
538  * @param client the client we received this message from
539  * @param message the actual message received
540  *
541  */
542 static void
543 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
544                            const struct GNUNET_MessageHeader *message)
545 {
546   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
547     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
548   
549   ctx.client = find_active_client (client);
550   ctx.uid = &dht_stop_msg.unique_id);
551   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
552                                               &dht_stop_msg->key,
553                                               &remove_by_uid,
554                                               &ctx);
555   GNUNET_SERVER_receive_done (client, GNUNET_OK);
556 }
557
558
559 /**
560  * Task run to check for messages that need to be sent to a client.
561  *
562  * @param client a ClientList, containing the client and any messages to be sent to it
563  */
564 static void
565 process_pending_messages (struct ClientList *client);
566
567
568 /**
569  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
570  * request.  A ClientList is passed as closure, take the head of the list
571  * and copy it into buf, which has the result of sending the message to the
572  * client.
573  *
574  * @param cls closure to this call
575  * @param size maximum number of bytes available to send
576  * @param buf where to copy the actual message to
577  *
578  * @return the number of bytes actually copied, 0 indicates failure
579  */
580 static size_t
581 send_reply_to_client (void *cls, size_t size, void *buf)
582 {
583   struct ClientList *client = cls;
584   char *cbuf = buf;
585   struct PendingMessage *reply;
586   size_t off;
587   size_t msize;
588
589   client->transmit_handle = NULL;
590   if (buf == NULL)
591   {
592     /* client disconnected */
593     return 0;
594   }
595   off = 0;
596   while ((NULL != (reply = client->pending_head)) &&
597          (size >= off + (msize = ntohs (reply->msg->size))))
598   {
599     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
600                                  reply);
601     memcpy (&cbuf[off], reply->msg, msize);
602     GNUNET_free (reply);
603     off += msize;
604   }
605   process_pending_messages (client);
606   return off;
607 }
608
609
610 /**
611  * Task run to check for messages that need to be sent to a client.
612  *
613  * @param client a ClientList, containing the client and any messages to be sent to it
614  */
615 static void
616 process_pending_messages (struct ClientList *client)
617 {
618   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
619     return;
620   client->transmit_handle =
621       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
622                                            ntohs (client->pending_head->
623                                                   msg->size),
624                                            GNUNET_TIME_UNIT_FOREVER_REL,
625                                            &send_reply_to_client, client);
626 }
627
628
629 /**
630  * Add a PendingMessage to the clients list of messages to be sent
631  *
632  * @param client the active client to send the message to
633  * @param pending_message the actual message to send
634  */
635 static void
636 add_pending_message (struct ClientList *client,
637                      struct PendingMessage *pending_message)
638 {
639   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
640                                     pending_message);
641   process_pending_messages (client);
642 }
643
644
645 /**
646  * Closure for 'forward_reply'
647  */
648 struct ForwardReplyContext
649 {
650
651   /**
652    * Actual message to send to matching clients. 
653    */
654   struct PendingMessage *pm;
655
656   /**
657    * Embedded payload.
658    */
659   const void *data;
660
661   /**
662    * Type of the data.
663    */
664   enum GNUNET_BLOCK_Type type;
665
666   /**
667    * Number of bytes in data.
668    */
669   size_t data_size;
670
671   /**
672    * Do we need to copy 'pm' because it was already used?
673    */
674   int do_copy;
675
676 };
677
678
679 /**
680  * Iterator over hash map entries that send a given reply to
681  * each of the matching clients.  With some tricky recycling
682  * of the buffer.
683  *
684  * @param cls the 'struct ForwardReplyContext'
685  * @param key current key
686  * @param value value in the hash map, a ClientQueryRecord
687  * @return GNUNET_YES (we should continue to iterate),
688  *         if the result is mal-formed, GNUNET_NO
689  */
690 static int
691 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
692 {
693   struct ForwardReplyContext *frc = cls;
694   struct ClientQueryRecord *record = value;
695   struct PendingMessage *pm;
696   struct ReplyMessage *reply;
697   enum GNUNET_BLOCK_EvaluationResult eval;
698   int do_free;
699   GNUNET_HashCode ch;
700   unsigned int i;
701   
702   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
703        (record->type != frc->type) )
704     return GNUNET_YES; /* type mismatch */
705   GNUNET_CRYPTO_hash (frc->data,
706                       frc->data_size,
707                       &ch);
708   for (i=0;i<record->seen_replies_count;i++)
709     if (0 == memcmp (&record->seen_replies[i],
710                      &ch,
711                      sizeof (GNUNET_HashCode)))
712       return GNUNET_YES; /* duplicate */             
713   eval =
714     GNUNET_BLOCK_evaluate (GDS_block_context, 
715                            record->type, key, 
716                            NULL, 0,
717                            record->xquery,
718                            record->xquery_size, 
719                            frc->data,
720                            frc->data_size);
721   switch (eval)
722   {
723   case GNUNET_BLOCK_EVALUATION_OK_LAST:
724     do_free = GNUNET_YES;
725     break;
726   case GNUNET_BLOCK_EVALUATION_OK_MORE:
727     GNUNET_ARRAY_append (record->seen_replies,
728                          record->seen_replies_count,
729                          ch);
730     do_free = GNUNET_NO;
731     break;
732   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
733     /* should be impossible to encounter here */
734     GNUNET_break (0);
735     return GNUNET_YES;
736   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
737     GNUNET_break_op (0);
738     return GNUNET_NO;
739   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
740     GNUNET_break (0);
741     return GNUNET_NO;
742   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
743     GNUNET_break (0);
744     return GNUNET_NO;
745   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
746     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
747                 "Unsupported block type (%u) in request!\n",
748                 record->type);
749     return GNUNET_NO;
750   }
751   if (GNUNET_NO == frc->do_copy)
752     {
753       /* first time, we can use the original data */
754       pm = frc->pm; 
755       frc->do_copy = GNUNET_YES;
756     }
757   else
758     {
759       /* two clients waiting for same reply, must copy for queueing */
760       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
761                           ntohs (frc->pm->msg->size));
762       memcpy (pm, frc->pm, 
763               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
764       pm->next = pm->prev = NULL;
765     }
766   reply = (struct ReplyMessage*) &pm[1];  
767   reply->unique_id = record->unique_id;
768   add_pending_message (record->client, pm);
769   if (GNUNET_YES == do_free)
770     remove_client_records (record->client, key, record);
771   return GNUNET_YES;
772 }
773
774
775 /**
776  * Handle a reply we've received from another peer.  If the reply
777  * matches any of our pending queries, forward it to the respective
778  * client(s).
779  *
780  * @param expiration when will the reply expire
781  * @param key the query this reply is for
782  * @param get_path_length number of peers in 'get_path'
783  * @param get_path path the reply took on get
784  * @param put_path_length number of peers in 'put_path'
785  * @param put_path path the reply took on put
786  * @param type type of the reply
787  * @param data_size number of bytes in 'data'
788  * @param data application payload data
789  */
790 void
791 GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
792                          const GNUNET_HashCode *key,
793                          unsigned int get_path_length,
794                          const struct GNUNET_PeerIdentity *get_path,
795                          unsigned int put_path_length,
796                          const struct GNUNET_PeerIdentity *put_path,
797                          enum GNUNET_BLOCK_Type type,
798                          size_t data_size,
799                          const void *data)
800 {
801   struct ForwardReplyContext frc;
802   struct PendingMessage *pm;
803   struct ReplyMessage *reply;
804   struct GNUNET_PeerIdentity *paths;
805   size_t msize;
806
807   if (NULL ==
808       GNUNET_CONTAINER_multihashmap_get (foward_map, key))
809     return; /* no matching request, fast exit! */
810   msize = sizeof(struct ReplyMessage) + data_size + 
811     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
812   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
813     {
814       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
815                   _("Could not pass reply to client, message too big!\n"));
816       return;
817     }
818   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
819   reply = (struct ReplyMessage*) &pm[1];
820   pm->msg = &reply->header;
821   reply->header.size = htons ((uint16_t) msize);
822   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
823   reply->type = htonl (type);
824   reply->get_path_length = htonl (get_path_length);
825   reply->put_path_length = htonl (put_path_length);
826   reply->unique_id = 0; /* filled in later */
827   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
828   reply->key = *key;
829   paths = (struct GNUNET_PeerIdentity*) &reply[1];
830   mempcy (paths, get_path, 
831           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
832   mempcy (&paths[get_path_length], 
833           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
834   memcpy (&paths[get_path_length + put_path_length],
835           data, 
836           data_size);
837   frc.do_copy = GNUNET_NO;
838   frc.pm = pm;
839   frc.data = data;
840   frc.data_size = data_size;
841   frc.type = type;
842   GNUNET_CONTAINER_multihashmap_get_multiple (foward_map, key,
843                                               &forward_reply,
844                                               &frc);
845   if (GNUNET_NO == frc.do_copy)
846     {
847       /* did not match any of the requests, free! */
848       GNUNET_free (buf);
849     }
850 }
851
852
853 /**
854  * Initialize client subsystem.
855  *
856  * @param server the initialized server
857  */
858 void 
859 GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
860 {
861   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
862     {&handle_dht_local_put, NULL, 
863      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
864     {&handle_dht_local_get, NULL, 
865      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
866     {&handle_dht_local_get_stop, NULL,
867      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
868      sizeof (struct GNUNET_DHT_StopMessage) },
869     {NULL, NULL, 0, 0}
870   };
871   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
872   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
873   GNUNET_SERVER_add_handlers (server, plugin_handlers);
874   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
875 }
876
877
878 /**
879  * Shutdown client subsystem.
880  */
881 void
882 GDS_CLIENT_done ()
883 {
884   GNUNET_assert (client_head == NULL);
885   GNUNET_assert (client_tail == NULL);
886   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
887     {
888       GNUNET_SCHEDULER_cancel (retry_task);
889       retry_task = GNUNET_SCHEDULER_NO_TASK;
890     }
891   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
892   GNUNET_CONTAINER_heap_destroy (retry_heap);
893   retry_heap = NULL;
894   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
895   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
896   forward_map = NULL;
897 }
898
899 /* end of gnunet-service-dht_clients.c */
900