154d8d98ee1d8a365808e498ce8741c696cf49f6
[oweals/gnunet.git] / src / dht / gnunet-service-dht_clients.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dht/gnunet-service-dht_clients.c
23  * @brief GNUnet DHT service's client management code
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27
28 #include "platform.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-service-dht.h"
32 #include "gnunet-service-dht_clients.h"
33 #include "gnunet-service-dht_datacache.h"
34 #include "gnunet-service-dht_neighbours.h"
35 #include "dht_new.h"
36
37
38 /**
39  * Linked list of messages to send to clients.
40  */
41 struct PendingMessage
42 {
43   /**
44    * Pointer to next item in the list
45    */
46   struct PendingMessage *next;
47
48   /**
49    * Pointer to previous item in the list
50    */
51   struct PendingMessage *prev;
52
53   /**
54    * Actual message to be sent, allocated at the end of the struct:
55    * // msg = (cast) &pm[1]; 
56    * // memcpy (&pm[1], data, len);
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60 };
61
62
63 /**
64  * Struct containing information about a client,
65  * handle to connect to it, and any pending messages
66  * that need to be sent to it.
67  */
68 struct ClientList
69 {
70   /**
71    * Linked list of active clients
72    */
73   struct ClientList *next;
74
75   /**
76    * Linked list of active clients
77    */
78   struct ClientList *prev;
79
80   /**
81    * The handle to this client
82    */
83   struct GNUNET_SERVER_Client *client_handle;
84
85   /**
86    * Handle to the current transmission request, NULL
87    * if none pending.
88    */
89   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
90
91   /**
92    * Linked list of pending messages for this client
93    */
94   struct PendingMessage *pending_head;
95
96   /**
97    * Tail of linked list of pending messages for this client
98    */
99   struct PendingMessage *pending_tail;
100
101 };
102
103
104 /**
105  * Entry in the DHT routing table for a client's GET request.
106  */
107 struct ClientQueryRecord
108 {
109
110   /**
111    * The key this request was about
112    */
113   GNUNET_HashCode key;
114
115   /**
116    * Client responsible for the request.
117    */
118   struct ClientList *client;
119
120   /**
121    * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
122    */
123   const void *xquery;
124
125   /**
126    * Replies we have already seen for this request.
127    */
128   GNUNET_HashCode *seen_replies;
129
130   /**
131    * Pointer to this nodes heap location in the retry-heap (for fast removal)
132    */
133   struct GNUNET_CONTAINER_HeapNode *hnode;
134
135   /**
136    * What's the delay between re-try operations that we currently use for this
137    * request?
138    */
139   struct GNUNET_TIME_Relative retry_frequency;
140
141   /**
142    * What's the next time we should re-try this request?
143    */
144   struct GNUNET_TIME_Absolute retry_time;
145
146   /**
147    * The unique identifier of this request
148    */
149   uint64_t unique_id;
150
151   /**
152    * Number of bytes in xquery.
153    */
154   size_t xquery_size;
155
156   /**
157    * Number of entries in 'seen_replies'.
158    */
159   unsigned int seen_replies_count;
160
161   /**
162    * Desired replication level
163    */
164   uint32_t replication;
165
166   /**
167    * Any message options for this request
168    */
169   uint32_t msg_options;
170
171   /**
172    * The type for the data for the GET request.
173    */
174   enum GNUNET_BLOCK_Type type;
175
176 };
177
178
179 /**
180  * List of active clients.
181  */
182 static struct ClientList *client_head;
183
184 /**
185  * List of active clients.
186  */
187 static struct ClientList *client_tail;
188
189 /**
190  * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
191  */
192 static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
193
194 /**
195  * Heap with all of our client's request, sorted by retry time (earliest on top).
196  */
197 static struct GNUNET_CONTAINER_Heap *retry_heap;
198
199 /**
200  * Task that re-transmits requests (using retry_heap).
201  */
202 static GNUNET_SCHEDULER_TaskIdentifier retry_task;
203
204
205 /**
206  * Find a client if it exists, add it otherwise.
207  *
208  * @param client the server handle to the client
209  *
210  * @return the client if found, a new client otherwise
211  */
212 static struct ClientList *
213 find_active_client (struct GNUNET_SERVER_Client *client)
214 {
215   struct ClientList *pos = client_head;
216   struct ClientList *ret;
217
218   while (pos != NULL)
219   {
220     if (pos->client_handle == client)
221       return pos;
222     pos = pos->next;
223   }
224   ret = GNUNET_malloc (sizeof (struct ClientList));
225   ret->client_handle = client;
226   GNUNET_CONTAINER_DLL_insert (client_head,
227                                client_tail,
228                                ret);
229   return ret;
230 }
231
232
233 /**
234  * Iterator over hash map entries that frees all entries 
235  * associated with the given client.
236  *
237  * @param cls client to search for in source routes
238  * @param key current key code (ignored)
239  * @param value value in the hash map, a ClientQueryRecord
240  * @return GNUNET_YES (we should continue to iterate)
241  */
242 static int
243 remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
244 {
245   struct ClientList *client = cls;
246   struct ClientQueryRecord *record = value;
247
248   if (record->client != client)
249     return GNUNET_YES;
250   GNUNET_assert (GNUNET_YES ==
251                  GNUNET_CONTAINER_multihashmap_remove (forward_map,
252                                                        key, record));
253   GNUNET_CONTAINER_heap_remove_node (record->hnode);
254   GNUNET_array_append (record->seen_replies,
255                        record->seen_replies_count,
256                        *key);
257   GNUNET_free (record);
258   return GNUNET_YES;
259 }
260
261
262 /**
263  * Functions with this signature are called whenever a client
264  * is disconnected on the network level.
265  *
266  * @param cls closure (NULL for dht)
267  * @param client identification of the client; NULL
268  *        for the last call when the server is destroyed
269  */
270 static void
271 handle_client_disconnect (void *cls, 
272                           struct GNUNET_SERVER_Client *client)
273 {
274   struct ClientList *pos;
275   struct PendingMessage *reply;
276
277   pos = find_active_client (client);
278   GNUNET_CONTAINER_DLL_remove (client_head,
279                                client_tail,
280                                pos);
281   if (pos->transmit_handle != NULL)
282     GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
283   while (NULL != (reply = pos->pending_head))
284     {
285       GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail,
286                                    reply);
287       GNUNET_free (reply);
288     }
289   GNUNET_CONTAINER_multihashmap_iterate (forward_map,
290                                          &remove_client_records, pos);
291   GNUNET_free (pos);
292 }
293
294
295 /**
296  * Route the given request via the DHT.  This includes updating 
297  * the bloom filter and retransmission times, building the P2P
298  * message and initiating the routing operation.
299  */
300 static void
301 transmit_request (struct ClientQueryRecord *cqr)
302 {
303   int32_t reply_bf_mutator;
304   struct GNUNET_CONTAINER_BloomFilter *reply_bf;
305
306   reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
307                                                          UINT32_MAX);
308   reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
309                                                  cqr->seen_replies,
310                                                  cqr->seen_replies_count);
311   GDS_NEIGHBOURS_handle_get (cqr->type,
312                              cqr->msg_options,
313                              cqr->replication,
314                              0 /* hop count */,
315                              &cqr->key,
316                              cqr->xquery,
317                              cqr->xquery_size,
318                              reply_bf,
319                              reply_bf_mutator,
320                              NULL /* no peers blocked initially */);
321   GNUNET_CONTAINER_bloomfilter_free (reply_bf);
322
323   /* exponential back-off for retries, max 1h */
324   cqr->retry_frequency = 
325     GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
326                               GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
327   cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
328 }
329
330
331 /**
332  * Task that looks at the 'retry_heap' and transmits all of the requests
333  * on the heap that are ready for transmission.  Then re-schedules
334  * itself (unless the heap is empty).
335  *
336  * @param cls unused
337  * @param tc scheduler context
338  */
339 static void
340 transmit_next_request_task (void *cls,
341                             const struct GNUNET_SCHEDULER_TaskContext *tc)
342 {
343   struct ClientQueryRecord *cqr;
344   struct GNUNET_TIME_Relative delay;
345
346   retry_task = GNUNET_SCHEDULER_NO_TASK;
347   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
348     return;
349   while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
350     {
351       cqr->hnode = NULL;
352       delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
353       if (delay.rel_value > 0)
354         {
355           cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
356                                                      cqr->retry_time.abs_value);
357           retry_task = GNUNET_SCHEDULER_add_delayed (delay,
358                                                      &transmit_next_request_task,
359                                                      NULL);
360           return;
361         }
362       transmit_request (cqr);
363     }
364 }
365
366
367 /**
368  * Handler for PUT messages.
369  *
370  * @param cls closure for the service
371  * @param client the client we received this message from
372  * @param message the actual message received
373  */
374 static void
375 handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
376                       const struct GNUNET_MessageHeader *message)
377 {
378   const struct GNUNET_DHT_ClientPutMessage *dht_msg;
379   uint16_t size;
380   
381   size = ntohs (message->size);
382   if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
383     {
384       GNUNET_break (0);
385       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
386     }
387   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
388   /* give to local clients */
389   GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
390                            &dht_msg->key,
391                            0, NULL,
392                            0, NULL,
393                            ntohl (dht_msg->type),
394                            size - sizeof (struct GNUNET_DHT_ClientPutMessage),
395                            &dht_msg[1]);
396   /* store locally */
397   GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
398                             &dht_msg->key,
399                             0, NULL,
400                             ntohl (dht_msg->type),
401                             size - sizeof (struct GNUNET_DHT_ClientPutMessage),
402                             &dht_msg[1]);
403   /* route to other peers */
404   GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
405                              ntohl (dht_msg->options),
406                              ntohl (dht_msg->desired_replication_level),
407                              GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
408                              0 /* hop count */,
409                              NULL /* peer bloom filter */,
410                              &dht_msg->key,
411                              0, NULL,
412                              &dht_msg[1],
413                              size - sizeof (struct GNUNET_DHT_ClientPutMessage));
414   GNUNET_SERVER_receive_done (client, GNUNET_OK);
415 }
416
417
418 /**
419  * Handler for any generic DHT messages, calls the appropriate handler
420  * depending on message type, sends confirmation if responses aren't otherwise
421  * expected.
422  *
423  * @param cls closure for the service
424  * @param client the client we received this message from
425  * @param message the actual message received
426  */
427 static void
428 handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
429                       const struct GNUNET_MessageHeader *message)
430 {
431   const struct GNUNET_DHT_ClientGetMessage *get;
432   struct ClientQueryRecord *cqr;
433   size_t xquery_size;
434   const char* xquery;
435   uint16_t size;
436
437   size = ntohs (message->size);
438   if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
439     {
440       GNUNET_break (0);
441       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
442       return;
443     }
444   xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
445   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
446   xquery = (const char*) &get[1];
447
448   
449   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
450   cqr->key = get->key;
451   cqr->client = find_active_client (client);
452   cqr->xquery = (void*) &cqr[1];
453   memcpy (&cqr[1], xquery, xquery_size);
454   cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 
455   cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
456   cqr->retry_time = GNUNET_TIME_absolute_get ();
457   cqr->unique_id = get->unique_id;
458   cqr->xquery_size = xquery_size;
459   cqr->replication = ntohl (get->desired_replication_level);
460   cqr->msg_options = ntohl (get->options);
461   cqr->type = ntohl (get->type);  
462   GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
463                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
464   /* start remote requests */
465   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
466     GNUNET_SCHEDULER_cancel (retry_task);
467   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
468   /* perform local lookup */
469   GDS_DATACACHE_handle_get (&get->key,
470                             cqr->type,
471                             cqr->xquery,
472                             xquery_size,
473                             NULL, 0);
474   GNUNET_SERVER_receive_done (client, GNUNET_OK);
475 }
476
477
478 /**
479  * Closure for 'remove_by_unique_id'.
480  */
481 struct RemoveByUniqueIdContext
482 {
483   /**
484    * Client that issued the removal request.
485    */
486   struct ClientList *client;
487
488   /**
489    * Unique ID of the request.
490    */
491   uint64_t unique_id;
492 };
493
494
495 /**
496  * Iterator over hash map entries that frees all entries 
497  * that match the given client and unique ID.
498  *
499  * @param cls unique ID and client to search for in source routes
500  * @param key current key code
501  * @param value value in the hash map, a ClientQueryRecord
502  * @return GNUNET_YES (we should continue to iterate)
503  */
504 static int
505 remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
506 {
507   const struct RemoveByUniqueIdContext *ctx = cls;
508   struct ClientQueryRecord *record = value;
509
510   if (record->unique_id != ctx->unique_id)
511     return GNUNET_YES;
512   return remove_client_records (ctx->client, key, record);
513 }
514
515
516 /**
517  * Handler for any generic DHT stop messages, calls the appropriate handler
518  * depending on message type (if processed locally)
519  *
520  * @param cls closure for the service
521  * @param client the client we received this message from
522  * @param message the actual message received
523  *
524  */
525 static void
526 handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
527                            const struct GNUNET_MessageHeader *message)
528 {
529   const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
530     (const struct GNUNET_DHT_ClientGetStopMessage *) message;
531   struct RemoveByUniqueIdContext ctx;
532   
533   ctx.client = find_active_client (client);
534   ctx.unique_id = dht_stop_msg->unique_id;
535   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
536                                               &dht_stop_msg->key,
537                                               &remove_by_unique_id,
538                                               &ctx);
539   GNUNET_SERVER_receive_done (client, GNUNET_OK);
540 }
541
542
543 /**
544  * Task run to check for messages that need to be sent to a client.
545  *
546  * @param client a ClientList, containing the client and any messages to be sent to it
547  */
548 static void
549 process_pending_messages (struct ClientList *client);
550
551
552 /**
553  * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
554  * request.  A ClientList is passed as closure, take the head of the list
555  * and copy it into buf, which has the result of sending the message to the
556  * client.
557  *
558  * @param cls closure to this call
559  * @param size maximum number of bytes available to send
560  * @param buf where to copy the actual message to
561  *
562  * @return the number of bytes actually copied, 0 indicates failure
563  */
564 static size_t
565 send_reply_to_client (void *cls, size_t size, void *buf)
566 {
567   struct ClientList *client = cls;
568   char *cbuf = buf;
569   struct PendingMessage *reply;
570   size_t off;
571   size_t msize;
572
573   client->transmit_handle = NULL;
574   if (buf == NULL)
575   {
576     /* client disconnected */
577     return 0;
578   }
579   off = 0;
580   while ((NULL != (reply = client->pending_head)) &&
581          (size >= off + (msize = ntohs (reply->msg->size))))
582   {
583     GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
584                                  reply);
585     memcpy (&cbuf[off], reply->msg, msize);
586     GNUNET_free (reply);
587     off += msize;
588   }
589   process_pending_messages (client);
590   return off;
591 }
592
593
594 /**
595  * Task run to check for messages that need to be sent to a client.
596  *
597  * @param client a ClientList, containing the client and any messages to be sent to it
598  */
599 static void
600 process_pending_messages (struct ClientList *client)
601 {
602   if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
603     return;
604   client->transmit_handle =
605       GNUNET_SERVER_notify_transmit_ready (client->client_handle,
606                                            ntohs (client->pending_head->
607                                                   msg->size),
608                                            GNUNET_TIME_UNIT_FOREVER_REL,
609                                            &send_reply_to_client, client);
610 }
611
612
613 /**
614  * Add a PendingMessage to the clients list of messages to be sent
615  *
616  * @param client the active client to send the message to
617  * @param pending_message the actual message to send
618  */
619 static void
620 add_pending_message (struct ClientList *client,
621                      struct PendingMessage *pending_message)
622 {
623   GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
624                                     pending_message);
625   process_pending_messages (client);
626 }
627
628
629 /**
630  * Closure for 'forward_reply'
631  */
632 struct ForwardReplyContext
633 {
634
635   /**
636    * Actual message to send to matching clients. 
637    */
638   struct PendingMessage *pm;
639
640   /**
641    * Embedded payload.
642    */
643   const void *data;
644
645   /**
646    * Type of the data.
647    */
648   enum GNUNET_BLOCK_Type type;
649
650   /**
651    * Number of bytes in data.
652    */
653   size_t data_size;
654
655   /**
656    * Do we need to copy 'pm' because it was already used?
657    */
658   int do_copy;
659
660 };
661
662
663 /**
664  * Iterator over hash map entries that send a given reply to
665  * each of the matching clients.  With some tricky recycling
666  * of the buffer.
667  *
668  * @param cls the 'struct ForwardReplyContext'
669  * @param key current key
670  * @param value value in the hash map, a ClientQueryRecord
671  * @return GNUNET_YES (we should continue to iterate),
672  *         if the result is mal-formed, GNUNET_NO
673  */
674 static int
675 forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
676 {
677   struct ForwardReplyContext *frc = cls;
678   struct ClientQueryRecord *record = value;
679   struct PendingMessage *pm;
680   struct GNUNET_DHT_ClientResultMessage *reply;
681   enum GNUNET_BLOCK_EvaluationResult eval;
682   int do_free;
683   GNUNET_HashCode ch;
684   unsigned int i;
685   
686   if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
687        (record->type != frc->type) )
688     return GNUNET_YES; /* type mismatch */
689   GNUNET_CRYPTO_hash (frc->data,
690                       frc->data_size,
691                       &ch);
692   for (i=0;i<record->seen_replies_count;i++)
693     if (0 == memcmp (&record->seen_replies[i],
694                      &ch,
695                      sizeof (GNUNET_HashCode)))
696       return GNUNET_YES; /* duplicate */             
697   eval =
698     GNUNET_BLOCK_evaluate (GDS_block_context, 
699                            record->type, key, 
700                            NULL, 0,
701                            record->xquery,
702                            record->xquery_size, 
703                            frc->data,
704                            frc->data_size);
705   switch (eval)
706   {
707   case GNUNET_BLOCK_EVALUATION_OK_LAST:
708     do_free = GNUNET_YES;
709     break;
710   case GNUNET_BLOCK_EVALUATION_OK_MORE:
711     GNUNET_array_append (record->seen_replies,
712                          record->seen_replies_count,
713                          ch);
714     do_free = GNUNET_NO;
715     break;
716   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
717     /* should be impossible to encounter here */
718     GNUNET_break (0);
719     return GNUNET_YES;
720   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
721     GNUNET_break_op (0);
722     return GNUNET_NO;
723   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
724     GNUNET_break (0);
725     return GNUNET_NO;
726   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
727     GNUNET_break (0);
728     return GNUNET_NO;
729   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
730     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
731                 "Unsupported block type (%u) in request!\n",
732                 record->type);
733     return GNUNET_NO;
734   }
735   if (GNUNET_NO == frc->do_copy)
736     {
737       /* first time, we can use the original data */
738       pm = frc->pm; 
739       frc->do_copy = GNUNET_YES;
740     }
741   else
742     {
743       /* two clients waiting for same reply, must copy for queueing */
744       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
745                           ntohs (frc->pm->msg->size));
746       memcpy (pm, frc->pm, 
747               sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
748       pm->next = pm->prev = NULL;
749     }
750   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];  
751   reply->unique_id = record->unique_id;
752   add_pending_message (record->client, pm);
753   if (GNUNET_YES == do_free)
754     remove_client_records (record->client, key, record);
755   return GNUNET_YES;
756 }
757
758
759 /**
760  * Handle a reply we've received from another peer.  If the reply
761  * matches any of our pending queries, forward it to the respective
762  * client(s).
763  *
764  * @param expiration when will the reply expire
765  * @param key the query this reply is for
766  * @param get_path_length number of peers in 'get_path'
767  * @param get_path path the reply took on get
768  * @param put_path_length number of peers in 'put_path'
769  * @param put_path path the reply took on put
770  * @param type type of the reply
771  * @param data_size number of bytes in 'data'
772  * @param data application payload data
773  */
774 void
775 GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
776                          const GNUNET_HashCode *key,
777                          unsigned int get_path_length,
778                          const struct GNUNET_PeerIdentity *get_path,
779                          unsigned int put_path_length,
780                          const struct GNUNET_PeerIdentity *put_path,
781                          enum GNUNET_BLOCK_Type type,
782                          size_t data_size,
783                          const void *data)
784 {
785   struct ForwardReplyContext frc;
786   struct PendingMessage *pm;
787   struct GNUNET_DHT_ClientResultMessage *reply;
788   struct GNUNET_PeerIdentity *paths;
789   size_t msize;
790
791   if (NULL ==
792       GNUNET_CONTAINER_multihashmap_get (forward_map, key))
793     return; /* no matching request, fast exit! */
794   msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size + 
795     (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
796   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
797     {
798       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
799                   _("Could not pass reply to client, message too big!\n"));
800       return;
801     }
802   pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
803   reply = (struct GNUNET_DHT_ClientResultMessage*) &pm[1];
804   pm->msg = &reply->header;
805   reply->header.size = htons ((uint16_t) msize);
806   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
807   reply->type = htonl (type);
808   reply->get_path_length = htonl (get_path_length);
809   reply->put_path_length = htonl (put_path_length);
810   reply->unique_id = 0; /* filled in later */
811   reply->expiration = GNUNET_TIME_absolute_hton (expiration);
812   reply->key = *key;
813   paths = (struct GNUNET_PeerIdentity*) &reply[1];
814   memcpy (paths, get_path, 
815           sizeof (struct GNUNET_PeerIdentity) * get_path_length);
816   memcpy (&paths[get_path_length], 
817           put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
818   memcpy (&paths[get_path_length + put_path_length],
819           data, 
820           data_size);
821   frc.do_copy = GNUNET_NO;
822   frc.pm = pm;
823   frc.data = data;
824   frc.data_size = data_size;
825   frc.type = type;
826   GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key,
827                                               &forward_reply,
828                                               &frc);
829   if (GNUNET_NO == frc.do_copy)
830     {
831       /* did not match any of the requests, free! */
832       GNUNET_free (pm);
833     }
834 }
835
836
837 /**
838  * Initialize client subsystem.
839  *
840  * @param server the initialized server
841  */
842 void 
843 GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
844 {
845   static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
846     {&handle_dht_local_put, NULL, 
847      GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
848     {&handle_dht_local_get, NULL, 
849      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
850     {&handle_dht_local_get_stop, NULL,
851      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 
852      sizeof (struct GNUNET_DHT_ClientGetStopMessage) },
853     {NULL, NULL, 0, 0}
854   };
855   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
856   retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
857   GNUNET_SERVER_add_handlers (server, plugin_handlers);
858   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
859 }
860
861
862 /**
863  * Shutdown client subsystem.
864  */
865 void
866 GDS_CLIENT_done ()
867 {
868   GNUNET_assert (client_head == NULL);
869   GNUNET_assert (client_tail == NULL);
870   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
871     {
872       GNUNET_SCHEDULER_cancel (retry_task);
873       retry_task = GNUNET_SCHEDULER_NO_TASK;
874     }
875   GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
876   GNUNET_CONTAINER_heap_destroy (retry_heap);
877   retry_heap = NULL;
878   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
879   GNUNET_CONTAINER_multihashmap_destroy (forward_map);
880   forward_map = NULL;
881 }
882
883 /* end of gnunet-service-dht_clients.c */
884