fixes
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-service-dht.h"
32 #include "gnunet-service-dht_clients.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht_new.h"
36
37
38 /**
39  * Linked list of messages to send to clients.
40  */
41 struct PendingMessage
42 {
43   /**
44    * Pointer to next item in the list
45    */
46   struct PendingMessage *next;
47
48   /**
49    * Pointer to previous item in the list
50    */
51   struct PendingMessage *prev;
52
53   /**
54    * Actual message to be sent, allocated at the end of the struct:
55    * // msg = (cast) &pm[1]; 
56    * // memcpy (&pm[1], data, len);
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60 };
61
62
63 /**
64  * Struct containing information about a client,
65  * handle to connect to it, and any pending messages
66  * that need to be sent to it.
67  */
68 struct ClientList
69 {
70   /**
71    * Linked list of active clients
72    */
73   struct ClientList *next;
74
75   /**
76    * Linked list of active clients
77    */
78   struct ClientList *prev;
79
80   /**
81    * The handle to this client
82    */
83   struct GNUNET_SERVER_Client *client_handle;
84
85   /**
86    * Handle to the current transmission request, NULL
87    * if none pending.
88    */
89   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
90
91   /**
92    * Linked list of pending messages for this client
93    */
94   struct PendingMessage *pending_head;
95
96   /**
97    * Tail of linked list of pending messages for this client
98    */
99   struct PendingMessage *pending_tail;
100
101 };
102
103
104 /**
105  * Entry in the DHT routing table for a client's GET request.
106  */
107 struct ClientQueryRecord
108 {
109
110   /**
111    * The key this request was about
112    */
113   GNUNET_HashCode key;
114
115   /**
116    * Client responsible for the request.
117    */
118   struct ClientList *client;
119
120   /**
121    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
122    */
123   const void *xquery;
124
125   /**
126    * Replies we have already seen for this request.
127    */
128   GNUNET_HashCode *seen_replies;
129
130   /**
131    * Pointer to this nodes heap location in the retry-heap (for fast removal)
132    */
133   struct GNUNET_CONTAINER_HeapNode *hnode;
134
135   /**
136    * What's the delay between re-try operations that we currently use for this
137    * request?
138    */
139   struct GNUNET_TIME_Relative retry_frequency;
140
141   /**
142    * What's the next time we should re-try this request?
143    */
144   struct GNUNET_TIME_Absolute retry_time;
145
146   /**
147    * The unique identifier of this request
148    */
149   uint64_t unique_id;
150
151   /**
152    * Number of bytes in xquery.
153    */
154   size_t xquery_size;
155
156   /**
157    * Number of entries in 'seen_replies'.
158    */
159   unsigned int seen_replies_count;
160
161   /**
162    * Desired replication level
163    */
164   uint32_t replication;
165
166   /**
167    * Any message options for this request
168    */
169   uint32_t msg_options;
170
171   /**
172    * The type for the data for the GET request.
173    */
174   enum GNUNET_BLOCK_Type type;
175
176 };
177
178
179 /**
180  * List of active clients.
181  */
182 static struct ClientList *client_head;
183
184 /**
185  * List of active clients.
186  */
187 static struct ClientList *client_tail;
188
189 /**
190  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
191  */
192 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
193
194 /**
195  * Heap with all of our client's request, sorted by retry time (earliest on top).
196  */
197 static struct GNUNET_CONTAINER_Heap *retry_heap;
198
199 /**
200  * Task that re-transmits requests (using retry_heap).
201  */
202 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
203
204
205 /**
206  * Find a client if it exists, add it otherwise.
207  *
208  * @param client the server handle to the client
209  *
210  * @return the client if found, a new client otherwise
211  */
212 static struct ClientList *
213 find_active_client (struct GNUNET_SERVER_Client *client)
214 {
215   struct ClientList *pos = client_head;
216   struct ClientList *ret;
217
218   while (pos != NULL)
219   {
220     if (pos->client_handle == client)
221       return pos;
222     pos = pos->next;
223   }
224   ret = GNUNET_malloc (sizeof (struct ClientList));
225   ret->client_handle = client;
226   GNUNET_CONTAINER_DLL_insert (client_head,
227                                client_tail,
228                                ret);
229   return ret;
230 }
231
232
233 /**
234  * Iterator over hash map entries that frees all entries 
235  * associated with the given client.
236  *
237  * @param cls client to search for in source routes
238  * @param key current key code (ignored)
239  * @param value value in the hash map, a ClientQueryRecord
240  * @return GNUNET_YES (we should continue to iterate)
241  */
242 static int
243 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
244 {
245   struct ClientList *client = cls;
246   struct ClientQueryRecord *record = value;
247
248   if (record->client != client)
249     return GNUNET_YES;
250   GNUNET_assert (GNUNET_YES ==
251                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
252                                                        key, record));
253   if (NULL != record->hnode)
254     GNUNET_CONTAINER_heap_remove_node (record->hnode);
255   GNUNET_array_grow (record->seen_replies,
256                      record->seen_replies_count,
257                      0);
258   GNUNET_free (record);
259   return GNUNET_YES;
260 }
261
262
263 /**
264  * Functions with this signature are called whenever a client
265  * is disconnected on the network level.
266  *
267  * @param cls closure (NULL for dht)
268  * @param client identification of the client; NULL
269  *        for the last call when the server is destroyed
270  */
271 static void
272 handle_client_disconnect (void *cls, 
273                           struct GNUNET_SERVER_Client *client)
274 {
275   struct ClientList *pos;
276   struct PendingMessage *reply;
277
278   pos = find_active_client (client);
279   GNUNET_CONTAINER_DLL_remove (client_head,
280                                client_tail,
281                                pos);
282   if (pos->transmit_handle != NULL)
283     GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
284   while (NULL != (reply = pos->pending_head))
285     {
286       GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail,
287                                    reply);
288       GNUNET_free (reply);
289     }
290   GNUNET_CONTAINER_multihashmap_iterate (forward_map,
291                                          &remove_client_records, pos);
292   GNUNET_free (pos);
293 }
294
295
296 /**
297  * Route the given request via the DHT.  This includes updating 
298  * the bloom filter and retransmission times, building the P2P
299  * message and initiating the routing operation.
300  */
301 static void
302 transmit_request (struct ClientQueryRecord *cqr)
303 {
304   int32_t reply_bf_mutator;
305   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
306
307   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
308                                                          UINT32_MAX);
309   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
310                                                  cqr->seen_replies,
311                                                  cqr->seen_replies_count);
312   GDS_NEIGHBOURS_handle_get (cqr->type,
313                              cqr->msg_options,
314                              cqr->replication,
315                              0 /* hop count */,
316                              &cqr->key,
317                              cqr->xquery,
318                              cqr->xquery_size,
319                              reply_bf,
320                              reply_bf_mutator,
321                              NULL /* no peers blocked initially */);
322   GNUNET_CONTAINER_bloomfilter_free (reply_bf);
323
324   /* exponential back-off for retries, max 1h */
325   cqr->retry_frequency = 
326     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
327                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
328   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
329 }
330
331
332 /**
333  * Task that looks at the 'retry_heap' and transmits all of the requests
334  * on the heap that are ready for transmission.  Then re-schedules
335  * itself (unless the heap is empty).
336  *
337  * @param cls unused
338  * @param tc scheduler context
339  */
340 static void
341 transmit_next_request_task (void *cls,
342                             const struct GNUNET_SCHEDULER_TaskContext *tc)
343 {
344   struct ClientQueryRecord *cqr;
345   struct GNUNET_TIME_Relative delay;
346
347   retry_task = GNUNET_SCHEDULER_NO_TASK;
348   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
349     return;
350   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
351     {
352       cqr->hnode = NULL;
353       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
354       if (delay.rel_value > 0)
355         {
356           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
357                                                      cqr->retry_time.abs_value);
358           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
359                                                      &transmit_next_request_task,
360                                                      NULL);
361           return;
362         }
363       transmit_request (cqr);
364     }
365 }
366
367
368 /**
369  * Handler for PUT messages.
370  *
371  * @param cls closure for the service
372  * @param client the client we received this message from
373  * @param message the actual message received
374  */
375 static void
376 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
377                       const struct GNUNET_MessageHeader *message)
378 {
379   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
380   uint16_t size;
381   
382   size = ntohs (message->size);
383   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
384     {
385       GNUNET_break (0);
386       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
387     }
388   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
389   /* give to local clients */
390   GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
391                            &dht_msg->key,
392                            0, NULL,
393                            0, NULL,
394                            ntohl (dht_msg->type),
395                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
396                            &dht_msg[1]);
397   /* store locally */
398   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
399                             &dht_msg->key,
400                             0, NULL,
401                             ntohl (dht_msg->type),
402                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
403                             &dht_msg[1]);
404   /* route to other peers */
405   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
406                              ntohl (dht_msg->options),
407                              ntohl (dht_msg->desired_replication_level),
408                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
409                              0 /* hop count */,
410                              NULL /* peer bloom filter */,
411                              &dht_msg->key,
412                              0, NULL,
413                              &dht_msg[1],
414                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
415   GNUNET_SERVER_receive_done (client, GNUNET_OK);
416 }
417
418
419 /**
420  * Handler for any generic DHT messages, calls the appropriate handler
421  * depending on message type, sends confirmation if responses aren't otherwise
422  * expected.
423  *
424  * @param cls closure for the service
425  * @param client the client we received this message from
426  * @param message the actual message received
427  */
428 static void
429 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
430                       const struct GNUNET_MessageHeader *message)
431 {
432   const struct GNUNET_DHT_ClientGetMessage *get;
433   struct ClientQueryRecord *cqr;
434   size_t xquery_size;
435   const char* xquery;
436   uint16_t size;
437
438   size = ntohs (message->size);
439   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
440     {
441       GNUNET_break (0);
442       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
443       return;
444     }
445   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
446   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
447   xquery = (const char*) &get[1];
448
449   
450   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
451   cqr->key = get->key;
452   cqr->client = find_active_client (client);
453   cqr->xquery = (void*) &cqr[1];
454   memcpy (&cqr[1], xquery, xquery_size);
455   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
456   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
457   cqr->retry_time = GNUNET_TIME_absolute_get ();
458   cqr->unique_id = get->unique_id;
459   cqr->xquery_size = xquery_size;
460   cqr->replication = ntohl (get->desired_replication_level);
461   cqr->msg_options = ntohl (get->options);
462   cqr->type = ntohl (get->type);  
463   GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
464                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
465   /* start remote requests */
466   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
467     GNUNET_SCHEDULER_cancel (retry_task);
468   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
469   /* perform local lookup */
470   GDS_DATACACHE_handle_get (&get->key,
471                             cqr->type,
472                             cqr->xquery,
473                             xquery_size,
474                             NULL, 0);
475   GNUNET_SERVER_receive_done (client, GNUNET_OK);
476 }
477
478
479 /**
480  * Closure for 'remove_by_unique_id'.
481  */
482 struct RemoveByUniqueIdContext
483 {
484   /**
485    * Client that issued the removal request.
486    */
487   struct ClientList *client;
488
489   /**
490    * Unique ID of the request.
491    */
492   uint64_t unique_id;
493 };
494
495
496 /**
497  * Iterator over hash map entries that frees all entries 
498  * that match the given client and unique ID.
499  *
500  * @param cls unique ID and client to search for in source routes
501  * @param key current key code
502  * @param value value in the hash map, a ClientQueryRecord
503  * @return GNUNET_YES (we should continue to iterate)
504  */
505 static int
506 remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
507 {
508   const struct RemoveByUniqueIdContext *ctx = cls;
509   struct ClientQueryRecord *record = value;
510
511   if (record->unique_id != ctx->unique_id)
512     return GNUNET_YES;
513   return remove_client_records (ctx->client, key, record);
514 }
515
516
517 /**
518  * Handler for any generic DHT stop messages, calls the appropriate handler
519  * depending on message type (if processed locally)
520  *
521  * @param cls closure for the service
522  * @param client the client we received this message from
523  * @param message the actual message received
524  *
525  */
526 static void
527 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
528                            const struct GNUNET_MessageHeader *message)
529 {
530   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
531     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
532   struct RemoveByUniqueIdContext ctx;
533   
534   ctx.client = find_active_client (client);
535   ctx.unique_id = dht_stop_msg->unique_id;
536   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
537                                               &dht_stop_msg->key,
538                                               &remove_by_unique_id,
539                                               &ctx);
540   GNUNET_SERVER_receive_done (client, GNUNET_OK);
541 }
542
543
544 /**
545  * Task run to check for messages that need to be sent to a client.
546  *
547  * @param client a ClientList, containing the client and any messages to be sent to it
548  */
549 static void
550 process_pending_messages (struct ClientList *client);
551
552
553 /**
554  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
555  * request.  A ClientList is passed as closure, take the head of the list
556  * and copy it into buf, which has the result of sending the message to the
557  * client.
558  *
559  * @param cls closure to this call
560  * @param size maximum number of bytes available to send
561  * @param buf where to copy the actual message to
562  *
563  * @return the number of bytes actually copied, 0 indicates failure
564  */
565 static size_t
566 send_reply_to_client (void *cls, size_t size, void *buf)
567 {
568   struct ClientList *client = cls;
569   char *cbuf = buf;
570   struct PendingMessage *reply;
571   size_t off;
572   size_t msize;
573
574   client->transmit_handle = NULL;
575   if (buf == NULL)
576   {
577     /* client disconnected */
578     return 0;
579   }
580   off = 0;
581   while ((NULL != (reply = client->pending_head)) &&
582          (size >= off + (msize = ntohs (reply->msg->size))))
583   {
584     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
585                                  reply);
586     memcpy (&cbuf[off], reply->msg, msize);
587     GNUNET_free (reply);
588     off += msize;
589   }
590   process_pending_messages (client);
591   return off;
592 }
593
594
595 /**
596  * Task run to check for messages that need to be sent to a client.
597  *
598  * @param client a ClientList, containing the client and any messages to be sent to it
599  */
600 static void
601 process_pending_messages (struct ClientList *client)
602 {
603   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
604     return;
605   client->transmit_handle =
606       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
607                                            ntohs (client->pending_head->
608                                                   msg->size),
609                                            GNUNET_TIME_UNIT_FOREVER_REL,
610                                            &send_reply_to_client, client);
611 }
612
613
614 /**
615  * Add a PendingMessage to the clients list of messages to be sent
616  *
617  * @param client the active client to send the message to
618  * @param pending_message the actual message to send
619  */
620 static void
621 add_pending_message (struct ClientList *client,
622                      struct PendingMessage *pending_message)
623 {
624   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
625                                     pending_message);
626   process_pending_messages (client);
627 }
628
629
630 /**
631  * Closure for 'forward_reply'
632  */
633 struct ForwardReplyContext
634 {
635
636   /**
637    * Actual message to send to matching clients. 
638    */
639   struct PendingMessage *pm;
640
641   /**
642    * Embedded payload.
643    */
644   const void *data;
645
646   /**
647    * Type of the data.
648    */
649   enum GNUNET_BLOCK_Type type;
650
651   /**
652    * Number of bytes in data.
653    */
654   size_t data_size;
655
656   /**
657    * Do we need to copy 'pm' because it was already used?
658    */
659   int do_copy;
660
661 };
662
663
664 /**
665  * Iterator over hash map entries that send a given reply to
666  * each of the matching clients.  With some tricky recycling
667  * of the buffer.
668  *
669  * @param cls the 'struct ForwardReplyContext'
670  * @param key current key
671  * @param value value in the hash map, a ClientQueryRecord
672  * @return GNUNET_YES (we should continue to iterate),
673  *         if the result is mal-formed, GNUNET_NO
674  */
675 static int
676 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
677 {
678   struct ForwardReplyContext *frc = cls;
679   struct ClientQueryRecord *record = value;
680   struct PendingMessage *pm;
681   struct GNUNET_DHT_ClientResultMessage *reply;
682   enum GNUNET_BLOCK_EvaluationResult eval;
683   int do_free;
684   GNUNET_HashCode ch;
685   unsigned int i;
686   
687   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
688        (record->type != frc->type) )
689     return GNUNET_YES; /* type mismatch */
690   GNUNET_CRYPTO_hash (frc->data,
691                       frc->data_size,
692                       &ch);
693   for (i=0;i<record->seen_replies_count;i++)
694     if (0 == memcmp (&record->seen_replies[i],
695                      &ch,
696                      sizeof (GNUNET_HashCode)))
697       return GNUNET_YES; /* duplicate */             
698   eval =
699     GNUNET_BLOCK_evaluate (GDS_block_context, 
700                            record->type, key, 
701                            NULL, 0,
702                            record->xquery,
703                            record->xquery_size, 
704                            frc->data,
705                            frc->data_size);
706   switch (eval)
707   {
708   case GNUNET_BLOCK_EVALUATION_OK_LAST:
709     do_free = GNUNET_YES;
710     break;
711   case GNUNET_BLOCK_EVALUATION_OK_MORE:
712     GNUNET_array_append (record->seen_replies,
713                          record->seen_replies_count,
714                          ch);
715     do_free = GNUNET_NO;
716     break;
717   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
718     /* should be impossible to encounter here */
719     GNUNET_break (0);
720     return GNUNET_YES;
721   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
722     GNUNET_break_op (0);
723     return GNUNET_NO;
724   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
725     GNUNET_break (0);
726     return GNUNET_NO;
727   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
728     GNUNET_break (0);
729     return GNUNET_NO;
730   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
731     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
732                 "Unsupported block type (%u) in request!\n",
733                 record->type);
734     return GNUNET_NO;
735   }
736   if (GNUNET_NO == frc->do_copy)
737     {
738       /* first time, we can use the original data */
739       pm = frc->pm; 
740       frc->do_copy = GNUNET_YES;
741     }
742   else
743     {
744       /* two clients waiting for same reply, must copy for queueing */
745       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
746                           ntohs (frc->pm->msg->size));
747       memcpy (pm, frc->pm, 
748               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
749       pm->next = pm->prev = NULL;
750     }
751   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];  
752   reply->unique_id = record->unique_id;
753   add_pending_message (record->client, pm);
754   if (GNUNET_YES == do_free)
755     remove_client_records (record->client, key, record);
756   return GNUNET_YES;
757 }
758
759
760 /**
761  * Handle a reply we've received from another peer.  If the reply
762  * matches any of our pending queries, forward it to the respective
763  * client(s).
764  *
765  * @param expiration when will the reply expire
766  * @param key the query this reply is for
767  * @param get_path_length number of peers in 'get_path'
768  * @param get_path path the reply took on get
769  * @param put_path_length number of peers in 'put_path'
770  * @param put_path path the reply took on put
771  * @param type type of the reply
772  * @param data_size number of bytes in 'data'
773  * @param data application payload data
774  */
775 void
776 GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
777                          const GNUNET_HashCode *key,
778                          unsigned int get_path_length,
779                          const struct GNUNET_PeerIdentity *get_path,
780                          unsigned int put_path_length,
781                          const struct GNUNET_PeerIdentity *put_path,
782                          enum GNUNET_BLOCK_Type type,
783                          size_t data_size,
784                          const void *data)
785 {
786   struct ForwardReplyContext frc;
787   struct PendingMessage *pm;
788   struct GNUNET_DHT_ClientResultMessage *reply;
789   struct GNUNET_PeerIdentity *paths;
790   size_t msize;
791
792   if (NULL ==
793       GNUNET_CONTAINER_multihashmap_get (forward_map, key))
794     return; /* no matching request, fast exit! */
795   msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size + 
796     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
797   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
798     {
799       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
800                   _("Could not pass reply to client, message too big!\n"));
801       return;
802     }
803   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
804   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
805   pm->msg = &reply->header;
806   reply->header.size = htons ((uint16_t) msize);
807   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
808   reply->type = htonl (type);
809   reply->get_path_length = htonl (get_path_length);
810   reply->put_path_length = htonl (put_path_length);
811   reply->unique_id = 0; /* filled in later */
812   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
813   reply->key = *key;
814   paths = (struct GNUNET_PeerIdentity*) &reply[1];
815   memcpy (paths, get_path, 
816           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
817   memcpy (&paths[get_path_length], 
818           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
819   memcpy (&paths[get_path_length + put_path_length],
820           data, 
821           data_size);
822   frc.do_copy = GNUNET_NO;
823   frc.pm = pm;
824   frc.data = data;
825   frc.data_size = data_size;
826   frc.type = type;
827   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key,
828                                               &forward_reply,
829                                               &frc);
830   if (GNUNET_NO == frc.do_copy)
831     {
832       /* did not match any of the requests, free! */
833       GNUNET_free (pm);
834     }
835 }
836
837
838 /**
839  * Initialize client subsystem.
840  *
841  * @param server the initialized server
842  */
843 void 
844 GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
845 {
846   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
847     {&handle_dht_local_put, NULL, 
848      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
849     {&handle_dht_local_get, NULL, 
850      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
851     {&handle_dht_local_get_stop, NULL,
852      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
853      sizeof (struct GNUNET_DHT_ClientGetStopMessage) },
854     {NULL, NULL, 0, 0}
855   };
856   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
857   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
858   GNUNET_SERVER_add_handlers (server, plugin_handlers);
859   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
860 }
861
862
863 /**
864  * Shutdown client subsystem.
865  */
866 void
867 GDS_CLIENTS_done ()
868 {
869   GNUNET_assert (client_head == NULL);
870   GNUNET_assert (client_tail == NULL);
871   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
872     {
873       GNUNET_SCHEDULER_cancel (retry_task);
874       retry_task = GNUNET_SCHEDULER_NO_TASK;
875     }
876   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
877   GNUNET_CONTAINER_heap_destroy (retry_heap);
878   retry_heap = NULL;
879   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
880   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
881   forward_map = NULL;
882 }
883
884 /* end of gnunet-service-dht_clients.c */
885