reduce loop counters to more practical levels
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file namestore/gnunet-service-namestore.c
23  * @brief namestore for the GNUnet naming system
24  * @author Matthias Wachs
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - run testcases, make sure everything works!
29  */
30 #include "platform.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_dnsparser_lib.h"
33 #include "gnunet_gns_service.h"
34 #include "gnunet_namecache_service.h"
35 #include "gnunet_namestore_service.h"
36 #include "gnunet_namestore_plugin.h"
37 #include "gnunet_statistics_service.h"
38 #include "gnunet_signatures.h"
39 #include "namestore.h"
40
41 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
42
43 /**
44  * If a monitor takes more than 1 minute to process an event, print a warning.
45  */
46 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
47
48
49 /**
50  * A namestore client
51  */
52 struct NamestoreClient;
53
54
55 /**
56  * A namestore iteration operation.
57  */
58 struct ZoneIteration
59 {
60   /**
61    * Next element in the DLL
62    */
63   struct ZoneIteration *next;
64
65   /**
66    * Previous element in the DLL
67    */
68   struct ZoneIteration *prev;
69
70   /**
71    * Namestore client which intiated this zone iteration
72    */
73   struct NamestoreClient *nc;
74
75   /**
76    * The nick to add to the records
77    */
78   struct GNUNET_GNSRECORD_Data *nick;
79
80   /**
81    * Key of the zone we are iterating over.
82    */
83   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
84
85   /**
86    * Last sequence number in the zone iteration used to address next
87    * result of the zone iteration in the store
88    *
89    * Initialy set to 0.
90    * Updated in #zone_iterate_proc()
91    */
92   uint64_t seq;
93
94   /**
95    * The operation id fot the zone iteration in the response for the client
96    */
97   uint32_t request_id;
98
99   /**
100    * Offset of the zone iteration used to address next result of the zone
101    * iteration in the store
102    *
103    * Initialy set to 0 in #handle_iteration_start
104    * Incremented with by every call to #handle_iteration_next
105    */
106   uint32_t offset;
107
108 };
109
110
111 /**
112  * A namestore client
113  */
114 struct NamestoreClient
115 {
116
117   /**
118    * The client
119    */
120   struct GNUNET_SERVICE_Client *client;
121
122   /**
123    * Message queue for transmission to @e client
124    */
125   struct GNUNET_MQ_Handle *mq;
126
127   /**
128    * Head of the DLL of
129    * Zone iteration operations in progress initiated by this client
130    */
131   struct ZoneIteration *op_head;
132
133   /**
134    * Tail of the DLL of
135    * Zone iteration operations in progress initiated by this client
136    */
137   struct ZoneIteration *op_tail;
138 };
139
140
141 /**
142  * A namestore monitor.
143  */
144 struct ZoneMonitor
145 {
146   /**
147    * Next element in the DLL
148    */
149   struct ZoneMonitor *next;
150
151   /**
152    * Previous element in the DLL
153    */
154   struct ZoneMonitor *prev;
155
156   /**
157    * Namestore client which intiated this zone monitor
158    */
159   struct NamestoreClient *nc;
160
161   /**
162    * Private key of the zone.
163    */
164   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
165
166   /**
167    * Task active during initial iteration.
168    */
169   struct GNUNET_SCHEDULER_Task *task;
170
171   /**
172    * Task to warn about slow monitors.
173    */
174   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
175
176   /**
177    * Since when are we blocked on this monitor?
178    */
179   struct GNUNET_TIME_Absolute sa_waiting_start;
180
181   /**
182    * Last sequence number in the zone iteration used to address next
183    * result of the zone iteration in the store
184    *
185    * Initialy set to 0.
186    * Updated in #monitor_iterate_cb()
187    */
188   uint64_t seq;
189
190   /**
191    * Current limit of how many more messages we are allowed
192    * to queue to this monitor.
193    */
194   uint64_t limit;
195
196   /**
197    * How many more requests may we receive from the iterator
198    * before it is at the limit we gave it?  Will be below or
199    * equal to @e limit.  The effective limit for monitor
200    * events is thus @e iteration_cnt - @e limit!
201    */
202   uint64_t iteration_cnt;
203
204   /**
205    * Are we (still) in the initial iteration pass?
206    */
207   int in_first_iteration;
208
209   /**
210    * Is there a store activity waiting for this monitor?  We only raise the
211    * flag when it happens and search the DLL for the store activity when we
212    * had a limit increase.  If we cannot find any waiting store activity at
213    * that time, we clear the flag again.
214    */
215   int sa_waiting;
216
217 };
218
219
220 /**
221  * Pending operation on the namecache.
222  */
223 struct CacheOperation
224 {
225
226   /**
227    * Kept in a DLL.
228    */
229   struct CacheOperation *prev;
230
231   /**
232    * Kept in a DLL.
233    */
234   struct CacheOperation *next;
235
236   /**
237    * Handle to namecache queue.
238    */
239   struct GNUNET_NAMECACHE_QueueEntry *qe;
240
241   /**
242    * Client to notify about the result.
243    */
244   struct NamestoreClient *nc;
245
246   /**
247    * Client's request ID.
248    */
249   uint32_t rid;
250 };
251
252
253 /**
254  * Information for an ongoing #handle_record_store() operation.
255  * Needed as we may wait for monitors to be ready for the notification.
256  */
257 struct StoreActivity
258 {
259   /**
260    * Kept in a DLL.
261    */
262   struct StoreActivity *next;
263
264   /**
265    * Kept in a DLL.
266    */
267   struct StoreActivity *prev;
268
269   /**
270    * Which client triggered the store activity?
271    */
272   struct NamestoreClient *nc;
273
274   /**
275    * Copy of the original store message (as data fields in @e rd will
276    * point into it!).
277    */
278   const struct RecordStoreMessage *rsm;
279
280   /**
281    * Next zone monitor that still needs to be notified about this PUT.
282    */
283   struct ZoneMonitor *zm_pos;
284
285   /**
286    * Label nicely canonicalized (lower case).
287    */
288   char *conv_name;
289
290 };
291
292
293 /**
294  * Public key of all zeros.
295  */
296 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
297
298 /**
299  * Configuration handle.
300  */
301 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
302
303 /**
304  * Handle to the statistics service
305  */
306 static struct GNUNET_STATISTICS_Handle *statistics;
307
308 /**
309  * Namecache handle.
310  */
311 static struct GNUNET_NAMECACHE_Handle *namecache;
312
313 /**
314  * Database handle
315  */
316 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
317
318 /**
319  * Name of the database plugin
320  */
321 static char *db_lib_name;
322
323 /**
324  * Head of cop DLL.
325  */
326 static struct CacheOperation *cop_head;
327
328 /**
329  * Tail of cop DLL.
330  */
331 static struct CacheOperation *cop_tail;
332
333 /**
334  * First active zone monitor.
335  */
336 static struct ZoneMonitor *monitor_head;
337
338 /**
339  * Last active zone monitor.
340  */
341 static struct ZoneMonitor *monitor_tail;
342
343 /**
344  * Head of DLL of monitor-blocked store activities.
345  */
346 static struct StoreActivity *sa_head;
347
348 /**
349  * Tail of DLL of monitor-blocked store activities.
350  */
351 static struct StoreActivity *sa_tail;
352
353 /**
354  * Notification context shared by all monitors.
355  */
356 static struct GNUNET_NotificationContext *monitor_nc;
357
358 /**
359  * Optimize block insertion by caching map of private keys to
360  * public keys in memory?
361  */
362 static int cache_keys;
363
364 /**
365  * Use the namecache? Doing so creates additional cryptographic
366  * operations whenever we touch a record.
367  */
368 static int disable_namecache;
369
370
371 /**
372  * Task run during shutdown.
373  *
374  * @param cls unused
375  */
376 static void
377 cleanup_task (void *cls)
378 {
379   struct CacheOperation *cop;
380
381   (void) cls;
382   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
383               "Stopping namestore service\n");
384   while (NULL != (cop = cop_head))
385   {
386     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
387                 "Aborting incomplete namecache operation\n");
388     GNUNET_NAMECACHE_cancel (cop->qe);
389     GNUNET_CONTAINER_DLL_remove (cop_head,
390                                  cop_tail,
391                                  cop);
392     GNUNET_free (cop);
393   }
394   if (NULL != namecache)
395   {
396     GNUNET_NAMECACHE_disconnect (namecache);
397     namecache = NULL;
398   }
399   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
400                                               GSN_database));
401   GNUNET_free (db_lib_name);
402   db_lib_name = NULL;
403   if (NULL != monitor_nc)
404   {
405     GNUNET_notification_context_destroy (monitor_nc);
406     monitor_nc = NULL;
407   }
408   if (NULL != statistics)
409   {
410     GNUNET_STATISTICS_destroy (statistics,
411                                GNUNET_NO);
412     statistics = NULL;
413   }
414 }
415
416
417 /**
418  * Release memory used by @a sa.
419  *
420  * @param sa activity to free
421  */
422 static void
423 free_store_activity (struct StoreActivity *sa)
424 {
425   GNUNET_CONTAINER_DLL_remove (sa_head,
426                                sa_tail,
427                                sa);
428   GNUNET_free (sa->conv_name);
429   GNUNET_free (sa);
430 }
431
432
433 /**
434  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
435  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
436  * record, which (if found) is then copied to @a cls for future use.
437  *
438  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
439  * @param seq sequence number of the record
440  * @param private_key the private key of the zone (unused)
441  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
442  * @param rd_count number of records in @a rd
443  * @param rd records stored under @a label in the zone
444  */
445 static void
446 lookup_nick_it (void *cls,
447                 uint64_t seq,
448                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
449                 const char *label,
450                 unsigned int rd_count,
451                 const struct GNUNET_GNSRECORD_Data *rd)
452 {
453   struct GNUNET_GNSRECORD_Data **res = cls;
454
455   (void) private_key;
456   (void) seq;
457   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
458   {
459     GNUNET_break (0);
460     return;
461   }
462   for (unsigned int c = 0; c < rd_count; c++)
463   {
464     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
465     {
466       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
467       (*res)->data = &(*res)[1];
468       GNUNET_memcpy ((void *) (*res)->data,
469                      rd[c].data,
470                      rd[c].data_size);
471       (*res)->data_size = rd[c].data_size;
472       (*res)->expiration_time = rd[c].expiration_time;
473       (*res)->flags = rd[c].flags;
474       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
475       return;
476     }
477   }
478   (*res) = NULL;
479 }
480
481
482 /**
483  * Return the NICK record for the zone (if it exists).
484  *
485  * @param zone private key for the zone to look for nick
486  * @return NULL if no NICK record was found
487  */
488 static struct GNUNET_GNSRECORD_Data *
489 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
490 {
491   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
492   struct GNUNET_GNSRECORD_Data *nick;
493   int res;
494
495   nick = NULL;
496   res = GSN_database->lookup_records (GSN_database->cls,
497                                       zone,
498                                       GNUNET_GNS_EMPTY_LABEL_AT,
499                                       &lookup_nick_it,
500                                       &nick);
501   if ( (GNUNET_OK != res) ||
502        (NULL == nick) )
503   {
504     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
505     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
506                 "No nick name set for zone `%s'\n",
507                 GNUNET_GNSRECORD_z2s (&pub));
508     return NULL;
509   }
510   return nick;
511 }
512
513
514 /**
515  * Merge the nick record @a nick_rd with the rest of the
516  * record set given in @a rd2.  Store the result in @a rdc_res
517  * and @a rd_res.  The @a nick_rd's expiration time is set to
518  * the maximum expiration time of all of the records in @a rd2.
519  *
520  * @param nick_rd the nick record to integrate
521  * @param rd2_length length of the @a rd2 array
522  * @param rd2 array of records
523  * @param rdc_res[out] length of the resulting @a rd_res array
524  * @param rd_res[out] set to an array of records,
525  *                    including @a nick_rd and @a rd2;
526  *           all of the variable-size 'data' fields in @a rd2 are
527  *           allocated in the same chunk of memory!
528  */
529 static void
530 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
531                          unsigned int rd2_length,
532                          const struct GNUNET_GNSRECORD_Data *rd2,
533                          unsigned int *rdc_res,
534                          struct GNUNET_GNSRECORD_Data **rd_res)
535 {
536   uint64_t latest_expiration;
537   size_t req;
538   char *data;
539   size_t data_offset;
540   struct GNUNET_GNSRECORD_Data *target;
541
542   (*rdc_res) = 1 + rd2_length;
543   if (0 == 1 + rd2_length)
544   {
545     GNUNET_break (0);
546     (*rd_res) = NULL;
547     return;
548   }
549   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
550   for (unsigned int i=0; i<rd2_length; i++)
551   {
552     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
553
554     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
555     {
556       GNUNET_break (0);
557       (*rd_res) = NULL;
558       return;
559     }
560     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
561   }
562   target = GNUNET_malloc (req);
563   (*rd_res) = target;
564   data = (char *) &target[1 + rd2_length];
565   data_offset = 0;
566   latest_expiration = 0;
567   for (unsigned int i=0;i<rd2_length;i++)
568   {
569     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
570
571     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
572     {
573       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
574           latest_expiration)
575         latest_expiration = orig->expiration_time;
576     }
577     else if (orig->expiration_time > latest_expiration)
578       latest_expiration = orig->expiration_time;
579     target[i] = *orig;
580     target[i].data = (void *) &data[data_offset];
581     GNUNET_memcpy (&data[data_offset],
582                    orig->data,
583                    orig->data_size);
584     data_offset += orig->data_size;
585   }
586   /* append nick */
587   target[rd2_length] = *nick_rd;
588   target[rd2_length].expiration_time = latest_expiration;
589   target[rd2_length].data = (void *) &data[data_offset];
590   GNUNET_memcpy (&data[data_offset],
591                  nick_rd->data,
592                  nick_rd->data_size);
593   data_offset += nick_rd->data_size;
594   GNUNET_assert (req ==
595                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
596 }
597
598
599 /**
600  * Generate a `struct LookupNameResponseMessage` and send it to the
601  * given client using the given notification context.
602  *
603  * @param nc client to unicast to
604  * @param request_id request ID to use
605  * @param zone_key zone key of the zone
606  * @param name name
607  * @param rd_count number of records in @a rd
608  * @param rd array of records
609  */
610 static void
611 send_lookup_response (struct NamestoreClient *nc,
612                       uint32_t request_id,
613                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
614                       const char *name,
615                       unsigned int rd_count,
616                       const struct GNUNET_GNSRECORD_Data *rd)
617 {
618   struct GNUNET_MQ_Envelope *env;
619   struct RecordResultMessage *zir_msg;
620   struct GNUNET_GNSRECORD_Data *nick;
621   struct GNUNET_GNSRECORD_Data *res;
622   unsigned int res_count;
623   size_t name_len;
624   ssize_t rd_ser_len;
625   char *name_tmp;
626   char *rd_ser;
627
628   nick = get_nick_record (zone_key);
629
630   GNUNET_assert (-1 !=
631                  GNUNET_GNSRECORD_records_get_size (rd_count,
632                                                     rd));
633
634   if ( (NULL != nick) &&
635        (0 != strcmp (name,
636                      GNUNET_GNS_EMPTY_LABEL_AT)))
637   {
638     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
639     merge_with_nick_records (nick,
640                              rd_count,
641                              rd,
642                              &res_count,
643                              &res);
644     GNUNET_free (nick);
645   }
646   else
647   {
648     res_count = rd_count;
649     res = (struct GNUNET_GNSRECORD_Data *) rd;
650   }
651
652   GNUNET_assert (-1 !=
653                  GNUNET_GNSRECORD_records_get_size (res_count,
654                                                     res));
655
656
657   name_len = strlen (name) + 1;
658   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
659                                                   res);
660   if (rd_ser_len < 0)
661   {
662     GNUNET_break (0);
663     GNUNET_SERVICE_client_drop (nc->client);
664     return;
665   }
666   if (rd_ser_len >= UINT16_MAX - name_len - sizeof (*zir_msg))
667   {
668     GNUNET_break (0);
669     GNUNET_SERVICE_client_drop (nc->client);
670     return;
671   }
672   env = GNUNET_MQ_msg_extra (zir_msg,
673                              name_len + rd_ser_len,
674                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
675   zir_msg->gns_header.r_id = htonl (request_id);
676   zir_msg->name_len = htons (name_len);
677   zir_msg->rd_count = htons (res_count);
678   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
679   zir_msg->private_key = *zone_key;
680   name_tmp = (char *) &zir_msg[1];
681   GNUNET_memcpy (name_tmp,
682                  name,
683                  name_len);
684   rd_ser = &name_tmp[name_len];
685   GNUNET_assert (rd_ser_len ==
686                  GNUNET_GNSRECORD_records_serialize (res_count,
687                                                      res,
688                                                      rd_ser_len,
689                                                      rd_ser));
690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691               "Sending RECORD_RESULT message with %u records\n",
692               res_count);
693   GNUNET_STATISTICS_update (statistics,
694                             "Record sets sent to clients",
695                             1,
696                             GNUNET_NO);
697   GNUNET_MQ_send (nc->mq,
698                   env);
699   if (rd != res)
700     GNUNET_free (res);
701 }
702
703
704 /**
705  * Send response to the store request to the client.
706  *
707  * @param client client to talk to
708  * @param res status of the operation
709  * @param rid client's request ID
710  */
711 static void
712 send_store_response (struct NamestoreClient *nc,
713                      int res,
714                      uint32_t rid)
715 {
716   struct GNUNET_MQ_Envelope *env;
717   struct RecordStoreResponseMessage *rcr_msg;
718
719   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720               "Sending RECORD_STORE_RESPONSE message\n");
721   GNUNET_STATISTICS_update (statistics,
722                             "Store requests completed",
723                             1,
724                             GNUNET_NO);
725   env = GNUNET_MQ_msg (rcr_msg,
726                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
727   rcr_msg->gns_header.r_id = htonl (rid);
728   rcr_msg->op_result = htonl (res);
729   GNUNET_MQ_send (nc->mq,
730                   env);
731 }
732
733
734 /**
735  * Cache operation complete, clean up.
736  *
737  * @param cls the `struct CacheOperation`
738  * @param success success
739  * @param emsg error messages
740  */
741 static void
742 finish_cache_operation (void *cls,
743                         int32_t success,
744                         const char *emsg)
745 {
746   struct CacheOperation *cop = cls;
747
748   if (NULL != emsg)
749     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
750                 _("Failed to replicate block in namecache: %s\n"),
751                 emsg);
752   else
753     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
754                 "CACHE operation completed\n");
755   GNUNET_CONTAINER_DLL_remove (cop_head,
756                                cop_tail,
757                                cop);
758   if (NULL != cop->nc)
759     send_store_response (cop->nc,
760                          success,
761                          cop->rid);
762   GNUNET_free (cop);
763 }
764
765
766 /**
767  * We just touched the plaintext information about a name in our zone;
768  * refresh the corresponding (encrypted) block in the namecache.
769  *
770  * @param nc client responsible for the request, can be NULL
771  * @param rid request ID of the client
772  * @param zone_key private key of the zone
773  * @param name label for the records
774  * @param rd_count number of records
775  * @param rd records stored under the given @a name
776  */
777 static void
778 refresh_block (struct NamestoreClient *nc,
779                uint32_t rid,
780                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
781                const char *name,
782                unsigned int rd_count,
783                const struct GNUNET_GNSRECORD_Data *rd)
784 {
785   struct GNUNET_GNSRECORD_Block *block;
786   struct CacheOperation *cop;
787   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
788   struct GNUNET_GNSRECORD_Data *nick;
789   struct GNUNET_GNSRECORD_Data *res;
790   unsigned int res_count;
791   struct GNUNET_TIME_Absolute exp_time;
792
793   nick = get_nick_record (zone_key);
794   res_count = rd_count;
795   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
796   if (NULL != nick)
797   {
798     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
799     merge_with_nick_records (nick,
800                              rd_count,rd,
801                              &res_count,
802                              &res);
803     GNUNET_free (nick);
804   }
805   if (0 == res_count)
806   {
807     send_store_response (nc,
808                          GNUNET_OK,
809                          rid);
810     return; /* no data, no need to update cache */
811   }
812   if (GNUNET_YES == disable_namecache)
813   {
814     GNUNET_STATISTICS_update (statistics,
815                               "Namecache updates skipped (NC disabled)",
816                               1,
817                               GNUNET_NO);
818     send_store_response (nc,
819                          GNUNET_OK,
820                          rid);
821     return;
822   }
823   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
824                                                           res);
825   if (cache_keys)
826     block = GNUNET_GNSRECORD_block_create2 (zone_key,
827                                             exp_time,
828                                             name,
829                                             res,
830                                             res_count);
831   else
832     block = GNUNET_GNSRECORD_block_create (zone_key,
833                                            exp_time,
834                                            name,
835                                            res,
836                                            res_count);
837   GNUNET_assert (NULL != block);
838   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
839                                       &pkey);
840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
842               name,
843               res_count,
844               GNUNET_STRINGS_absolute_time_to_string (exp_time),
845               GNUNET_GNSRECORD_z2s (&pkey));
846   GNUNET_STATISTICS_update (statistics,
847                             "Namecache updates pushed",
848                             1,
849                             GNUNET_NO);
850   cop = GNUNET_new (struct CacheOperation);
851   cop->nc = nc;
852   cop->rid = rid;
853   GNUNET_CONTAINER_DLL_insert (cop_head,
854                                cop_tail,
855                                cop);
856   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
857                                           block,
858                                           &finish_cache_operation,
859                                           cop);
860   GNUNET_free (block);
861 }
862
863
864 /**
865  * Print a warning that one of our monitors is no longer reacting.
866  *
867  * @param cls a `struct ZoneMonitor` to warn about
868  */
869 static void
870 warn_monitor_slow (void *cls)
871 {
872   struct ZoneMonitor *zm = cls;
873
874   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
875               "No response from monitor since %s\n",
876               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
877   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
878                                                       &warn_monitor_slow,
879                                                       zm);
880 }
881
882
883 /**
884  * Continue processing the @a sa.
885  *
886  * @param sa store activity to process
887  */
888 static void
889 continue_store_activity (struct StoreActivity *sa)
890 {
891   const struct RecordStoreMessage *rp_msg = sa->rsm;
892   unsigned int rd_count;
893   size_t name_len;
894   size_t rd_ser_len;
895   uint32_t rid;
896   const char *name_tmp;
897   const char *rd_ser;
898
899   rid = ntohl (rp_msg->gns_header.r_id);
900   name_len = ntohs (rp_msg->name_len);
901   rd_count = ntohs (rp_msg->rd_count);
902   rd_ser_len = ntohs (rp_msg->rd_len);
903   name_tmp = (const char *) &rp_msg[1];
904   rd_ser = &name_tmp[name_len];
905   {
906     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
907
908     /* We did this before, must succeed again */
909     GNUNET_assert (GNUNET_OK ==
910                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
911                                                          rd_ser,
912                                                          rd_count,
913                                                          rd));
914
915     for (struct ZoneMonitor *zm = sa->zm_pos;
916          NULL != zm;
917          zm = sa->zm_pos)
918     {
919       if ( (0 != memcmp (&rp_msg->private_key,
920                          &zm->zone,
921                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
922            (0 != memcmp (&zm->zone,
923                          &zero,
924                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
925         sa->zm_pos = zm->next; /* not interesting to this monitor */
926       if (zm->limit == zm->iteration_cnt)
927       {
928         zm->sa_waiting = GNUNET_YES;
929         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
930         if (NULL != zm->sa_wait_warning)
931           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
932         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
933                                                             &warn_monitor_slow,
934                                                             zm);
935         return; /* blocked on zone monitor */
936       }
937       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938                   "Notifying monitor about changes under label `%s'\n",
939                   sa->conv_name);
940       zm->limit--;
941       send_lookup_response (zm->nc,
942                             0,
943                             &rp_msg->private_key,
944                             sa->conv_name,
945                             rd_count,
946                             rd);
947       sa->zm_pos = zm->next;
948     }
949     /* great, done with the monitors, unpack (again) for refresh_block operation */
950     refresh_block (sa->nc,
951                    rid,
952                    &rp_msg->private_key,
953                    sa->conv_name,
954                    rd_count,
955                    rd);
956   }
957   GNUNET_SERVICE_client_continue (sa->nc->client);
958   free_store_activity (sa);
959 }
960
961
962 /**
963  * Called whenever a client is disconnected.
964  * Frees our resources associated with that client.
965  *
966  * @param cls closure
967  * @param client identification of the client
968  * @param app_ctx the `struct NamestoreClient` of @a client
969  */
970 static void
971 client_disconnect_cb (void *cls,
972                       struct GNUNET_SERVICE_Client *client,
973                       void *app_ctx)
974 {
975   struct NamestoreClient *nc = app_ctx;
976   struct ZoneIteration *no;
977   struct CacheOperation *cop;
978
979   (void) cls;
980   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
981               "Client %p disconnected\n",
982               client);
983   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
984   {
985     struct StoreActivity *san;
986
987     if (nc != zm->nc)
988       continue;
989     GNUNET_CONTAINER_DLL_remove (monitor_head,
990                                  monitor_tail,
991                                  zm);
992     if (NULL != zm->task)
993     {
994       GNUNET_SCHEDULER_cancel (zm->task);
995       zm->task = NULL;
996     }
997     if (NULL != zm->sa_wait_warning)
998     {
999       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1000       zm->sa_wait_warning = NULL;
1001     }
1002     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1003     {
1004       san = sa->next;
1005       if (zm == sa->zm_pos)
1006       {
1007         sa->zm_pos = zm->next;
1008         /* this may free sa */
1009         continue_store_activity (sa);
1010       }
1011     }
1012     GNUNET_free (zm);
1013     break;
1014   }
1015   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1016   {
1017     if (sa->nc == nc)
1018     {
1019       /* this may free sa */
1020       free_store_activity (sa);
1021       break; /* there can only be one per nc */
1022     }
1023   }
1024   while (NULL != (no = nc->op_head))
1025   {
1026     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1027                                  nc->op_tail,
1028                                  no);
1029     GNUNET_free (no);
1030   }
1031   for (cop = cop_head; NULL != cop; cop = cop->next)
1032     if (nc == cop->nc)
1033       cop->nc = NULL;
1034   GNUNET_free (nc);
1035 }
1036
1037
1038 /**
1039  * Add a client to our list of active clients.
1040  *
1041  * @param cls NULL
1042  * @param client client to add
1043  * @param mq message queue for @a client
1044  * @return internal namestore client structure for this client
1045  */
1046 static void *
1047 client_connect_cb (void *cls,
1048                    struct GNUNET_SERVICE_Client *client,
1049                    struct GNUNET_MQ_Handle *mq)
1050 {
1051   struct NamestoreClient *nc;
1052
1053   (void) cls;
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Client %p connected\n",
1056               client);
1057   nc = GNUNET_new (struct NamestoreClient);
1058   nc->client = client;
1059   nc->mq = mq;
1060   return nc;
1061 }
1062
1063
1064 /**
1065  * Closure for #lookup_it().
1066  */
1067 struct RecordLookupContext
1068 {
1069
1070   /**
1071    * FIXME.
1072    */
1073   const char *label;
1074
1075   /**
1076    * FIXME.
1077    */
1078   char *res_rd;
1079
1080   /**
1081    * FIXME.
1082    */
1083   struct GNUNET_GNSRECORD_Data *nick;
1084
1085   /**
1086    * FIXME.
1087    */
1088   int found;
1089
1090   /**
1091    * FIXME.
1092    */
1093   unsigned int res_rd_count;
1094
1095   /**
1096    * FIXME.
1097    */
1098   ssize_t rd_ser_len;
1099 };
1100
1101
1102 /**
1103  * FIXME.
1104  *
1105  * @param seq sequence number of the record
1106  */
1107 static void
1108 lookup_it (void *cls,
1109            uint64_t seq,
1110            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1111            const char *label,
1112            unsigned int rd_count,
1113            const struct GNUNET_GNSRECORD_Data *rd)
1114 {
1115   struct RecordLookupContext *rlc = cls;
1116
1117   (void) private_key;
1118   (void) seq;
1119   if (0 != strcmp (label,
1120                    rlc->label))
1121     return;
1122   rlc->found = GNUNET_YES;
1123   if (0 == rd_count)
1124   {
1125     rlc->rd_ser_len = 0;
1126     rlc->res_rd_count = 0;
1127     rlc->res_rd = NULL;
1128     return;
1129   }
1130   if ( (NULL != rlc->nick) &&
1131        (0 != strcmp (label,
1132                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1133   {
1134     /* Merge */
1135     struct GNUNET_GNSRECORD_Data *rd_res;
1136     unsigned int rdc_res;
1137
1138     rd_res = NULL;
1139     rdc_res = 0;
1140     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1141     merge_with_nick_records (rlc->nick,
1142                              rd_count,
1143                              rd,
1144                              &rdc_res,
1145                              &rd_res);
1146     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1147                                                          rd_res);
1148     if (rlc->rd_ser_len < 0)
1149     {
1150       GNUNET_break (0);
1151       GNUNET_free  (rd_res);
1152       rlc->found = GNUNET_NO;
1153       rlc->rd_ser_len = 0;
1154       return;
1155     }
1156     rlc->res_rd_count = rdc_res;
1157     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1158     if (rlc->rd_ser_len !=
1159         GNUNET_GNSRECORD_records_serialize (rdc_res,
1160                                             rd_res,
1161                                             rlc->rd_ser_len,
1162                                             rlc->res_rd))
1163     {
1164       GNUNET_break (0);
1165       GNUNET_free  (rlc->res_rd);
1166       rlc->res_rd = NULL;
1167       rlc->res_rd_count = 0;
1168       rlc->rd_ser_len = 0;
1169       GNUNET_free  (rd_res);
1170       rlc->found = GNUNET_NO;
1171       return;
1172     }
1173     GNUNET_free (rd_res);
1174     GNUNET_free (rlc->nick);
1175     rlc->nick = NULL;
1176   }
1177   else
1178   {
1179     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1180                                                          rd);
1181     if (rlc->rd_ser_len < 0)
1182     {
1183       GNUNET_break (0);
1184       rlc->found = GNUNET_NO;
1185       rlc->rd_ser_len = 0;
1186       return;
1187     }
1188     rlc->res_rd_count = rd_count;
1189     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1190     if (rlc->rd_ser_len !=
1191         GNUNET_GNSRECORD_records_serialize (rd_count,
1192                                             rd,
1193                                             rlc->rd_ser_len,
1194                                             rlc->res_rd))
1195     {
1196       GNUNET_break (0);
1197       GNUNET_free  (rlc->res_rd);
1198       rlc->res_rd = NULL;
1199       rlc->res_rd_count = 0;
1200       rlc->rd_ser_len = 0;
1201       rlc->found = GNUNET_NO;
1202       return;
1203     }
1204   }
1205 }
1206
1207
1208 /**
1209  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1210  *
1211  * @param cls client sending the message
1212  * @param ll_msg message of type `struct LabelLookupMessage`
1213  * @return #GNUNET_OK if @a ll_msg is well-formed
1214  */
1215 static int
1216 check_record_lookup (void *cls,
1217                      const struct LabelLookupMessage *ll_msg)
1218 {
1219   uint32_t name_len;
1220   size_t src_size;
1221   const char *name_tmp;
1222
1223   (void) cls;
1224   name_len = ntohl (ll_msg->label_len);
1225   src_size = ntohs (ll_msg->gns_header.header.size);
1226   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1227   {
1228     GNUNET_break (0);
1229     return GNUNET_SYSERR;
1230   }
1231
1232   name_tmp = (const char *) &ll_msg[1];
1233   if ('\0' != name_tmp[name_len -1])
1234   {
1235     GNUNET_break (0);
1236     return GNUNET_SYSERR;
1237   }
1238   return GNUNET_OK;
1239 }
1240
1241
1242 /**
1243  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1244  *
1245  * @param cls client sending the message
1246  * @param ll_msg message of type `struct LabelLookupMessage`
1247  */
1248 static void
1249 handle_record_lookup (void *cls,
1250                       const struct LabelLookupMessage *ll_msg)
1251 {
1252   struct NamestoreClient *nc = cls;
1253   struct GNUNET_MQ_Envelope *env;
1254   struct LabelLookupResponseMessage *llr_msg;
1255   struct RecordLookupContext rlc;
1256   const char *name_tmp;
1257   char *res_name;
1258   char *conv_name;
1259   uint32_t name_len;
1260   int res;
1261
1262   name_len = ntohl (ll_msg->label_len);
1263   name_tmp = (const char *) &ll_msg[1];
1264   GNUNET_SERVICE_client_continue (nc->client);
1265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1267               name_tmp);
1268
1269   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1270   if (NULL == conv_name)
1271   {
1272     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1273                 "Error converting name `%s'\n",
1274                 name_tmp);
1275     GNUNET_SERVICE_client_drop (nc->client);
1276     return;
1277   }
1278   rlc.label = conv_name;
1279   rlc.found = GNUNET_NO;
1280   rlc.res_rd_count = 0;
1281   rlc.res_rd = NULL;
1282   rlc.rd_ser_len = 0;
1283   rlc.nick = get_nick_record (&ll_msg->zone);
1284   res = GSN_database->lookup_records (GSN_database->cls,
1285                                       &ll_msg->zone,
1286                                       conv_name,
1287                                       &lookup_it,
1288                                       &rlc);
1289   GNUNET_free (conv_name);
1290   env = GNUNET_MQ_msg_extra (llr_msg,
1291                              name_len + rlc.rd_ser_len,
1292                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1293   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1294   llr_msg->private_key = ll_msg->zone;
1295   llr_msg->name_len = htons (name_len);
1296   llr_msg->rd_count = htons (rlc.res_rd_count);
1297   llr_msg->rd_len = htons (rlc.rd_ser_len);
1298   res_name = (char *) &llr_msg[1];
1299   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1300     llr_msg->found = ntohs (GNUNET_YES);
1301   else
1302     llr_msg->found = ntohs (GNUNET_NO);
1303   GNUNET_memcpy (&llr_msg[1],
1304                  name_tmp,
1305                  name_len);
1306   GNUNET_memcpy (&res_name[name_len],
1307                  rlc.res_rd,
1308                  rlc.rd_ser_len);
1309   GNUNET_MQ_send (nc->mq,
1310                   env);
1311   GNUNET_free_non_null (rlc.res_rd);
1312 }
1313
1314
1315 /**
1316  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1317  *
1318  * @param cls client sending the message
1319  * @param rp_msg message of type `struct RecordStoreMessage`
1320  * @return #GNUNET_OK if @a rp_msg is well-formed
1321  */
1322 static int
1323 check_record_store (void *cls,
1324                     const struct RecordStoreMessage *rp_msg)
1325 {
1326   size_t name_len;
1327   size_t msg_size;
1328   size_t msg_size_exp;
1329   size_t rd_ser_len;
1330   const char *name_tmp;
1331
1332   (void) cls;
1333   name_len = ntohs (rp_msg->name_len);
1334   msg_size = ntohs (rp_msg->gns_header.header.size);
1335   rd_ser_len = ntohs (rp_msg->rd_len);
1336   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1337   if (msg_size != msg_size_exp)
1338   {
1339     GNUNET_break (0);
1340     return GNUNET_SYSERR;
1341   }
1342   if ( (0 == name_len) ||
1343        (name_len > MAX_NAME_LEN) )
1344   {
1345     GNUNET_break (0);
1346     return GNUNET_SYSERR;
1347   }
1348   name_tmp = (const char *) &rp_msg[1];
1349   if ('\0' != name_tmp[name_len -1])
1350   {
1351     GNUNET_break (0);
1352     return GNUNET_SYSERR;
1353   }
1354   return GNUNET_OK;
1355 }
1356
1357
1358 /**
1359  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1360  *
1361  * @param cls client sending the message
1362  * @param rp_msg message of type `struct RecordStoreMessage`
1363  */
1364 static void
1365 handle_record_store (void *cls,
1366                      const struct RecordStoreMessage *rp_msg)
1367 {
1368   struct NamestoreClient *nc = cls;
1369   size_t name_len;
1370   size_t rd_ser_len;
1371   uint32_t rid;
1372   const char *name_tmp;
1373   char *conv_name;
1374   const char *rd_ser;
1375   unsigned int rd_count;
1376   int res;
1377   struct StoreActivity *sa;
1378
1379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1380               "Received NAMESTORE_RECORD_STORE message\n");
1381   rid = ntohl (rp_msg->gns_header.r_id);
1382   name_len = ntohs (rp_msg->name_len);
1383   rd_count = ntohs (rp_msg->rd_count);
1384   rd_ser_len = ntohs (rp_msg->rd_len);
1385   GNUNET_break (0 == ntohs (rp_msg->reserved));
1386   name_tmp = (const char *) &rp_msg[1];
1387   rd_ser = &name_tmp[name_len];
1388   {
1389     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1390
1391     if (GNUNET_OK !=
1392         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1393                                               rd_ser,
1394                                               rd_count,
1395                                               rd))
1396     {
1397       GNUNET_break (0);
1398       GNUNET_SERVICE_client_drop (nc->client);
1399       return;
1400     }
1401
1402     /* Extracting and converting private key */
1403     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1404     if (NULL == conv_name)
1405     {
1406       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1407                   "Error converting name `%s'\n",
1408                   name_tmp);
1409       GNUNET_SERVICE_client_drop (nc->client);
1410       return;
1411     }
1412     GNUNET_STATISTICS_update (statistics,
1413                               "Well-formed store requests received",
1414                               1,
1415                               GNUNET_NO);
1416     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1417                 "Creating %u records for name `%s'\n",
1418                 (unsigned int) rd_count,
1419                 conv_name);
1420     if ( (0 == rd_count) &&
1421          (GNUNET_NO ==
1422           GSN_database->lookup_records (GSN_database->cls,
1423                                         &rp_msg->private_key,
1424                                         conv_name,
1425                                         NULL,
1426                                         0)) )
1427     {
1428       /* This name does not exist, so cannot be removed */
1429       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1430                   "Name `%s' does not exist, no deletion required\n",
1431                   conv_name);
1432       res = GNUNET_NO;
1433     }
1434     else
1435     {
1436       /* remove "NICK" records, unless this is for the
1437          #GNUNET_GNS_EMPTY_LABEL_AT label */
1438       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1439       unsigned int rd_clean_off;
1440
1441       rd_clean_off = 0;
1442       for (unsigned int i=0;i<rd_count;i++)
1443       {
1444         rd_clean[rd_clean_off] = rd[i];
1445         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1446                            conv_name)) ||
1447              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1448           rd_clean_off++;
1449       }
1450       res = GSN_database->store_records (GSN_database->cls,
1451                                          &rp_msg->private_key,
1452                                          conv_name,
1453                                          rd_clean_off,
1454                                          rd_clean);
1455     }
1456
1457     if (GNUNET_OK != res)
1458     {
1459       /* store not successful, not need to tell monitors */
1460       send_store_response (nc,
1461                            res,
1462                            rid);
1463       GNUNET_SERVICE_client_continue (nc->client);
1464       GNUNET_free (conv_name);
1465       return;
1466     }
1467
1468     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1469                         ntohs (rp_msg->gns_header.header.size));
1470     GNUNET_CONTAINER_DLL_insert (sa_head,
1471                                  sa_tail,
1472                                  sa);
1473     sa->nc = nc;
1474     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1475     GNUNET_memcpy (&sa[1],
1476                    rp_msg,
1477                    ntohs (rp_msg->gns_header.header.size));
1478     sa->zm_pos = monitor_head;
1479     sa->conv_name = conv_name;
1480     continue_store_activity (sa);
1481   }
1482 }
1483
1484
1485 /**
1486  * Context for record remove operations passed from #handle_zone_to_name to
1487  * #handle_zone_to_name_it as closure
1488  */
1489 struct ZoneToNameCtx
1490 {
1491   /**
1492    * Namestore client
1493    */
1494   struct NamestoreClient *nc;
1495
1496   /**
1497    * Request id (to be used in the response to the client).
1498    */
1499   uint32_t rid;
1500
1501   /**
1502    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1503    * not finding a name for the zone still counts as a 'success' here,
1504    * as this field is about the success of executing the IPC protocol.
1505    */
1506   int success;
1507 };
1508
1509
1510 /**
1511  * Zone to name iterator
1512  *
1513  * @param cls struct ZoneToNameCtx *
1514  * @param seq sequence number of the record
1515  * @param zone_key the zone key
1516  * @param name name
1517  * @param rd_count number of records in @a rd
1518  * @param rd record data
1519  */
1520 static void
1521 handle_zone_to_name_it (void *cls,
1522                         uint64_t seq,
1523                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1524                         const char *name,
1525                         unsigned int rd_count,
1526                         const struct GNUNET_GNSRECORD_Data *rd)
1527 {
1528   struct ZoneToNameCtx *ztn_ctx = cls;
1529   struct GNUNET_MQ_Envelope *env;
1530   struct ZoneToNameResponseMessage *ztnr_msg;
1531   int16_t res;
1532   size_t name_len;
1533   ssize_t rd_ser_len;
1534   size_t msg_size;
1535   char *name_tmp;
1536   char *rd_tmp;
1537
1538   (void) seq;
1539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540               "Found result for zone-to-name lookup: `%s'\n",
1541               name);
1542   res = GNUNET_YES;
1543   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1544   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1545                                                   rd);
1546   if (rd_ser_len < 0)
1547   {
1548     GNUNET_break (0);
1549     ztn_ctx->success = GNUNET_SYSERR;
1550     return;
1551   }
1552   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1553   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1554   {
1555     GNUNET_break (0);
1556     ztn_ctx->success = GNUNET_SYSERR;
1557     return;
1558   }
1559   env = GNUNET_MQ_msg_extra (ztnr_msg,
1560                              name_len + rd_ser_len,
1561                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1562   ztnr_msg->gns_header.header.size = htons (msg_size);
1563   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1564   ztnr_msg->res = htons (res);
1565   ztnr_msg->rd_len = htons (rd_ser_len);
1566   ztnr_msg->rd_count = htons (rd_count);
1567   ztnr_msg->name_len = htons (name_len);
1568   ztnr_msg->zone = *zone_key;
1569   name_tmp = (char *) &ztnr_msg[1];
1570   GNUNET_memcpy (name_tmp,
1571                  name,
1572                  name_len);
1573   rd_tmp = &name_tmp[name_len];
1574   GNUNET_assert (rd_ser_len ==
1575                  GNUNET_GNSRECORD_records_serialize (rd_count,
1576                                                      rd,
1577                                                      rd_ser_len,
1578                                                      rd_tmp));
1579   ztn_ctx->success = GNUNET_OK;
1580   GNUNET_MQ_send (ztn_ctx->nc->mq,
1581                   env);
1582 }
1583
1584
1585 /**
1586  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1587  *
1588  * @param cls client client sending the message
1589  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1590  */
1591 static void
1592 handle_zone_to_name (void *cls,
1593                      const struct ZoneToNameMessage *ztn_msg)
1594 {
1595   struct NamestoreClient *nc = cls;
1596   struct ZoneToNameCtx ztn_ctx;
1597   struct GNUNET_MQ_Envelope *env;
1598   struct ZoneToNameResponseMessage *ztnr_msg;
1599
1600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601               "Received ZONE_TO_NAME message\n");
1602   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1603   ztn_ctx.nc = nc;
1604   ztn_ctx.success = GNUNET_NO;
1605   if (GNUNET_SYSERR ==
1606       GSN_database->zone_to_name (GSN_database->cls,
1607                                   &ztn_msg->zone,
1608                                   &ztn_msg->value_zone,
1609                                   &handle_zone_to_name_it, &ztn_ctx))
1610   {
1611     /* internal error, hang up instead of signalling something
1612        that might be wrong */
1613     GNUNET_break (0);
1614     GNUNET_SERVICE_client_drop (nc->client);
1615     return;
1616   }
1617   if (GNUNET_NO == ztn_ctx.success)
1618   {
1619     /* no result found, send empty response */
1620     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1621                 "Found no result for zone-to-name lookup.\n");
1622     env = GNUNET_MQ_msg (ztnr_msg,
1623                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1624     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1625     ztnr_msg->res = htons (GNUNET_NO);
1626     GNUNET_MQ_send (nc->mq,
1627                     env);
1628   }
1629   GNUNET_SERVICE_client_continue (nc->client);
1630 }
1631
1632
1633 /**
1634  * Context for record remove operations passed from
1635  * #run_zone_iteration_round to #zone_iterate_proc as closure
1636  */
1637 struct ZoneIterationProcResult
1638 {
1639   /**
1640    * The zone iteration handle
1641    */
1642   struct ZoneIteration *zi;
1643
1644   /**
1645    * Number of results left to be returned in this iteration.
1646    */
1647   uint64_t limit;
1648
1649 };
1650
1651
1652 /**
1653  * Process results for zone iteration from database
1654  *
1655  * @param cls struct ZoneIterationProcResult
1656  * @param seq sequence number of the record
1657  * @param zone_key the zone key
1658  * @param name name
1659  * @param rd_count number of records for this name
1660  * @param rd record data
1661  */
1662 static void
1663 zone_iterate_proc (void *cls,
1664                    uint64_t seq,
1665                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1666                    const char *name,
1667                    unsigned int rd_count,
1668                    const struct GNUNET_GNSRECORD_Data *rd)
1669 {
1670   struct ZoneIterationProcResult *proc = cls;
1671   int do_refresh_block;
1672
1673   if ( (NULL == zone_key) &&
1674        (NULL == name) )
1675   {
1676     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1677                 "Iteration done\n");
1678     return;
1679   }
1680   if ( (NULL == zone_key) ||
1681        (NULL == name) )
1682   {
1683     /* what is this!? should never happen */
1684     GNUNET_break (0);
1685     return;
1686   }
1687   if (0 == proc->limit)
1688   {
1689     /* what is this!? should never happen */
1690     GNUNET_break (0);
1691     return;
1692   }
1693   proc->limit--;
1694   proc->zi->seq = seq;
1695   send_lookup_response (proc->zi->nc,
1696                         proc->zi->request_id,
1697                         zone_key,
1698                         name,
1699                         rd_count,
1700                         rd);
1701   do_refresh_block = GNUNET_NO;
1702   for (unsigned int i=0;i<rd_count;i++)
1703     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1704     {
1705       do_refresh_block = GNUNET_YES;
1706       break;
1707     }
1708   if (GNUNET_YES == do_refresh_block)
1709     refresh_block (NULL,
1710                    0,
1711                    zone_key,
1712                    name,
1713                    rd_count,
1714                    rd);
1715 }
1716
1717
1718 /**
1719  * Perform the next round of the zone iteration.
1720  *
1721  * @param zi zone iterator to process
1722  * @param limit number of results to return in one pass
1723  */
1724 static void
1725 run_zone_iteration_round (struct ZoneIteration *zi,
1726                           uint64_t limit)
1727 {
1728   struct ZoneIterationProcResult proc;
1729   struct GNUNET_MQ_Envelope *env;
1730   struct RecordResultMessage *rrm;
1731   struct GNUNET_TIME_Absolute start;
1732   struct GNUNET_TIME_Relative duration;
1733
1734   memset (&proc,
1735           0,
1736           sizeof (proc));
1737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1738               "Asked to return up to %llu records at position %llu\n",
1739               (unsigned long long) limit,
1740               (unsigned long long) zi->seq);
1741   proc.zi = zi;
1742   proc.limit = limit;
1743   start = GNUNET_TIME_absolute_get ();
1744   GNUNET_break (GNUNET_SYSERR !=
1745                 GSN_database->iterate_records (GSN_database->cls,
1746                                                (0 == memcmp (&zi->zone,
1747                                                              &zero,
1748                                                              sizeof (zero)))
1749                                                ? NULL
1750                                                : &zi->zone,
1751                                                zi->seq,
1752                                                limit,
1753                                                &zone_iterate_proc,
1754                                                &proc));
1755   duration = GNUNET_TIME_absolute_get_duration (start);
1756   duration = GNUNET_TIME_relative_divide (duration,
1757                                           limit - proc.limit);
1758   GNUNET_STATISTICS_set (statistics,
1759                          "NAMESTORE iteration delay (μs/record)",
1760                          duration.rel_value_us,
1761                          GNUNET_NO);
1762   if (0 == proc.limit)
1763   {
1764     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765                 "Returned %llu results, more results available\n",
1766                 (unsigned long long) limit);
1767     return; /* more results later after we get the
1768                #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message */
1769   }
1770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771               "Completed iteration after %llu/%llu results\n",
1772               (unsigned long long) (limit - proc.limit),
1773               (unsigned long long) limit);
1774   /* send empty response to indicate end of list */
1775   env = GNUNET_MQ_msg (rrm,
1776                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
1777   rrm->gns_header.r_id = htonl (zi->request_id);
1778   GNUNET_MQ_send (zi->nc->mq,
1779                   env);
1780   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
1781                                zi->nc->op_tail,
1782                                zi);
1783   GNUNET_free (zi);
1784 }
1785
1786
1787 /**
1788  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1789  *
1790  * @param cls the client sending the message
1791  * @param zis_msg message from the client
1792  */
1793 static void
1794 handle_iteration_start (void *cls,
1795                         const struct ZoneIterationStartMessage *zis_msg)
1796 {
1797   struct NamestoreClient *nc = cls;
1798   struct ZoneIteration *zi;
1799
1800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1801               "Received ZONE_ITERATION_START message\n");
1802   zi = GNUNET_new (struct ZoneIteration);
1803   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1804   zi->offset = 0;
1805   zi->nc = nc;
1806   zi->zone = zis_msg->zone;
1807
1808   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1809                                nc->op_tail,
1810                                zi);
1811   run_zone_iteration_round (zi,
1812                             1);
1813   GNUNET_SERVICE_client_continue (nc->client);
1814 }
1815
1816
1817 /**
1818  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1819  *
1820  * @param cls the client sending the message
1821  * @param zis_msg message from the client
1822  */
1823 static void
1824 handle_iteration_stop (void *cls,
1825                        const struct ZoneIterationStopMessage *zis_msg)
1826 {
1827   struct NamestoreClient *nc = cls;
1828   struct ZoneIteration *zi;
1829   uint32_t rid;
1830
1831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1832               "Received ZONE_ITERATION_STOP message\n");
1833   rid = ntohl (zis_msg->gns_header.r_id);
1834   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1835     if (zi->request_id == rid)
1836       break;
1837   if (NULL == zi)
1838   {
1839     GNUNET_break (0);
1840     GNUNET_SERVICE_client_drop (nc->client);
1841     return;
1842   }
1843   GNUNET_CONTAINER_DLL_remove (nc->op_head,
1844                                nc->op_tail,
1845                                zi);
1846   GNUNET_free (zi);
1847   GNUNET_SERVICE_client_continue (nc->client);
1848 }
1849
1850
1851 /**
1852  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
1853  *
1854  * @param cls the client sending the message
1855  * @param message message from the client
1856  */
1857 static void
1858 handle_iteration_next (void *cls,
1859                        const struct ZoneIterationNextMessage *zis_msg)
1860 {
1861   struct NamestoreClient *nc = cls;
1862   struct ZoneIteration *zi;
1863   uint32_t rid;
1864   uint64_t limit;
1865
1866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1867               "Received ZONE_ITERATION_NEXT message\n");
1868   GNUNET_STATISTICS_update (statistics,
1869                             "Iteration NEXT messages received",
1870                             1,
1871                             GNUNET_NO);
1872   rid = ntohl (zis_msg->gns_header.r_id);
1873   limit = GNUNET_ntohll (zis_msg->limit);
1874   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1875     if (zi->request_id == rid)
1876       break;
1877   if (NULL == zi)
1878   {
1879     GNUNET_break (0);
1880     GNUNET_SERVICE_client_drop (nc->client);
1881     return;
1882   }
1883   run_zone_iteration_round (zi,
1884                             limit);
1885   GNUNET_SERVICE_client_continue (nc->client);
1886 }
1887
1888
1889 /**
1890  * Function called when the monitor is ready for more data, and we
1891  * should thus unblock PUT operations that were blocked on the
1892  * monitor not being ready.
1893  */
1894 static void
1895 monitor_unblock (struct ZoneMonitor *zm)
1896 {
1897   struct StoreActivity *sa = sa_head;
1898
1899   while ( (NULL != sa) &&
1900           (zm->limit > zm->iteration_cnt) )
1901   {
1902     struct StoreActivity *sn = sa->next;
1903
1904     if (sa->zm_pos == zm)
1905       continue_store_activity (sa);
1906     sa = sn;
1907   }
1908   if (zm->limit > zm->iteration_cnt)
1909   {
1910     zm->sa_waiting = GNUNET_NO;
1911     if (NULL != zm->sa_wait_warning)
1912     {
1913       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1914       zm->sa_wait_warning = NULL;
1915     }
1916   }
1917   else if (GNUNET_YES == zm->sa_waiting)
1918   {
1919     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1920     if (NULL != zm->sa_wait_warning)
1921       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1922     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1923                                                         &warn_monitor_slow,
1924                                                         zm);
1925   }
1926 }
1927
1928
1929 /**
1930  * Send 'sync' message to zone monitor, we're now in sync.
1931  *
1932  * @param zm monitor that is now in sync
1933  */
1934 static void
1935 monitor_sync (struct ZoneMonitor *zm)
1936 {
1937   struct GNUNET_MQ_Envelope *env;
1938   struct GNUNET_MessageHeader *sync;
1939
1940   env = GNUNET_MQ_msg (sync,
1941                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
1942   GNUNET_MQ_send (zm->nc->mq,
1943                   env);
1944   /* mark iteration done */
1945   zm->in_first_iteration = GNUNET_NO;
1946   zm->iteration_cnt = 0;
1947   if ( (zm->limit > 0) &&
1948        (zm->sa_waiting) )
1949     monitor_unblock (zm);
1950 }
1951
1952
1953 /**
1954  * Obtain the next datum during the zone monitor's zone initial iteration.
1955  *
1956  * @param cls zone monitor that does its initial iteration
1957  */
1958 static void
1959 monitor_iteration_next (void *cls);
1960
1961
1962 /**
1963  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
1964  *
1965  * @param cls a 'struct ZoneMonitor *' with information about the monitor
1966  * @param seq sequence number of the record
1967  * @param zone_key zone key of the zone
1968  * @param name name
1969  * @param rd_count number of records in @a rd
1970  * @param rd array of records
1971  */
1972 static void
1973 monitor_iterate_cb (void *cls,
1974                     uint64_t seq,
1975                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1976                     const char *name,
1977                     unsigned int rd_count,
1978                     const struct GNUNET_GNSRECORD_Data *rd)
1979 {
1980   struct ZoneMonitor *zm = cls;
1981
1982   zm->seq = seq;
1983   if (NULL == name)
1984   {
1985     /* finished with iteration */
1986     monitor_sync (zm);
1987     return;
1988   }
1989   GNUNET_STATISTICS_update (statistics,
1990                             "Monitor notifications sent",
1991                             1,
1992                             GNUNET_NO);
1993   zm->limit--;
1994   zm->iteration_cnt--;
1995   send_lookup_response (zm->nc,
1996                         0,
1997                         zone_key,
1998                         name,
1999                         rd_count,
2000                         rd);
2001   if ( (0 == zm->iteration_cnt) &&
2002        (0 != zm->limit) )
2003   {
2004     /* We are done with the current iteration batch, AND the
2005        client would right now accept more, so go again! */
2006     GNUNET_assert (NULL == zm->task);
2007     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2008                                          zm);
2009   }
2010 }
2011
2012
2013 /**
2014  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2015  *
2016  * @param cls the client sending the message
2017  * @param zis_msg message from the client
2018  */
2019 static void
2020 handle_monitor_start (void *cls,
2021                       const struct ZoneMonitorStartMessage *zis_msg)
2022 {
2023   struct NamestoreClient *nc = cls;
2024   struct ZoneMonitor *zm;
2025
2026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027               "Received ZONE_MONITOR_START message\n");
2028   zm = GNUNET_new (struct ZoneMonitor);
2029   zm->nc = nc;
2030   zm->zone = zis_msg->zone;
2031   zm->limit = 1;
2032   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2033   GNUNET_CONTAINER_DLL_insert (monitor_head,
2034                                monitor_tail,
2035                                zm);
2036   GNUNET_SERVICE_client_mark_monitor (nc->client);
2037   GNUNET_SERVICE_client_continue (nc->client);
2038   GNUNET_notification_context_add (monitor_nc,
2039                                    nc->mq);
2040   if (zm->in_first_iteration)
2041     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2042                                          zm);
2043   else
2044     monitor_sync (zm);
2045 }
2046
2047
2048 /**
2049  * Obtain the next datum during the zone monitor's zone initial iteration.
2050  *
2051  * @param cls zone monitor that does its initial iteration
2052  */
2053 static void
2054 monitor_iteration_next (void *cls)
2055 {
2056   struct ZoneMonitor *zm = cls;
2057   int ret;
2058
2059   zm->task = NULL;
2060   GNUNET_assert (0 == zm->iteration_cnt);
2061   if (zm->limit > 16)
2062     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2063   else
2064     zm->iteration_cnt = zm->limit; /* use it all */
2065   ret = GSN_database->iterate_records (GSN_database->cls,
2066                                        (0 == memcmp (&zm->zone,
2067                                                      &zero,
2068                                                      sizeof (zero)))
2069                                        ? NULL
2070                                        : &zm->zone,
2071                                        zm->seq,
2072                                        zm->iteration_cnt,
2073                                        &monitor_iterate_cb,
2074                                        zm);
2075   if (GNUNET_SYSERR == ret)
2076   {
2077     GNUNET_SERVICE_client_drop (zm->nc->client);
2078     return;
2079   }
2080   if (GNUNET_NO == ret)
2081   {
2082     /* empty zone */
2083     monitor_sync (zm);
2084     return;
2085   }
2086 }
2087
2088
2089 /**
2090  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2091  *
2092  * @param cls the client sending the message
2093  * @param nm message from the client
2094  */
2095 static void
2096 handle_monitor_next (void *cls,
2097                      const struct ZoneMonitorNextMessage *nm)
2098 {
2099   struct NamestoreClient *nc = cls;
2100   struct ZoneMonitor *zm;
2101   uint64_t inc;
2102
2103   inc = GNUNET_ntohll (nm->limit);
2104   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2106               (unsigned long long) inc);
2107   for (zm = monitor_head; NULL != zm; zm = zm->next)
2108     if (zm->nc == nc)
2109       break;
2110   if (NULL == zm)
2111   {
2112     GNUNET_break (0);
2113     GNUNET_SERVICE_client_drop (nc->client);
2114     return;
2115   }
2116   GNUNET_SERVICE_client_continue (nc->client);
2117   if (zm->limit + inc < zm->limit)
2118   {
2119     GNUNET_break (0);
2120     GNUNET_SERVICE_client_drop (nc->client);
2121     return;
2122   }
2123   zm->limit += inc;
2124   if ( (zm->in_first_iteration) &&
2125        (zm->limit == inc) )
2126   {
2127     /* We are still iterating, and the previous iteration must
2128        have stopped due to the client's limit, so continue it! */
2129     GNUNET_assert (NULL == zm->task);
2130     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2131                                          zm);
2132   }
2133   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2134   if ( (zm->limit > zm->iteration_cnt) &&
2135        (zm->sa_waiting) )
2136   {
2137     monitor_unblock (zm);
2138   }
2139   else if (GNUNET_YES == zm->sa_waiting)
2140   {
2141     if (NULL != zm->sa_wait_warning)
2142       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2143     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2144     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2145                                                         &warn_monitor_slow,
2146                                                         zm);
2147   }
2148 }
2149
2150
2151 /**
2152  * Process namestore requests.
2153  *
2154  * @param cls closure
2155  * @param cfg configuration to use
2156  * @param service the initialized service
2157  */
2158 static void
2159 run (void *cls,
2160      const struct GNUNET_CONFIGURATION_Handle *cfg,
2161      struct GNUNET_SERVICE_Handle *service)
2162 {
2163   char *database;
2164
2165   (void) cls;
2166   (void) service;
2167   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2168               "Starting namestore service\n");
2169   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2170                                                      "namestore",
2171                                                      "CACHE_KEYS");
2172   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2173                                                             "namecache",
2174                                                             "DISABLE");
2175   GSN_cfg = cfg;
2176   monitor_nc = GNUNET_notification_context_create (1);
2177   if (GNUNET_YES != disable_namecache)
2178   {
2179     namecache = GNUNET_NAMECACHE_connect (cfg);
2180     GNUNET_assert (NULL != namecache);
2181   }
2182   /* Loading database plugin */
2183   if (GNUNET_OK !=
2184       GNUNET_CONFIGURATION_get_value_string (cfg,
2185                                              "namestore",
2186                                              "database",
2187                                              &database))
2188     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2189                 "No database backend configured\n");
2190
2191   GNUNET_asprintf (&db_lib_name,
2192                    "libgnunet_plugin_namestore_%s",
2193                    database);
2194   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2195                                      (void *) GSN_cfg);
2196   GNUNET_free (database);
2197   statistics = GNUNET_STATISTICS_create ("namestore",
2198                                          cfg);
2199   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2200                                  NULL);
2201   if (NULL == GSN_database)
2202   {
2203     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2204                 "Could not load database backend `%s'\n",
2205                 db_lib_name);
2206     GNUNET_SCHEDULER_shutdown ();
2207     return;
2208   }
2209 }
2210
2211
2212 /**
2213  * Define "main" method using service macro.
2214  */
2215 GNUNET_SERVICE_MAIN
2216 ("namestore",
2217  GNUNET_SERVICE_OPTION_NONE,
2218  &run,
2219  &client_connect_cb,
2220  &client_disconnect_cb,
2221  NULL,
2222  GNUNET_MQ_hd_var_size (record_store,
2223                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2224                         struct RecordStoreMessage,
2225                         NULL),
2226  GNUNET_MQ_hd_var_size (record_lookup,
2227                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2228                         struct LabelLookupMessage,
2229                         NULL),
2230  GNUNET_MQ_hd_fixed_size (zone_to_name,
2231                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2232                           struct ZoneToNameMessage,
2233                           NULL),
2234  GNUNET_MQ_hd_fixed_size (iteration_start,
2235                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2236                           struct ZoneIterationStartMessage,
2237                           NULL),
2238  GNUNET_MQ_hd_fixed_size (iteration_next,
2239                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2240                           struct ZoneIterationNextMessage,
2241                           NULL),
2242  GNUNET_MQ_hd_fixed_size (iteration_stop,
2243                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2244                           struct ZoneIterationStopMessage,
2245                           NULL),
2246  GNUNET_MQ_hd_fixed_size (monitor_start,
2247                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2248                           struct ZoneMonitorStartMessage,
2249                           NULL),
2250  GNUNET_MQ_hd_fixed_size (monitor_next,
2251                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2252                           struct ZoneMonitorNextMessage,
2253                           NULL),
2254  GNUNET_MQ_handler_end ());
2255
2256
2257 /* end of gnunet-service-namestore.c */