todo on perf
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file namestore/gnunet-service-namestore.c
21  * @brief namestore for the GNUnet naming system
22  * @author Matthias Wachs
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - "get_nick_record" is a bottleneck, introduce a cache to
27  *   avoid looking it up again and again (for the same few
28  *   zones that the user will typically manage!)
29  * - run testcases, make sure everything works!
30  */
31 #include "platform.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_dnsparser_lib.h"
34 #include "gnunet_gns_service.h"
35 #include "gnunet_namecache_service.h"
36 #include "gnunet_namestore_service.h"
37 #include "gnunet_namestore_plugin.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_signatures.h"
40 #include "namestore.h"
41
42 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
43
44 /**
45  * If a monitor takes more than 1 minute to process an event, print a warning.
46  */
47 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
48
49
50 /**
51  * A namestore client
52  */
53 struct NamestoreClient;
54
55
56 /**
57  * A namestore iteration operation.
58  */
59 struct ZoneIteration
60 {
61   /**
62    * Next element in the DLL
63    */
64   struct ZoneIteration *next;
65
66   /**
67    * Previous element in the DLL
68    */
69   struct ZoneIteration *prev;
70
71   /**
72    * Namestore client which intiated this zone iteration
73    */
74   struct NamestoreClient *nc;
75
76   /**
77    * The nick to add to the records
78    */
79   struct GNUNET_GNSRECORD_Data *nick;
80
81   /**
82    * Key of the zone we are iterating over.
83    */
84   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
85
86   /**
87    * Last sequence number in the zone iteration used to address next
88    * result of the zone iteration in the store
89    *
90    * Initialy set to 0.
91    * Updated in #zone_iterate_proc()
92    */
93   uint64_t seq;
94
95   /**
96    * The operation id fot the zone iteration in the response for the client
97    */
98   uint32_t request_id;
99
100   /**
101    * Offset of the zone iteration used to address next result of the zone
102    * iteration in the store
103    *
104    * Initialy set to 0 in #handle_iteration_start
105    * Incremented with by every call to #handle_iteration_next
106    */
107   uint32_t offset;
108
109 };
110
111
112 /**
113  * A namestore client
114  */
115 struct NamestoreClient
116 {
117
118   /**
119    * The client
120    */
121   struct GNUNET_SERVICE_Client *client;
122
123   /**
124    * Message queue for transmission to @e client
125    */
126   struct GNUNET_MQ_Handle *mq;
127
128   /**
129    * Head of the DLL of
130    * Zone iteration operations in progress initiated by this client
131    */
132   struct ZoneIteration *op_head;
133
134   /**
135    * Tail of the DLL of
136    * Zone iteration operations in progress initiated by this client
137    */
138   struct ZoneIteration *op_tail;
139 };
140
141
142 /**
143  * A namestore monitor.
144  */
145 struct ZoneMonitor
146 {
147   /**
148    * Next element in the DLL
149    */
150   struct ZoneMonitor *next;
151
152   /**
153    * Previous element in the DLL
154    */
155   struct ZoneMonitor *prev;
156
157   /**
158    * Namestore client which intiated this zone monitor
159    */
160   struct NamestoreClient *nc;
161
162   /**
163    * Private key of the zone.
164    */
165   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
166
167   /**
168    * Task active during initial iteration.
169    */
170   struct GNUNET_SCHEDULER_Task *task;
171
172   /**
173    * Task to warn about slow monitors.
174    */
175   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
176
177   /**
178    * Since when are we blocked on this monitor?
179    */
180   struct GNUNET_TIME_Absolute sa_waiting_start;
181
182   /**
183    * Last sequence number in the zone iteration used to address next
184    * result of the zone iteration in the store
185    *
186    * Initialy set to 0.
187    * Updated in #monitor_iterate_cb()
188    */
189   uint64_t seq;
190
191   /**
192    * Current limit of how many more messages we are allowed
193    * to queue to this monitor.
194    */
195   uint64_t limit;
196
197   /**
198    * How many more requests may we receive from the iterator
199    * before it is at the limit we gave it?  Will be below or
200    * equal to @e limit.  The effective limit for monitor
201    * events is thus @e iteration_cnt - @e limit!
202    */
203   uint64_t iteration_cnt;
204
205   /**
206    * Are we (still) in the initial iteration pass?
207    */
208   int in_first_iteration;
209
210   /**
211    * Is there a store activity waiting for this monitor?  We only raise the
212    * flag when it happens and search the DLL for the store activity when we
213    * had a limit increase.  If we cannot find any waiting store activity at
214    * that time, we clear the flag again.
215    */
216   int sa_waiting;
217
218 };
219
220
221 /**
222  * Pending operation on the namecache.
223  */
224 struct CacheOperation
225 {
226
227   /**
228    * Kept in a DLL.
229    */
230   struct CacheOperation *prev;
231
232   /**
233    * Kept in a DLL.
234    */
235   struct CacheOperation *next;
236
237   /**
238    * Handle to namecache queue.
239    */
240   struct GNUNET_NAMECACHE_QueueEntry *qe;
241
242   /**
243    * Client to notify about the result.
244    */
245   struct NamestoreClient *nc;
246
247   /**
248    * Client's request ID.
249    */
250   uint32_t rid;
251 };
252
253
254 /**
255  * Information for an ongoing #handle_record_store() operation.
256  * Needed as we may wait for monitors to be ready for the notification.
257  */
258 struct StoreActivity
259 {
260   /**
261    * Kept in a DLL.
262    */
263   struct StoreActivity *next;
264
265   /**
266    * Kept in a DLL.
267    */
268   struct StoreActivity *prev;
269
270   /**
271    * Which client triggered the store activity?
272    */
273   struct NamestoreClient *nc;
274
275   /**
276    * Copy of the original store message (as data fields in @e rd will
277    * point into it!).
278    */
279   const struct RecordStoreMessage *rsm;
280
281   /**
282    * Next zone monitor that still needs to be notified about this PUT.
283    */
284   struct ZoneMonitor *zm_pos;
285
286   /**
287    * Label nicely canonicalized (lower case).
288    */
289   char *conv_name;
290
291 };
292
293
294 /**
295  * Public key of all zeros.
296  */
297 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
298
299 /**
300  * Configuration handle.
301  */
302 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
303
304 /**
305  * Handle to the statistics service
306  */
307 static struct GNUNET_STATISTICS_Handle *statistics;
308
309 /**
310  * Namecache handle.
311  */
312 static struct GNUNET_NAMECACHE_Handle *namecache;
313
314 /**
315  * Database handle
316  */
317 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
318
319 /**
320  * Name of the database plugin
321  */
322 static char *db_lib_name;
323
324 /**
325  * Head of cop DLL.
326  */
327 static struct CacheOperation *cop_head;
328
329 /**
330  * Tail of cop DLL.
331  */
332 static struct CacheOperation *cop_tail;
333
334 /**
335  * First active zone monitor.
336  */
337 static struct ZoneMonitor *monitor_head;
338
339 /**
340  * Last active zone monitor.
341  */
342 static struct ZoneMonitor *monitor_tail;
343
344 /**
345  * Head of DLL of monitor-blocked store activities.
346  */
347 static struct StoreActivity *sa_head;
348
349 /**
350  * Tail of DLL of monitor-blocked store activities.
351  */
352 static struct StoreActivity *sa_tail;
353
354 /**
355  * Notification context shared by all monitors.
356  */
357 static struct GNUNET_NotificationContext *monitor_nc;
358
359 /**
360  * Optimize block insertion by caching map of private keys to
361  * public keys in memory?
362  */
363 static int cache_keys;
364
365 /**
366  * Use the namecache? Doing so creates additional cryptographic
367  * operations whenever we touch a record.
368  */
369 static int disable_namecache;
370
371
372 /**
373  * Task run during shutdown.
374  *
375  * @param cls unused
376  */
377 static void
378 cleanup_task (void *cls)
379 {
380   struct CacheOperation *cop;
381
382   (void) cls;
383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384               "Stopping namestore service\n");
385   while (NULL != (cop = cop_head))
386   {
387     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
388                 "Aborting incomplete namecache operation\n");
389     GNUNET_NAMECACHE_cancel (cop->qe);
390     GNUNET_CONTAINER_DLL_remove (cop_head,
391                                  cop_tail,
392                                  cop);
393     GNUNET_free (cop);
394   }
395   if (NULL != namecache)
396   {
397     GNUNET_NAMECACHE_disconnect (namecache);
398     namecache = NULL;
399   }
400   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
401                                               GSN_database));
402   GNUNET_free (db_lib_name);
403   db_lib_name = NULL;
404   if (NULL != monitor_nc)
405   {
406     GNUNET_notification_context_destroy (monitor_nc);
407     monitor_nc = NULL;
408   }
409   if (NULL != statistics)
410   {
411     GNUNET_STATISTICS_destroy (statistics,
412                                GNUNET_NO);
413     statistics = NULL;
414   }
415 }
416
417
418 /**
419  * Release memory used by @a sa.
420  *
421  * @param sa activity to free
422  */
423 static void
424 free_store_activity (struct StoreActivity *sa)
425 {
426   GNUNET_CONTAINER_DLL_remove (sa_head,
427                                sa_tail,
428                                sa);
429   GNUNET_free (sa->conv_name);
430   GNUNET_free (sa);
431 }
432
433
434 /**
435  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
436  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
437  * record, which (if found) is then copied to @a cls for future use.
438  *
439  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
440  * @param seq sequence number of the record
441  * @param private_key the private key of the zone (unused)
442  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
443  * @param rd_count number of records in @a rd
444  * @param rd records stored under @a label in the zone
445  */
446 static void
447 lookup_nick_it (void *cls,
448                 uint64_t seq,
449                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
450                 const char *label,
451                 unsigned int rd_count,
452                 const struct GNUNET_GNSRECORD_Data *rd)
453 {
454   struct GNUNET_GNSRECORD_Data **res = cls;
455
456   (void) private_key;
457   (void) seq;
458   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
459   {
460     GNUNET_break (0);
461     return;
462   }
463   for (unsigned int c = 0; c < rd_count; c++)
464   {
465     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
466     {
467       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
468       (*res)->data = &(*res)[1];
469       GNUNET_memcpy ((void *) (*res)->data,
470                      rd[c].data,
471                      rd[c].data_size);
472       (*res)->data_size = rd[c].data_size;
473       (*res)->expiration_time = rd[c].expiration_time;
474       (*res)->flags = rd[c].flags;
475       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
476       return;
477     }
478   }
479   (*res) = NULL;
480 }
481
482
483 /**
484  * Return the NICK record for the zone (if it exists).
485  *
486  * @param zone private key for the zone to look for nick
487  * @return NULL if no NICK record was found
488  */
489 static struct GNUNET_GNSRECORD_Data *
490 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
491 {
492   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
493   struct GNUNET_GNSRECORD_Data *nick;
494   int res;
495
496   nick = NULL;
497   res = GSN_database->lookup_records (GSN_database->cls,
498                                       zone,
499                                       GNUNET_GNS_EMPTY_LABEL_AT,
500                                       &lookup_nick_it,
501                                       &nick);
502   if ( (GNUNET_OK != res) ||
503        (NULL == nick) )
504   {
505     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
507                 "No nick name set for zone `%s'\n",
508                 GNUNET_GNSRECORD_z2s (&pub));
509     return NULL;
510   }
511   return nick;
512 }
513
514
515 /**
516  * Merge the nick record @a nick_rd with the rest of the
517  * record set given in @a rd2.  Store the result in @a rdc_res
518  * and @a rd_res.  The @a nick_rd's expiration time is set to
519  * the maximum expiration time of all of the records in @a rd2.
520  *
521  * @param nick_rd the nick record to integrate
522  * @param rd2_length length of the @a rd2 array
523  * @param rd2 array of records
524  * @param rdc_res[out] length of the resulting @a rd_res array
525  * @param rd_res[out] set to an array of records,
526  *                    including @a nick_rd and @a rd2;
527  *           all of the variable-size 'data' fields in @a rd2 are
528  *           allocated in the same chunk of memory!
529  */
530 static void
531 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
532                          unsigned int rd2_length,
533                          const struct GNUNET_GNSRECORD_Data *rd2,
534                          unsigned int *rdc_res,
535                          struct GNUNET_GNSRECORD_Data **rd_res)
536 {
537   uint64_t latest_expiration;
538   size_t req;
539   char *data;
540   size_t data_offset;
541   struct GNUNET_GNSRECORD_Data *target;
542
543   (*rdc_res) = 1 + rd2_length;
544   if (0 == 1 + rd2_length)
545   {
546     GNUNET_break (0);
547     (*rd_res) = NULL;
548     return;
549   }
550   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
551   for (unsigned int i=0; i<rd2_length; i++)
552   {
553     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
554
555     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
556     {
557       GNUNET_break (0);
558       (*rd_res) = NULL;
559       return;
560     }
561     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
562   }
563   target = GNUNET_malloc (req);
564   (*rd_res) = target;
565   data = (char *) &target[1 + rd2_length];
566   data_offset = 0;
567   latest_expiration = 0;
568   for (unsigned int i=0;i<rd2_length;i++)
569   {
570     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
571
572     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
573     {
574       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
575           latest_expiration)
576         latest_expiration = orig->expiration_time;
577     }
578     else if (orig->expiration_time > latest_expiration)
579       latest_expiration = orig->expiration_time;
580     target[i] = *orig;
581     target[i].data = (void *) &data[data_offset];
582     GNUNET_memcpy (&data[data_offset],
583                    orig->data,
584                    orig->data_size);
585     data_offset += orig->data_size;
586   }
587   /* append nick */
588   target[rd2_length] = *nick_rd;
589   target[rd2_length].expiration_time = latest_expiration;
590   target[rd2_length].data = (void *) &data[data_offset];
591   GNUNET_memcpy (&data[data_offset],
592                  nick_rd->data,
593                  nick_rd->data_size);
594   data_offset += nick_rd->data_size;
595   GNUNET_assert (req ==
596                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
597 }
598
599
600 /**
601  * Generate a `struct LookupNameResponseMessage` and send it to the
602  * given client using the given notification context.
603  *
604  * @param nc client to unicast to
605  * @param request_id request ID to use
606  * @param zone_key zone key of the zone
607  * @param name name
608  * @param rd_count number of records in @a rd
609  * @param rd array of records
610  */
611 static void
612 send_lookup_response (struct NamestoreClient *nc,
613                       uint32_t request_id,
614                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
615                       const char *name,
616                       unsigned int rd_count,
617                       const struct GNUNET_GNSRECORD_Data *rd)
618 {
619   struct GNUNET_MQ_Envelope *env;
620   struct RecordResultMessage *zir_msg;
621   struct GNUNET_GNSRECORD_Data *nick;
622   struct GNUNET_GNSRECORD_Data *res;
623   unsigned int res_count;
624   size_t name_len;
625   ssize_t rd_ser_len;
626   char *name_tmp;
627   char *rd_ser;
628
629   nick = get_nick_record (zone_key);
630   GNUNET_assert (-1 !=
631                  GNUNET_GNSRECORD_records_get_size (rd_count,
632                                                     rd));
633
634   if ( (NULL != nick) &&
635        (0 != strcmp (name,
636                      GNUNET_GNS_EMPTY_LABEL_AT)))
637   {
638     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
639     merge_with_nick_records (nick,
640                              rd_count,
641                              rd,
642                              &res_count,
643                              &res);
644     GNUNET_free (nick);
645   }
646   else
647   {
648     res_count = rd_count;
649     res = (struct GNUNET_GNSRECORD_Data *) rd;
650   }
651
652   GNUNET_assert (-1 !=
653                  GNUNET_GNSRECORD_records_get_size (res_count,
654                                                     res));
655
656
657   name_len = strlen (name) + 1;
658   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
659                                                   res);
660   if (rd_ser_len < 0)
661   {
662     GNUNET_break (0);
663     GNUNET_SERVICE_client_drop (nc->client);
664     return;
665   }
666   if (rd_ser_len >= UINT16_MAX - name_len - sizeof (*zir_msg))
667   {
668     GNUNET_break (0);
669     GNUNET_SERVICE_client_drop (nc->client);
670     return;
671   }
672   env = GNUNET_MQ_msg_extra (zir_msg,
673                              name_len + rd_ser_len,
674                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
675   zir_msg->gns_header.r_id = htonl (request_id);
676   zir_msg->name_len = htons (name_len);
677   zir_msg->rd_count = htons (res_count);
678   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
679   zir_msg->private_key = *zone_key;
680   name_tmp = (char *) &zir_msg[1];
681   GNUNET_memcpy (name_tmp,
682                  name,
683                  name_len);
684   rd_ser = &name_tmp[name_len];
685   GNUNET_assert (rd_ser_len ==
686                  GNUNET_GNSRECORD_records_serialize (res_count,
687                                                      res,
688                                                      rd_ser_len,
689                                                      rd_ser));
690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691               "Sending RECORD_RESULT message with %u records\n",
692               res_count);
693   GNUNET_STATISTICS_update (statistics,
694                             "Record sets sent to clients",
695                             1,
696                             GNUNET_NO);
697   GNUNET_MQ_send (nc->mq,
698                   env);
699   if (rd != res)
700     GNUNET_free (res);
701 }
702
703
704 /**
705  * Send response to the store request to the client.
706  *
707  * @param client client to talk to
708  * @param res status of the operation
709  * @param rid client's request ID
710  */
711 static void
712 send_store_response (struct NamestoreClient *nc,
713                      int res,
714                      uint32_t rid)
715 {
716   struct GNUNET_MQ_Envelope *env;
717   struct RecordStoreResponseMessage *rcr_msg;
718
719   GNUNET_assert (NULL != nc);
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721               "Sending RECORD_STORE_RESPONSE message\n");
722   GNUNET_STATISTICS_update (statistics,
723                             "Store requests completed",
724                             1,
725                             GNUNET_NO);
726   env = GNUNET_MQ_msg (rcr_msg,
727                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
728   rcr_msg->gns_header.r_id = htonl (rid);
729   rcr_msg->op_result = htonl (res);
730   GNUNET_MQ_send (nc->mq,
731                   env);
732 }
733
734
735 /**
736  * Cache operation complete, clean up.
737  *
738  * @param cls the `struct CacheOperation`
739  * @param success success
740  * @param emsg error messages
741  */
742 static void
743 finish_cache_operation (void *cls,
744                         int32_t success,
745                         const char *emsg)
746 {
747   struct CacheOperation *cop = cls;
748
749   if (NULL != emsg)
750     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
751                 _("Failed to replicate block in namecache: %s\n"),
752                 emsg);
753   else
754     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
755                 "CACHE operation completed\n");
756   GNUNET_CONTAINER_DLL_remove (cop_head,
757                                cop_tail,
758                                cop);
759   if (NULL != cop->nc)
760     send_store_response (cop->nc,
761                          success,
762                          cop->rid);
763   GNUNET_free (cop);
764 }
765
766
767 /**
768  * We just touched the plaintext information about a name in our zone;
769  * refresh the corresponding (encrypted) block in the namecache.
770  *
771  * @param nc client responsible for the request, can be NULL
772  * @param rid request ID of the client
773  * @param zone_key private key of the zone
774  * @param name label for the records
775  * @param rd_count number of records
776  * @param rd records stored under the given @a name
777  */
778 static void
779 refresh_block (struct NamestoreClient *nc,
780                uint32_t rid,
781                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
782                const char *name,
783                unsigned int rd_count,
784                const struct GNUNET_GNSRECORD_Data *rd)
785 {
786   struct GNUNET_GNSRECORD_Block *block;
787   struct CacheOperation *cop;
788   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
789   struct GNUNET_GNSRECORD_Data *nick;
790   struct GNUNET_GNSRECORD_Data *res;
791   unsigned int res_count;
792   struct GNUNET_TIME_Absolute exp_time;
793
794   nick = get_nick_record (zone_key);
795   res_count = rd_count;
796   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
797   if (NULL != nick)
798   {
799     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
800     merge_with_nick_records (nick,
801                              rd_count,rd,
802                              &res_count,
803                              &res);
804     GNUNET_free (nick);
805   }
806   if (0 == res_count)
807   {
808     if (NULL != nc)
809       send_store_response (nc,
810                            GNUNET_OK,
811                            rid);
812     return; /* no data, no need to update cache */
813   }
814   if (GNUNET_YES == disable_namecache)
815   {
816     GNUNET_STATISTICS_update (statistics,
817                               "Namecache updates skipped (NC disabled)",
818                               1,
819                               GNUNET_NO);
820     if (NULL != nc)
821       send_store_response (nc,
822                            GNUNET_OK,
823                            rid);
824     return;
825   }
826   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
827                                                           res);
828   if (cache_keys)
829     block = GNUNET_GNSRECORD_block_create2 (zone_key,
830                                             exp_time,
831                                             name,
832                                             res,
833                                             res_count);
834   else
835     block = GNUNET_GNSRECORD_block_create (zone_key,
836                                            exp_time,
837                                            name,
838                                            res,
839                                            res_count);
840   GNUNET_assert (NULL != block);
841   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
842                                       &pkey);
843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
845               name,
846               res_count,
847               GNUNET_STRINGS_absolute_time_to_string (exp_time),
848               GNUNET_GNSRECORD_z2s (&pkey));
849   GNUNET_STATISTICS_update (statistics,
850                             "Namecache updates pushed",
851                             1,
852                             GNUNET_NO);
853   cop = GNUNET_new (struct CacheOperation);
854   cop->nc = nc;
855   cop->rid = rid;
856   GNUNET_CONTAINER_DLL_insert (cop_head,
857                                cop_tail,
858                                cop);
859   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
860                                           block,
861                                           &finish_cache_operation,
862                                           cop);
863   GNUNET_free (block);
864 }
865
866
867 /**
868  * Print a warning that one of our monitors is no longer reacting.
869  *
870  * @param cls a `struct ZoneMonitor` to warn about
871  */
872 static void
873 warn_monitor_slow (void *cls)
874 {
875   struct ZoneMonitor *zm = cls;
876
877   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
878               "No response from monitor since %s\n",
879               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
880   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
881                                                       &warn_monitor_slow,
882                                                       zm);
883 }
884
885
886 /**
887  * Continue processing the @a sa.
888  *
889  * @param sa store activity to process
890  */
891 static void
892 continue_store_activity (struct StoreActivity *sa)
893 {
894   const struct RecordStoreMessage *rp_msg = sa->rsm;
895   unsigned int rd_count;
896   size_t name_len;
897   size_t rd_ser_len;
898   uint32_t rid;
899   const char *name_tmp;
900   const char *rd_ser;
901
902   rid = ntohl (rp_msg->gns_header.r_id);
903   name_len = ntohs (rp_msg->name_len);
904   rd_count = ntohs (rp_msg->rd_count);
905   rd_ser_len = ntohs (rp_msg->rd_len);
906   name_tmp = (const char *) &rp_msg[1];
907   rd_ser = &name_tmp[name_len];
908   {
909     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
910
911     /* We did this before, must succeed again */
912     GNUNET_assert (GNUNET_OK ==
913                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
914                                                          rd_ser,
915                                                          rd_count,
916                                                          rd));
917
918     for (struct ZoneMonitor *zm = sa->zm_pos;
919          NULL != zm;
920          zm = sa->zm_pos)
921     {
922       if ( (0 != memcmp (&rp_msg->private_key,
923                          &zm->zone,
924                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
925            (0 != memcmp (&zm->zone,
926                          &zero,
927                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
928         {
929           sa->zm_pos = zm->next; /* not interesting to this monitor */
930           continue;
931         }
932       if (zm->limit == zm->iteration_cnt)
933       {
934         zm->sa_waiting = GNUNET_YES;
935         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
936         if (NULL != zm->sa_wait_warning)
937           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
938         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
939                                                             &warn_monitor_slow,
940                                                             zm);
941         return; /* blocked on zone monitor */
942       }
943       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944                   "Notifying monitor about changes under label `%s'\n",
945                   sa->conv_name);
946       zm->limit--;
947       send_lookup_response (zm->nc,
948                             0,
949                             &rp_msg->private_key,
950                             sa->conv_name,
951                             rd_count,
952                             rd);
953       sa->zm_pos = zm->next;
954     }
955     /* great, done with the monitors, unpack (again) for refresh_block operation */
956     refresh_block (sa->nc,
957                    rid,
958                    &rp_msg->private_key,
959                    sa->conv_name,
960                    rd_count,
961                    rd);
962   }
963   GNUNET_SERVICE_client_continue (sa->nc->client);
964   free_store_activity (sa);
965 }
966
967
968 /**
969  * Called whenever a client is disconnected.
970  * Frees our resources associated with that client.
971  *
972  * @param cls closure
973  * @param client identification of the client
974  * @param app_ctx the `struct NamestoreClient` of @a client
975  */
976 static void
977 client_disconnect_cb (void *cls,
978                       struct GNUNET_SERVICE_Client *client,
979                       void *app_ctx)
980 {
981   struct NamestoreClient *nc = app_ctx;
982   struct ZoneIteration *no;
983   struct CacheOperation *cop;
984
985   (void) cls;
986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987               "Client %p disconnected\n",
988               client);
989   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
990   {
991     struct StoreActivity *san;
992
993     if (nc != zm->nc)
994       continue;
995     GNUNET_CONTAINER_DLL_remove (monitor_head,
996                                  monitor_tail,
997                                  zm);
998     if (NULL != zm->task)
999     {
1000       GNUNET_SCHEDULER_cancel (zm->task);
1001       zm->task = NULL;
1002     }
1003     if (NULL != zm->sa_wait_warning)
1004     {
1005       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1006       zm->sa_wait_warning = NULL;
1007     }
1008     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1009     {
1010       san = sa->next;
1011       if (zm == sa->zm_pos)
1012       {
1013         sa->zm_pos = zm->next;
1014         /* this may free sa */
1015         continue_store_activity (sa);
1016       }
1017     }
1018     GNUNET_free (zm);
1019     break;
1020   }
1021   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1022   {
1023     if (sa->nc == nc)
1024     {
1025       /* this may free sa */
1026       free_store_activity (sa);
1027       break; /* there can only be one per nc */
1028     }
1029   }
1030   while (NULL != (no = nc->op_head))
1031   {
1032     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1033                                  nc->op_tail,
1034                                  no);
1035     GNUNET_free (no);
1036   }
1037   for (cop = cop_head; NULL != cop; cop = cop->next)
1038     if (nc == cop->nc)
1039       cop->nc = NULL;
1040   GNUNET_free (nc);
1041 }
1042
1043
1044 /**
1045  * Add a client to our list of active clients.
1046  *
1047  * @param cls NULL
1048  * @param client client to add
1049  * @param mq message queue for @a client
1050  * @return internal namestore client structure for this client
1051  */
1052 static void *
1053 client_connect_cb (void *cls,
1054                    struct GNUNET_SERVICE_Client *client,
1055                    struct GNUNET_MQ_Handle *mq)
1056 {
1057   struct NamestoreClient *nc;
1058
1059   (void) cls;
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061               "Client %p connected\n",
1062               client);
1063   nc = GNUNET_new (struct NamestoreClient);
1064   nc->client = client;
1065   nc->mq = mq;
1066   return nc;
1067 }
1068
1069
1070 /**
1071  * Closure for #lookup_it().
1072  */
1073 struct RecordLookupContext
1074 {
1075
1076   /**
1077    * FIXME.
1078    */
1079   const char *label;
1080
1081   /**
1082    * FIXME.
1083    */
1084   char *res_rd;
1085
1086   /**
1087    * FIXME.
1088    */
1089   struct GNUNET_GNSRECORD_Data *nick;
1090
1091   /**
1092    * FIXME.
1093    */
1094   int found;
1095
1096   /**
1097    * FIXME.
1098    */
1099   unsigned int res_rd_count;
1100
1101   /**
1102    * FIXME.
1103    */
1104   ssize_t rd_ser_len;
1105 };
1106
1107
1108 /**
1109  * FIXME.
1110  *
1111  * @param seq sequence number of the record
1112  */
1113 static void
1114 lookup_it (void *cls,
1115            uint64_t seq,
1116            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1117            const char *label,
1118            unsigned int rd_count,
1119            const struct GNUNET_GNSRECORD_Data *rd)
1120 {
1121   struct RecordLookupContext *rlc = cls;
1122
1123   (void) private_key;
1124   (void) seq;
1125   if (0 != strcmp (label,
1126                    rlc->label))
1127     return;
1128   rlc->found = GNUNET_YES;
1129   if (0 == rd_count)
1130   {
1131     rlc->rd_ser_len = 0;
1132     rlc->res_rd_count = 0;
1133     rlc->res_rd = NULL;
1134     return;
1135   }
1136   if ( (NULL != rlc->nick) &&
1137        (0 != strcmp (label,
1138                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1139   {
1140     /* Merge */
1141     struct GNUNET_GNSRECORD_Data *rd_res;
1142     unsigned int rdc_res;
1143
1144     rd_res = NULL;
1145     rdc_res = 0;
1146     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1147     merge_with_nick_records (rlc->nick,
1148                              rd_count,
1149                              rd,
1150                              &rdc_res,
1151                              &rd_res);
1152     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1153                                                          rd_res);
1154     if (rlc->rd_ser_len < 0)
1155     {
1156       GNUNET_break (0);
1157       GNUNET_free  (rd_res);
1158       rlc->found = GNUNET_NO;
1159       rlc->rd_ser_len = 0;
1160       return;
1161     }
1162     rlc->res_rd_count = rdc_res;
1163     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1164     if (rlc->rd_ser_len !=
1165         GNUNET_GNSRECORD_records_serialize (rdc_res,
1166                                             rd_res,
1167                                             rlc->rd_ser_len,
1168                                             rlc->res_rd))
1169     {
1170       GNUNET_break (0);
1171       GNUNET_free  (rlc->res_rd);
1172       rlc->res_rd = NULL;
1173       rlc->res_rd_count = 0;
1174       rlc->rd_ser_len = 0;
1175       GNUNET_free  (rd_res);
1176       rlc->found = GNUNET_NO;
1177       return;
1178     }
1179     GNUNET_free (rd_res);
1180     GNUNET_free (rlc->nick);
1181     rlc->nick = NULL;
1182   }
1183   else
1184   {
1185     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1186                                                          rd);
1187     if (rlc->rd_ser_len < 0)
1188     {
1189       GNUNET_break (0);
1190       rlc->found = GNUNET_NO;
1191       rlc->rd_ser_len = 0;
1192       return;
1193     }
1194     rlc->res_rd_count = rd_count;
1195     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1196     if (rlc->rd_ser_len !=
1197         GNUNET_GNSRECORD_records_serialize (rd_count,
1198                                             rd,
1199                                             rlc->rd_ser_len,
1200                                             rlc->res_rd))
1201     {
1202       GNUNET_break (0);
1203       GNUNET_free  (rlc->res_rd);
1204       rlc->res_rd = NULL;
1205       rlc->res_rd_count = 0;
1206       rlc->rd_ser_len = 0;
1207       rlc->found = GNUNET_NO;
1208       return;
1209     }
1210   }
1211 }
1212
1213
1214 /**
1215  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1216  *
1217  * @param cls client sending the message
1218  * @param ll_msg message of type `struct LabelLookupMessage`
1219  * @return #GNUNET_OK if @a ll_msg is well-formed
1220  */
1221 static int
1222 check_record_lookup (void *cls,
1223                      const struct LabelLookupMessage *ll_msg)
1224 {
1225   uint32_t name_len;
1226   size_t src_size;
1227
1228   (void) cls;
1229   name_len = ntohl (ll_msg->label_len);
1230   src_size = ntohs (ll_msg->gns_header.header.size);
1231   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1232   {
1233     GNUNET_break (0);
1234     return GNUNET_SYSERR;
1235   }
1236   GNUNET_MQ_check_zero_termination (ll_msg);
1237   return GNUNET_OK;
1238 }
1239
1240
1241 /**
1242  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1243  *
1244  * @param cls client sending the message
1245  * @param ll_msg message of type `struct LabelLookupMessage`
1246  */
1247 static void
1248 handle_record_lookup (void *cls,
1249                       const struct LabelLookupMessage *ll_msg)
1250 {
1251   struct NamestoreClient *nc = cls;
1252   struct GNUNET_MQ_Envelope *env;
1253   struct LabelLookupResponseMessage *llr_msg;
1254   struct RecordLookupContext rlc;
1255   const char *name_tmp;
1256   char *res_name;
1257   char *conv_name;
1258   uint32_t name_len;
1259   int res;
1260
1261   name_len = ntohl (ll_msg->label_len);
1262   name_tmp = (const char *) &ll_msg[1];
1263   GNUNET_SERVICE_client_continue (nc->client);
1264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1266               name_tmp);
1267
1268   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1269   if (NULL == conv_name)
1270   {
1271     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1272                 "Error converting name `%s'\n",
1273                 name_tmp);
1274     GNUNET_SERVICE_client_drop (nc->client);
1275     return;
1276   }
1277   rlc.label = conv_name;
1278   rlc.found = GNUNET_NO;
1279   rlc.res_rd_count = 0;
1280   rlc.res_rd = NULL;
1281   rlc.rd_ser_len = 0;
1282   rlc.nick = get_nick_record (&ll_msg->zone);
1283   res = GSN_database->lookup_records (GSN_database->cls,
1284                                       &ll_msg->zone,
1285                                       conv_name,
1286                                       &lookup_it,
1287                                       &rlc);
1288   GNUNET_free (conv_name);
1289   env = GNUNET_MQ_msg_extra (llr_msg,
1290                              name_len + rlc.rd_ser_len,
1291                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1292   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1293   llr_msg->private_key = ll_msg->zone;
1294   llr_msg->name_len = htons (name_len);
1295   llr_msg->rd_count = htons (rlc.res_rd_count);
1296   llr_msg->rd_len = htons (rlc.rd_ser_len);
1297   res_name = (char *) &llr_msg[1];
1298   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1299     llr_msg->found = ntohs (GNUNET_YES);
1300   else
1301     llr_msg->found = ntohs (GNUNET_NO);
1302   GNUNET_memcpy (&llr_msg[1],
1303                  name_tmp,
1304                  name_len);
1305   GNUNET_memcpy (&res_name[name_len],
1306                  rlc.res_rd,
1307                  rlc.rd_ser_len);
1308   GNUNET_MQ_send (nc->mq,
1309                   env);
1310   GNUNET_free_non_null (rlc.res_rd);
1311 }
1312
1313
1314 /**
1315  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1316  *
1317  * @param cls client sending the message
1318  * @param rp_msg message of type `struct RecordStoreMessage`
1319  * @return #GNUNET_OK if @a rp_msg is well-formed
1320  */
1321 static int
1322 check_record_store (void *cls,
1323                     const struct RecordStoreMessage *rp_msg)
1324 {
1325   size_t name_len;
1326   size_t msg_size;
1327   size_t msg_size_exp;
1328   size_t rd_ser_len;
1329   const char *name_tmp;
1330
1331   (void) cls;
1332   name_len = ntohs (rp_msg->name_len);
1333   msg_size = ntohs (rp_msg->gns_header.header.size);
1334   rd_ser_len = ntohs (rp_msg->rd_len);
1335   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1336   if (msg_size != msg_size_exp)
1337   {
1338     GNUNET_break (0);
1339     return GNUNET_SYSERR;
1340   }
1341   if ( (0 == name_len) ||
1342        (name_len > MAX_NAME_LEN) )
1343   {
1344     GNUNET_break (0);
1345     return GNUNET_SYSERR;
1346   }
1347   name_tmp = (const char *) &rp_msg[1];
1348   if ('\0' != name_tmp[name_len -1])
1349   {
1350     GNUNET_break (0);
1351     return GNUNET_SYSERR;
1352   }
1353   return GNUNET_OK;
1354 }
1355
1356
1357 /**
1358  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1359  *
1360  * @param cls client sending the message
1361  * @param rp_msg message of type `struct RecordStoreMessage`
1362  */
1363 static void
1364 handle_record_store (void *cls,
1365                      const struct RecordStoreMessage *rp_msg)
1366 {
1367   struct NamestoreClient *nc = cls;
1368   size_t name_len;
1369   size_t rd_ser_len;
1370   uint32_t rid;
1371   const char *name_tmp;
1372   char *conv_name;
1373   const char *rd_ser;
1374   unsigned int rd_count;
1375   int res;
1376   struct StoreActivity *sa;
1377
1378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1379               "Received NAMESTORE_RECORD_STORE message\n");
1380   rid = ntohl (rp_msg->gns_header.r_id);
1381   name_len = ntohs (rp_msg->name_len);
1382   rd_count = ntohs (rp_msg->rd_count);
1383   rd_ser_len = ntohs (rp_msg->rd_len);
1384   GNUNET_break (0 == ntohs (rp_msg->reserved));
1385   name_tmp = (const char *) &rp_msg[1];
1386   rd_ser = &name_tmp[name_len];
1387   {
1388     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1389
1390     if (GNUNET_OK !=
1391         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1392                                               rd_ser,
1393                                               rd_count,
1394                                               rd))
1395     {
1396       GNUNET_break (0);
1397       GNUNET_SERVICE_client_drop (nc->client);
1398       return;
1399     }
1400
1401     /* Extracting and converting private key */
1402     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1403     if (NULL == conv_name)
1404     {
1405       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1406                   "Error converting name `%s'\n",
1407                   name_tmp);
1408       GNUNET_SERVICE_client_drop (nc->client);
1409       return;
1410     }
1411     GNUNET_STATISTICS_update (statistics,
1412                               "Well-formed store requests received",
1413                               1,
1414                               GNUNET_NO);
1415     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1416                 "Creating %u records for name `%s'\n",
1417                 (unsigned int) rd_count,
1418                 conv_name);
1419     if ( (0 == rd_count) &&
1420          (GNUNET_NO ==
1421           GSN_database->lookup_records (GSN_database->cls,
1422                                         &rp_msg->private_key,
1423                                         conv_name,
1424                                         NULL,
1425                                         0)) )
1426     {
1427       /* This name does not exist, so cannot be removed */
1428       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429                   "Name `%s' does not exist, no deletion required\n",
1430                   conv_name);
1431       res = GNUNET_NO;
1432     }
1433     else
1434     {
1435       /* remove "NICK" records, unless this is for the
1436          #GNUNET_GNS_EMPTY_LABEL_AT label */
1437       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1438       unsigned int rd_clean_off;
1439
1440       rd_clean_off = 0;
1441       for (unsigned int i=0;i<rd_count;i++)
1442       {
1443         rd_clean[rd_clean_off] = rd[i];
1444         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1445                            conv_name)) ||
1446              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1447           rd_clean_off++;
1448       }
1449       res = GSN_database->store_records (GSN_database->cls,
1450                                          &rp_msg->private_key,
1451                                          conv_name,
1452                                          rd_clean_off,
1453                                          rd_clean);
1454     }
1455
1456     if (GNUNET_OK != res)
1457     {
1458       /* store not successful, not need to tell monitors */
1459       send_store_response (nc,
1460                            res,
1461                            rid);
1462       GNUNET_SERVICE_client_continue (nc->client);
1463       GNUNET_free (conv_name);
1464       return;
1465     }
1466
1467     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1468                         ntohs (rp_msg->gns_header.header.size));
1469     GNUNET_CONTAINER_DLL_insert (sa_head,
1470                                  sa_tail,
1471                                  sa);
1472     sa->nc = nc;
1473     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1474     GNUNET_memcpy (&sa[1],
1475                    rp_msg,
1476                    ntohs (rp_msg->gns_header.header.size));
1477     sa->zm_pos = monitor_head;
1478     sa->conv_name = conv_name;
1479     continue_store_activity (sa);
1480   }
1481 }
1482
1483
1484 /**
1485  * Context for record remove operations passed from #handle_zone_to_name to
1486  * #handle_zone_to_name_it as closure
1487  */
1488 struct ZoneToNameCtx
1489 {
1490   /**
1491    * Namestore client
1492    */
1493   struct NamestoreClient *nc;
1494
1495   /**
1496    * Request id (to be used in the response to the client).
1497    */
1498   uint32_t rid;
1499
1500   /**
1501    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1502    * not finding a name for the zone still counts as a 'success' here,
1503    * as this field is about the success of executing the IPC protocol.
1504    */
1505   int success;
1506 };
1507
1508
1509 /**
1510  * Zone to name iterator
1511  *
1512  * @param cls struct ZoneToNameCtx *
1513  * @param seq sequence number of the record
1514  * @param zone_key the zone key
1515  * @param name name
1516  * @param rd_count number of records in @a rd
1517  * @param rd record data
1518  */
1519 static void
1520 handle_zone_to_name_it (void *cls,
1521                         uint64_t seq,
1522                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1523                         const char *name,
1524                         unsigned int rd_count,
1525                         const struct GNUNET_GNSRECORD_Data *rd)
1526 {
1527   struct ZoneToNameCtx *ztn_ctx = cls;
1528   struct GNUNET_MQ_Envelope *env;
1529   struct ZoneToNameResponseMessage *ztnr_msg;
1530   int16_t res;
1531   size_t name_len;
1532   ssize_t rd_ser_len;
1533   size_t msg_size;
1534   char *name_tmp;
1535   char *rd_tmp;
1536
1537   (void) seq;
1538   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1539               "Found result for zone-to-name lookup: `%s'\n",
1540               name);
1541   res = GNUNET_YES;
1542   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1543   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1544                                                   rd);
1545   if (rd_ser_len < 0)
1546   {
1547     GNUNET_break (0);
1548     ztn_ctx->success = GNUNET_SYSERR;
1549     return;
1550   }
1551   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1552   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1553   {
1554     GNUNET_break (0);
1555     ztn_ctx->success = GNUNET_SYSERR;
1556     return;
1557   }
1558   env = GNUNET_MQ_msg_extra (ztnr_msg,
1559                              name_len + rd_ser_len,
1560                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1561   ztnr_msg->gns_header.header.size = htons (msg_size);
1562   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1563   ztnr_msg->res = htons (res);
1564   ztnr_msg->rd_len = htons (rd_ser_len);
1565   ztnr_msg->rd_count = htons (rd_count);
1566   ztnr_msg->name_len = htons (name_len);
1567   ztnr_msg->zone = *zone_key;
1568   name_tmp = (char *) &ztnr_msg[1];
1569   GNUNET_memcpy (name_tmp,
1570                  name,
1571                  name_len);
1572   rd_tmp = &name_tmp[name_len];
1573   GNUNET_assert (rd_ser_len ==
1574                  GNUNET_GNSRECORD_records_serialize (rd_count,
1575                                                      rd,
1576                                                      rd_ser_len,
1577                                                      rd_tmp));
1578   ztn_ctx->success = GNUNET_OK;
1579   GNUNET_MQ_send (ztn_ctx->nc->mq,
1580                   env);
1581 }
1582
1583
1584 /**
1585  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1586  *
1587  * @param cls client client sending the message
1588  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1589  */
1590 static void
1591 handle_zone_to_name (void *cls,
1592                      const struct ZoneToNameMessage *ztn_msg)
1593 {
1594   struct NamestoreClient *nc = cls;
1595   struct ZoneToNameCtx ztn_ctx;
1596   struct GNUNET_MQ_Envelope *env;
1597   struct ZoneToNameResponseMessage *ztnr_msg;
1598
1599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600               "Received ZONE_TO_NAME message\n");
1601   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1602   ztn_ctx.nc = nc;
1603   ztn_ctx.success = GNUNET_NO;
1604   if (GNUNET_SYSERR ==
1605       GSN_database->zone_to_name (GSN_database->cls,
1606                                   &ztn_msg->zone,
1607                                   &ztn_msg->value_zone,
1608                                   &handle_zone_to_name_it, &ztn_ctx))
1609   {
1610     /* internal error, hang up instead of signalling something
1611        that might be wrong */
1612     GNUNET_break (0);
1613     GNUNET_SERVICE_client_drop (nc->client);
1614     return;
1615   }
1616   if (GNUNET_NO == ztn_ctx.success)
1617   {
1618     /* no result found, send empty response */
1619     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620                 "Found no result for zone-to-name lookup.\n");
1621     env = GNUNET_MQ_msg (ztnr_msg,
1622                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1623     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1624     ztnr_msg->res = htons (GNUNET_NO);
1625     GNUNET_MQ_send (nc->mq,
1626                     env);
1627   }
1628   GNUNET_SERVICE_client_continue (nc->client);
1629 }
1630
1631
1632 /**
1633  * Context for record remove operations passed from
1634  * #run_zone_iteration_round to #zone_iterate_proc as closure
1635  */
1636 struct ZoneIterationProcResult
1637 {
1638   /**
1639    * The zone iteration handle
1640    */
1641   struct ZoneIteration *zi;
1642
1643   /**
1644    * Number of results left to be returned in this iteration.
1645    */
1646   uint64_t limit;
1647
1648 };
1649
1650
1651 /**
1652  * Process results for zone iteration from database
1653  *
1654  * @param cls struct ZoneIterationProcResult
1655  * @param seq sequence number of the record
1656  * @param zone_key the zone key
1657  * @param name name
1658  * @param rd_count number of records for this name
1659  * @param rd record data
1660  */
1661 static void
1662 zone_iterate_proc (void *cls,
1663                    uint64_t seq,
1664                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1665                    const char *name,
1666                    unsigned int rd_count,
1667                    const struct GNUNET_GNSRECORD_Data *rd)
1668 {
1669   struct ZoneIterationProcResult *proc = cls;
1670   int do_refresh_block;
1671
1672   if ( (NULL == zone_key) &&
1673        (NULL == name) )
1674   {
1675     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676                 "Iteration done\n");
1677     return;
1678   }
1679   if ( (NULL == zone_key) ||
1680        (NULL == name) )
1681   {
1682     /* what is this!? should never happen */
1683     GNUNET_break (0);
1684     return;
1685   }
1686   if (0 == proc->limit)
1687   {
1688     /* what is this!? should never happen */
1689     GNUNET_break (0);
1690     return;
1691   }
1692   proc->limit--;
1693   proc->zi->seq = seq;
1694   send_lookup_response (proc->zi->nc,
1695                         proc->zi->request_id,
1696                         zone_key,
1697                         name,
1698                         rd_count,
1699                         rd);
1700
1701
1702   do_refresh_block = GNUNET_NO;
1703   for (unsigned int i=0;i<rd_count;i++)
1704     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1705     {
1706       do_refresh_block = GNUNET_YES;
1707       break;
1708     }
1709   if (GNUNET_YES == do_refresh_block)
1710     refresh_block (NULL,
1711                    0,
1712                    zone_key,
1713                    name,
1714                    rd_count,
1715                    rd);
1716 }
1717
1718
1719 /**
1720  * Perform the next round of the zone iteration.
1721  *
1722  * @param zi zone iterator to process
1723  * @param limit number of results to return in one pass
1724  */
1725 static void
1726 run_zone_iteration_round (struct ZoneIteration *zi,
1727                           uint64_t limit)
1728 {
1729   struct ZoneIterationProcResult proc;
1730   struct GNUNET_MQ_Envelope *env;
1731   struct GNUNET_NAMESTORE_Header *em;
1732   struct GNUNET_TIME_Absolute start;
1733   struct GNUNET_TIME_Relative duration;
1734
1735   memset (&proc,
1736           0,
1737           sizeof (proc));
1738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739               "Asked to return up to %llu records at position %llu\n",
1740               (unsigned long long) limit,
1741               (unsigned long long) zi->seq);
1742   proc.zi = zi;
1743   proc.limit = limit;
1744   start = GNUNET_TIME_absolute_get ();
1745   GNUNET_break (GNUNET_SYSERR !=
1746                 GSN_database->iterate_records (GSN_database->cls,
1747                                                (0 == memcmp (&zi->zone,
1748                                                              &zero,
1749                                                              sizeof (zero)))
1750                                                ? NULL
1751                                                : &zi->zone,
1752                                                zi->seq,
1753                                                limit,
1754                                                &zone_iterate_proc,
1755                                                &proc));
1756   duration = GNUNET_TIME_absolute_get_duration (start);
1757   duration = GNUNET_TIME_relative_divide (duration,
1758                                           limit - proc.limit);
1759   GNUNET_STATISTICS_set (statistics,
1760                          "NAMESTORE iteration delay (μs/record)",
1761                          duration.rel_value_us,
1762                          GNUNET_NO);
1763   if (0 == proc.limit)
1764   {
1765     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1766                 "Returned %llu results, more results available\n",
1767                 (unsigned long long) limit);
1768     return; /* more results later after we get the
1769                #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message */
1770   }
1771   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772               "Completed iteration after %llu/%llu results\n",
1773               (unsigned long long) (limit - proc.limit),
1774               (unsigned long long) limit);
1775   /* send empty response to indicate end of list */
1776   env = GNUNET_MQ_msg (em,
1777                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
1778   em->r_id = htonl (zi->request_id);
1779   GNUNET_MQ_send (zi->nc->mq,
1780                   env);
1781   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
1782                                zi->nc->op_tail,
1783                                zi);
1784   GNUNET_free (zi);
1785 }
1786
1787
1788 /**
1789  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1790  *
1791  * @param cls the client sending the message
1792  * @param zis_msg message from the client
1793  */
1794 static void
1795 handle_iteration_start (void *cls,
1796                         const struct ZoneIterationStartMessage *zis_msg)
1797 {
1798   struct NamestoreClient *nc = cls;
1799   struct ZoneIteration *zi;
1800
1801   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1802               "Received ZONE_ITERATION_START message\n");
1803   zi = GNUNET_new (struct ZoneIteration);
1804   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1805   zi->offset = 0;
1806   zi->nc = nc;
1807   zi->zone = zis_msg->zone;
1808
1809   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1810                                nc->op_tail,
1811                                zi);
1812   run_zone_iteration_round (zi,
1813                             1);
1814   GNUNET_SERVICE_client_continue (nc->client);
1815 }
1816
1817
1818 /**
1819  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1820  *
1821  * @param cls the client sending the message
1822  * @param zis_msg message from the client
1823  */
1824 static void
1825 handle_iteration_stop (void *cls,
1826                        const struct ZoneIterationStopMessage *zis_msg)
1827 {
1828   struct NamestoreClient *nc = cls;
1829   struct ZoneIteration *zi;
1830   uint32_t rid;
1831
1832   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1833               "Received ZONE_ITERATION_STOP message\n");
1834   rid = ntohl (zis_msg->gns_header.r_id);
1835   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1836     if (zi->request_id == rid)
1837       break;
1838   if (NULL == zi)
1839   {
1840     GNUNET_break (0);
1841     GNUNET_SERVICE_client_drop (nc->client);
1842     return;
1843   }
1844   GNUNET_CONTAINER_DLL_remove (nc->op_head,
1845                                nc->op_tail,
1846                                zi);
1847   GNUNET_free (zi);
1848   GNUNET_SERVICE_client_continue (nc->client);
1849 }
1850
1851
1852 /**
1853  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
1854  *
1855  * @param cls the client sending the message
1856  * @param message message from the client
1857  */
1858 static void
1859 handle_iteration_next (void *cls,
1860                        const struct ZoneIterationNextMessage *zis_msg)
1861 {
1862   struct NamestoreClient *nc = cls;
1863   struct ZoneIteration *zi;
1864   uint32_t rid;
1865   uint64_t limit;
1866
1867   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1868               "Received ZONE_ITERATION_NEXT message\n");
1869   GNUNET_STATISTICS_update (statistics,
1870                             "Iteration NEXT messages received",
1871                             1,
1872                             GNUNET_NO);
1873   rid = ntohl (zis_msg->gns_header.r_id);
1874   limit = GNUNET_ntohll (zis_msg->limit);
1875   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1876     if (zi->request_id == rid)
1877       break;
1878   if (NULL == zi)
1879   {
1880     GNUNET_break (0);
1881     GNUNET_SERVICE_client_drop (nc->client);
1882     return;
1883   }
1884   run_zone_iteration_round (zi,
1885                             limit);
1886   GNUNET_SERVICE_client_continue (nc->client);
1887 }
1888
1889
1890 /**
1891  * Function called when the monitor is ready for more data, and we
1892  * should thus unblock PUT operations that were blocked on the
1893  * monitor not being ready.
1894  */
1895 static void
1896 monitor_unblock (struct ZoneMonitor *zm)
1897 {
1898   struct StoreActivity *sa = sa_head;
1899
1900   while ( (NULL != sa) &&
1901           (zm->limit > zm->iteration_cnt) )
1902   {
1903     struct StoreActivity *sn = sa->next;
1904
1905     if (sa->zm_pos == zm)
1906       continue_store_activity (sa);
1907     sa = sn;
1908   }
1909   if (zm->limit > zm->iteration_cnt)
1910   {
1911     zm->sa_waiting = GNUNET_NO;
1912     if (NULL != zm->sa_wait_warning)
1913     {
1914       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1915       zm->sa_wait_warning = NULL;
1916     }
1917   }
1918   else if (GNUNET_YES == zm->sa_waiting)
1919   {
1920     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1921     if (NULL != zm->sa_wait_warning)
1922       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1923     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1924                                                         &warn_monitor_slow,
1925                                                         zm);
1926   }
1927 }
1928
1929
1930 /**
1931  * Send 'sync' message to zone monitor, we're now in sync.
1932  *
1933  * @param zm monitor that is now in sync
1934  */
1935 static void
1936 monitor_sync (struct ZoneMonitor *zm)
1937 {
1938   struct GNUNET_MQ_Envelope *env;
1939   struct GNUNET_MessageHeader *sync;
1940
1941   env = GNUNET_MQ_msg (sync,
1942                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
1943   GNUNET_MQ_send (zm->nc->mq,
1944                   env);
1945   /* mark iteration done */
1946   zm->in_first_iteration = GNUNET_NO;
1947   zm->iteration_cnt = 0;
1948   if ( (zm->limit > 0) &&
1949        (zm->sa_waiting) )
1950     monitor_unblock (zm);
1951 }
1952
1953
1954 /**
1955  * Obtain the next datum during the zone monitor's zone initial iteration.
1956  *
1957  * @param cls zone monitor that does its initial iteration
1958  */
1959 static void
1960 monitor_iteration_next (void *cls);
1961
1962
1963 /**
1964  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
1965  *
1966  * @param cls a 'struct ZoneMonitor *' with information about the monitor
1967  * @param seq sequence number of the record
1968  * @param zone_key zone key of the zone
1969  * @param name name
1970  * @param rd_count number of records in @a rd
1971  * @param rd array of records
1972  */
1973 static void
1974 monitor_iterate_cb (void *cls,
1975                     uint64_t seq,
1976                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1977                     const char *name,
1978                     unsigned int rd_count,
1979                     const struct GNUNET_GNSRECORD_Data *rd)
1980 {
1981   struct ZoneMonitor *zm = cls;
1982
1983   zm->seq = seq;
1984   GNUNET_assert (NULL != name);
1985   GNUNET_STATISTICS_update (statistics,
1986                             "Monitor notifications sent",
1987                             1,
1988                             GNUNET_NO);
1989   zm->limit--;
1990   zm->iteration_cnt--;
1991   send_lookup_response (zm->nc,
1992                         0,
1993                         zone_key,
1994                         name,
1995                         rd_count,
1996                         rd);
1997   if ( (0 == zm->iteration_cnt) &&
1998        (0 != zm->limit) )
1999   {
2000     /* We are done with the current iteration batch, AND the
2001        client would right now accept more, so go again! */
2002     GNUNET_assert (NULL == zm->task);
2003     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2004                                          zm);
2005   }
2006 }
2007
2008
2009 /**
2010  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2011  *
2012  * @param cls the client sending the message
2013  * @param zis_msg message from the client
2014  */
2015 static void
2016 handle_monitor_start (void *cls,
2017                       const struct ZoneMonitorStartMessage *zis_msg)
2018 {
2019   struct NamestoreClient *nc = cls;
2020   struct ZoneMonitor *zm;
2021
2022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023               "Received ZONE_MONITOR_START message\n");
2024   zm = GNUNET_new (struct ZoneMonitor);
2025   zm->nc = nc;
2026   zm->zone = zis_msg->zone;
2027   zm->limit = 1;
2028   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2029   GNUNET_CONTAINER_DLL_insert (monitor_head,
2030                                monitor_tail,
2031                                zm);
2032   GNUNET_SERVICE_client_mark_monitor (nc->client);
2033   GNUNET_SERVICE_client_continue (nc->client);
2034   GNUNET_notification_context_add (monitor_nc,
2035                                    nc->mq);
2036   if (zm->in_first_iteration)
2037     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2038                                          zm);
2039   else
2040     monitor_sync (zm);
2041 }
2042
2043
2044 /**
2045  * Obtain the next datum during the zone monitor's zone initial iteration.
2046  *
2047  * @param cls zone monitor that does its initial iteration
2048  */
2049 static void
2050 monitor_iteration_next (void *cls)
2051 {
2052   struct ZoneMonitor *zm = cls;
2053   int ret;
2054
2055   zm->task = NULL;
2056   GNUNET_assert (0 == zm->iteration_cnt);
2057   if (zm->limit > 16)
2058     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2059   else
2060     zm->iteration_cnt = zm->limit; /* use it all */
2061   ret = GSN_database->iterate_records (GSN_database->cls,
2062                                        (0 == memcmp (&zm->zone,
2063                                                      &zero,
2064                                                      sizeof (zero)))
2065                                        ? NULL
2066                                        : &zm->zone,
2067                                        zm->seq,
2068                                        zm->iteration_cnt,
2069                                        &monitor_iterate_cb,
2070                                        zm);
2071   if (GNUNET_SYSERR == ret)
2072   {
2073     GNUNET_SERVICE_client_drop (zm->nc->client);
2074     return;
2075   }
2076   if (GNUNET_NO == ret)
2077   {
2078     /* empty zone */
2079     monitor_sync (zm);
2080     return;
2081   }
2082 }
2083
2084
2085 /**
2086  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2087  *
2088  * @param cls the client sending the message
2089  * @param nm message from the client
2090  */
2091 static void
2092 handle_monitor_next (void *cls,
2093                      const struct ZoneMonitorNextMessage *nm)
2094 {
2095   struct NamestoreClient *nc = cls;
2096   struct ZoneMonitor *zm;
2097   uint64_t inc;
2098
2099   inc = GNUNET_ntohll (nm->limit);
2100   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2101               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2102               (unsigned long long) inc);
2103   for (zm = monitor_head; NULL != zm; zm = zm->next)
2104     if (zm->nc == nc)
2105       break;
2106   if (NULL == zm)
2107   {
2108     GNUNET_break (0);
2109     GNUNET_SERVICE_client_drop (nc->client);
2110     return;
2111   }
2112   GNUNET_SERVICE_client_continue (nc->client);
2113   if (zm->limit + inc < zm->limit)
2114   {
2115     GNUNET_break (0);
2116     GNUNET_SERVICE_client_drop (nc->client);
2117     return;
2118   }
2119   zm->limit += inc;
2120   if ( (zm->in_first_iteration) &&
2121        (zm->limit == inc) )
2122   {
2123     /* We are still iterating, and the previous iteration must
2124        have stopped due to the client's limit, so continue it! */
2125     GNUNET_assert (NULL == zm->task);
2126     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2127                                          zm);
2128   }
2129   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2130   if ( (zm->limit > zm->iteration_cnt) &&
2131        (zm->sa_waiting) )
2132   {
2133     monitor_unblock (zm);
2134   }
2135   else if (GNUNET_YES == zm->sa_waiting)
2136   {
2137     if (NULL != zm->sa_wait_warning)
2138       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2139     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2140     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2141                                                         &warn_monitor_slow,
2142                                                         zm);
2143   }
2144 }
2145
2146
2147 /**
2148  * Process namestore requests.
2149  *
2150  * @param cls closure
2151  * @param cfg configuration to use
2152  * @param service the initialized service
2153  */
2154 static void
2155 run (void *cls,
2156      const struct GNUNET_CONFIGURATION_Handle *cfg,
2157      struct GNUNET_SERVICE_Handle *service)
2158 {
2159   char *database;
2160
2161   (void) cls;
2162   (void) service;
2163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2164               "Starting namestore service\n");
2165   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2166                                                      "namestore",
2167                                                      "CACHE_KEYS");
2168   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2169                                                             "namecache",
2170                                                             "DISABLE");
2171   GSN_cfg = cfg;
2172   monitor_nc = GNUNET_notification_context_create (1);
2173   if (GNUNET_YES != disable_namecache)
2174   {
2175     namecache = GNUNET_NAMECACHE_connect (cfg);
2176     GNUNET_assert (NULL != namecache);
2177   }
2178   /* Loading database plugin */
2179   if (GNUNET_OK !=
2180       GNUNET_CONFIGURATION_get_value_string (cfg,
2181                                              "namestore",
2182                                              "database",
2183                                              &database))
2184     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2185                 "No database backend configured\n");
2186
2187   GNUNET_asprintf (&db_lib_name,
2188                    "libgnunet_plugin_namestore_%s",
2189                    database);
2190   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2191                                      (void *) GSN_cfg);
2192   GNUNET_free (database);
2193   statistics = GNUNET_STATISTICS_create ("namestore",
2194                                          cfg);
2195   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2196                                  NULL);
2197   if (NULL == GSN_database)
2198   {
2199     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2200                 "Could not load database backend `%s'\n",
2201                 db_lib_name);
2202     GNUNET_SCHEDULER_shutdown ();
2203     return;
2204   }
2205 }
2206
2207
2208 /**
2209  * Define "main" method using service macro.
2210  */
2211 GNUNET_SERVICE_MAIN
2212 ("namestore",
2213  GNUNET_SERVICE_OPTION_NONE,
2214  &run,
2215  &client_connect_cb,
2216  &client_disconnect_cb,
2217  NULL,
2218  GNUNET_MQ_hd_var_size (record_store,
2219                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2220                         struct RecordStoreMessage,
2221                         NULL),
2222  GNUNET_MQ_hd_var_size (record_lookup,
2223                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2224                         struct LabelLookupMessage,
2225                         NULL),
2226  GNUNET_MQ_hd_fixed_size (zone_to_name,
2227                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2228                           struct ZoneToNameMessage,
2229                           NULL),
2230  GNUNET_MQ_hd_fixed_size (iteration_start,
2231                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2232                           struct ZoneIterationStartMessage,
2233                           NULL),
2234  GNUNET_MQ_hd_fixed_size (iteration_next,
2235                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2236                           struct ZoneIterationNextMessage,
2237                           NULL),
2238  GNUNET_MQ_hd_fixed_size (iteration_stop,
2239                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2240                           struct ZoneIterationStopMessage,
2241                           NULL),
2242  GNUNET_MQ_hd_fixed_size (monitor_start,
2243                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2244                           struct ZoneMonitorStartMessage,
2245                           NULL),
2246  GNUNET_MQ_hd_fixed_size (monitor_next,
2247                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2248                           struct ZoneMonitorNextMessage,
2249                           NULL),
2250  GNUNET_MQ_handler_end ());
2251
2252
2253 /* end of gnunet-service-namestore.c */