modify sqlite logic to accomodate rowcount starting from 1
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file namestore/gnunet-service-namestore.c
21  * @brief namestore for the GNUnet naming system
22  * @author Matthias Wachs
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - run testcases, make sure everything works!
27  */
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_dnsparser_lib.h"
31 #include "gnunet_gns_service.h"
32 #include "gnunet_namecache_service.h"
33 #include "gnunet_namestore_service.h"
34 #include "gnunet_namestore_plugin.h"
35 #include "gnunet_statistics_service.h"
36 #include "gnunet_signatures.h"
37 #include "namestore.h"
38
39 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
40
41 /**
42  * If a monitor takes more than 1 minute to process an event, print a warning.
43  */
44 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
45
46
47 /**
48  * A namestore client
49  */
50 struct NamestoreClient;
51
52
53 /**
54  * A namestore iteration operation.
55  */
56 struct ZoneIteration
57 {
58   /**
59    * Next element in the DLL
60    */
61   struct ZoneIteration *next;
62
63   /**
64    * Previous element in the DLL
65    */
66   struct ZoneIteration *prev;
67
68   /**
69    * Namestore client which intiated this zone iteration
70    */
71   struct NamestoreClient *nc;
72
73   /**
74    * The nick to add to the records
75    */
76   struct GNUNET_GNSRECORD_Data *nick;
77
78   /**
79    * Key of the zone we are iterating over.
80    */
81   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
82
83   /**
84    * Last sequence number in the zone iteration used to address next
85    * result of the zone iteration in the store
86    *
87    * Initialy set to 0.
88    * Updated in #zone_iterate_proc()
89    */
90   uint64_t seq;
91
92   /**
93    * The operation id fot the zone iteration in the response for the client
94    */
95   uint32_t request_id;
96
97   /**
98    * Offset of the zone iteration used to address next result of the zone
99    * iteration in the store
100    *
101    * Initialy set to 0 in #handle_iteration_start
102    * Incremented with by every call to #handle_iteration_next
103    */
104   uint32_t offset;
105
106 };
107
108
109 /**
110  * A namestore client
111  */
112 struct NamestoreClient
113 {
114
115   /**
116    * The client
117    */
118   struct GNUNET_SERVICE_Client *client;
119
120   /**
121    * Message queue for transmission to @e client
122    */
123   struct GNUNET_MQ_Handle *mq;
124
125   /**
126    * Head of the DLL of
127    * Zone iteration operations in progress initiated by this client
128    */
129   struct ZoneIteration *op_head;
130
131   /**
132    * Tail of the DLL of
133    * Zone iteration operations in progress initiated by this client
134    */
135   struct ZoneIteration *op_tail;
136 };
137
138
139 /**
140  * A namestore monitor.
141  */
142 struct ZoneMonitor
143 {
144   /**
145    * Next element in the DLL
146    */
147   struct ZoneMonitor *next;
148
149   /**
150    * Previous element in the DLL
151    */
152   struct ZoneMonitor *prev;
153
154   /**
155    * Namestore client which intiated this zone monitor
156    */
157   struct NamestoreClient *nc;
158
159   /**
160    * Private key of the zone.
161    */
162   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
163
164   /**
165    * Task active during initial iteration.
166    */
167   struct GNUNET_SCHEDULER_Task *task;
168
169   /**
170    * Task to warn about slow monitors.
171    */
172   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
173
174   /**
175    * Since when are we blocked on this monitor?
176    */
177   struct GNUNET_TIME_Absolute sa_waiting_start;
178
179   /**
180    * Last sequence number in the zone iteration used to address next
181    * result of the zone iteration in the store
182    *
183    * Initialy set to 0.
184    * Updated in #monitor_iterate_cb()
185    */
186   uint64_t seq;
187
188   /**
189    * Current limit of how many more messages we are allowed
190    * to queue to this monitor.
191    */
192   uint64_t limit;
193
194   /**
195    * How many more requests may we receive from the iterator
196    * before it is at the limit we gave it?  Will be below or
197    * equal to @e limit.  The effective limit for monitor
198    * events is thus @e iteration_cnt - @e limit!
199    */
200   uint64_t iteration_cnt;
201
202   /**
203    * Are we (still) in the initial iteration pass?
204    */
205   int in_first_iteration;
206
207   /**
208    * Is there a store activity waiting for this monitor?  We only raise the
209    * flag when it happens and search the DLL for the store activity when we
210    * had a limit increase.  If we cannot find any waiting store activity at
211    * that time, we clear the flag again.
212    */
213   int sa_waiting;
214
215 };
216
217
218 /**
219  * Pending operation on the namecache.
220  */
221 struct CacheOperation
222 {
223
224   /**
225    * Kept in a DLL.
226    */
227   struct CacheOperation *prev;
228
229   /**
230    * Kept in a DLL.
231    */
232   struct CacheOperation *next;
233
234   /**
235    * Handle to namecache queue.
236    */
237   struct GNUNET_NAMECACHE_QueueEntry *qe;
238
239   /**
240    * Client to notify about the result.
241    */
242   struct NamestoreClient *nc;
243
244   /**
245    * Client's request ID.
246    */
247   uint32_t rid;
248 };
249
250
251 /**
252  * Information for an ongoing #handle_record_store() operation.
253  * Needed as we may wait for monitors to be ready for the notification.
254  */
255 struct StoreActivity
256 {
257   /**
258    * Kept in a DLL.
259    */
260   struct StoreActivity *next;
261
262   /**
263    * Kept in a DLL.
264    */
265   struct StoreActivity *prev;
266
267   /**
268    * Which client triggered the store activity?
269    */
270   struct NamestoreClient *nc;
271
272   /**
273    * Copy of the original store message (as data fields in @e rd will
274    * point into it!).
275    */
276   const struct RecordStoreMessage *rsm;
277
278   /**
279    * Next zone monitor that still needs to be notified about this PUT.
280    */
281   struct ZoneMonitor *zm_pos;
282
283   /**
284    * Label nicely canonicalized (lower case).
285    */
286   char *conv_name;
287
288 };
289
290
291 /**
292  * Public key of all zeros.
293  */
294 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
295
296 /**
297  * Configuration handle.
298  */
299 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
300
301 /**
302  * Handle to the statistics service
303  */
304 static struct GNUNET_STATISTICS_Handle *statistics;
305
306 /**
307  * Namecache handle.
308  */
309 static struct GNUNET_NAMECACHE_Handle *namecache;
310
311 /**
312  * Database handle
313  */
314 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
315
316 /**
317  * Name of the database plugin
318  */
319 static char *db_lib_name;
320
321 /**
322  * Head of cop DLL.
323  */
324 static struct CacheOperation *cop_head;
325
326 /**
327  * Tail of cop DLL.
328  */
329 static struct CacheOperation *cop_tail;
330
331 /**
332  * First active zone monitor.
333  */
334 static struct ZoneMonitor *monitor_head;
335
336 /**
337  * Last active zone monitor.
338  */
339 static struct ZoneMonitor *monitor_tail;
340
341 /**
342  * Head of DLL of monitor-blocked store activities.
343  */
344 static struct StoreActivity *sa_head;
345
346 /**
347  * Tail of DLL of monitor-blocked store activities.
348  */
349 static struct StoreActivity *sa_tail;
350
351 /**
352  * Notification context shared by all monitors.
353  */
354 static struct GNUNET_NotificationContext *monitor_nc;
355
356 /**
357  * Optimize block insertion by caching map of private keys to
358  * public keys in memory?
359  */
360 static int cache_keys;
361
362 /**
363  * Use the namecache? Doing so creates additional cryptographic
364  * operations whenever we touch a record.
365  */
366 static int disable_namecache;
367
368
369 /**
370  * Task run during shutdown.
371  *
372  * @param cls unused
373  */
374 static void
375 cleanup_task (void *cls)
376 {
377   struct CacheOperation *cop;
378
379   (void) cls;
380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381               "Stopping namestore service\n");
382   while (NULL != (cop = cop_head))
383   {
384     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
385                 "Aborting incomplete namecache operation\n");
386     GNUNET_NAMECACHE_cancel (cop->qe);
387     GNUNET_CONTAINER_DLL_remove (cop_head,
388                                  cop_tail,
389                                  cop);
390     GNUNET_free (cop);
391   }
392   if (NULL != namecache)
393   {
394     GNUNET_NAMECACHE_disconnect (namecache);
395     namecache = NULL;
396   }
397   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
398                                               GSN_database));
399   GNUNET_free (db_lib_name);
400   db_lib_name = NULL;
401   if (NULL != monitor_nc)
402   {
403     GNUNET_notification_context_destroy (monitor_nc);
404     monitor_nc = NULL;
405   }
406   if (NULL != statistics)
407   {
408     GNUNET_STATISTICS_destroy (statistics,
409                                GNUNET_NO);
410     statistics = NULL;
411   }
412 }
413
414
415 /**
416  * Release memory used by @a sa.
417  *
418  * @param sa activity to free
419  */
420 static void
421 free_store_activity (struct StoreActivity *sa)
422 {
423   GNUNET_CONTAINER_DLL_remove (sa_head,
424                                sa_tail,
425                                sa);
426   GNUNET_free (sa->conv_name);
427   GNUNET_free (sa);
428 }
429
430
431 /**
432  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
433  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
434  * record, which (if found) is then copied to @a cls for future use.
435  *
436  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
437  * @param seq sequence number of the record
438  * @param private_key the private key of the zone (unused)
439  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
440  * @param rd_count number of records in @a rd
441  * @param rd records stored under @a label in the zone
442  */
443 static void
444 lookup_nick_it (void *cls,
445                 uint64_t seq,
446                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
447                 const char *label,
448                 unsigned int rd_count,
449                 const struct GNUNET_GNSRECORD_Data *rd)
450 {
451   struct GNUNET_GNSRECORD_Data **res = cls;
452
453   (void) private_key;
454   (void) seq;
455   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
456   {
457     GNUNET_break (0);
458     return;
459   }
460   for (unsigned int c = 0; c < rd_count; c++)
461   {
462     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
463     {
464       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
465       (*res)->data = &(*res)[1];
466       GNUNET_memcpy ((void *) (*res)->data,
467                      rd[c].data,
468                      rd[c].data_size);
469       (*res)->data_size = rd[c].data_size;
470       (*res)->expiration_time = rd[c].expiration_time;
471       (*res)->flags = rd[c].flags;
472       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
473       return;
474     }
475   }
476   (*res) = NULL;
477 }
478
479
480 /**
481  * Return the NICK record for the zone (if it exists).
482  *
483  * @param zone private key for the zone to look for nick
484  * @return NULL if no NICK record was found
485  */
486 static struct GNUNET_GNSRECORD_Data *
487 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
488 {
489   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
490   struct GNUNET_GNSRECORD_Data *nick;
491   int res;
492
493   nick = NULL;
494   res = GSN_database->lookup_records (GSN_database->cls,
495                                       zone,
496                                       GNUNET_GNS_EMPTY_LABEL_AT,
497                                       &lookup_nick_it,
498                                       &nick);
499   if ( (GNUNET_OK != res) ||
500        (NULL == nick) )
501   {
502     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
503     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
504                 "No nick name set for zone `%s'\n",
505                 GNUNET_GNSRECORD_z2s (&pub));
506     return NULL;
507   }
508   return nick;
509 }
510
511
512 /**
513  * Merge the nick record @a nick_rd with the rest of the
514  * record set given in @a rd2.  Store the result in @a rdc_res
515  * and @a rd_res.  The @a nick_rd's expiration time is set to
516  * the maximum expiration time of all of the records in @a rd2.
517  *
518  * @param nick_rd the nick record to integrate
519  * @param rd2_length length of the @a rd2 array
520  * @param rd2 array of records
521  * @param rdc_res[out] length of the resulting @a rd_res array
522  * @param rd_res[out] set to an array of records,
523  *                    including @a nick_rd and @a rd2;
524  *           all of the variable-size 'data' fields in @a rd2 are
525  *           allocated in the same chunk of memory!
526  */
527 static void
528 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
529                          unsigned int rd2_length,
530                          const struct GNUNET_GNSRECORD_Data *rd2,
531                          unsigned int *rdc_res,
532                          struct GNUNET_GNSRECORD_Data **rd_res)
533 {
534   uint64_t latest_expiration;
535   size_t req;
536   char *data;
537   size_t data_offset;
538   struct GNUNET_GNSRECORD_Data *target;
539
540   (*rdc_res) = 1 + rd2_length;
541   if (0 == 1 + rd2_length)
542   {
543     GNUNET_break (0);
544     (*rd_res) = NULL;
545     return;
546   }
547   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
548   for (unsigned int i=0; i<rd2_length; i++)
549   {
550     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
551
552     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
553     {
554       GNUNET_break (0);
555       (*rd_res) = NULL;
556       return;
557     }
558     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
559   }
560   target = GNUNET_malloc (req);
561   (*rd_res) = target;
562   data = (char *) &target[1 + rd2_length];
563   data_offset = 0;
564   latest_expiration = 0;
565   for (unsigned int i=0;i<rd2_length;i++)
566   {
567     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
568
569     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
570     {
571       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
572           latest_expiration)
573         latest_expiration = orig->expiration_time;
574     }
575     else if (orig->expiration_time > latest_expiration)
576       latest_expiration = orig->expiration_time;
577     target[i] = *orig;
578     target[i].data = (void *) &data[data_offset];
579     GNUNET_memcpy (&data[data_offset],
580                    orig->data,
581                    orig->data_size);
582     data_offset += orig->data_size;
583   }
584   /* append nick */
585   target[rd2_length] = *nick_rd;
586   target[rd2_length].expiration_time = latest_expiration;
587   target[rd2_length].data = (void *) &data[data_offset];
588   GNUNET_memcpy (&data[data_offset],
589                  nick_rd->data,
590                  nick_rd->data_size);
591   data_offset += nick_rd->data_size;
592   GNUNET_assert (req ==
593                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
594 }
595
596
597 /**
598  * Generate a `struct LookupNameResponseMessage` and send it to the
599  * given client using the given notification context.
600  *
601  * @param nc client to unicast to
602  * @param request_id request ID to use
603  * @param zone_key zone key of the zone
604  * @param name name
605  * @param rd_count number of records in @a rd
606  * @param rd array of records
607  */
608 static void
609 send_lookup_response (struct NamestoreClient *nc,
610                       uint32_t request_id,
611                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
612                       const char *name,
613                       unsigned int rd_count,
614                       const struct GNUNET_GNSRECORD_Data *rd)
615 {
616   struct GNUNET_MQ_Envelope *env;
617   struct RecordResultMessage *zir_msg;
618   struct GNUNET_GNSRECORD_Data *nick;
619   struct GNUNET_GNSRECORD_Data *res;
620   unsigned int res_count;
621   size_t name_len;
622   ssize_t rd_ser_len;
623   char *name_tmp;
624   char *rd_ser;
625
626   nick = get_nick_record (zone_key);
627
628   GNUNET_assert (-1 !=
629                  GNUNET_GNSRECORD_records_get_size (rd_count,
630                                                     rd));
631
632   if ( (NULL != nick) &&
633        (0 != strcmp (name,
634                      GNUNET_GNS_EMPTY_LABEL_AT)))
635   {
636     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
637     merge_with_nick_records (nick,
638                              rd_count,
639                              rd,
640                              &res_count,
641                              &res);
642     GNUNET_free (nick);
643   }
644   else
645   {
646     res_count = rd_count;
647     res = (struct GNUNET_GNSRECORD_Data *) rd;
648   }
649
650   GNUNET_assert (-1 !=
651                  GNUNET_GNSRECORD_records_get_size (res_count,
652                                                     res));
653
654
655   name_len = strlen (name) + 1;
656   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
657                                                   res);
658   if (rd_ser_len < 0)
659   {
660     GNUNET_break (0);
661     GNUNET_SERVICE_client_drop (nc->client);
662     return;
663   }
664   if (rd_ser_len >= UINT16_MAX - name_len - sizeof (*zir_msg))
665   {
666     GNUNET_break (0);
667     GNUNET_SERVICE_client_drop (nc->client);
668     return;
669   }
670   env = GNUNET_MQ_msg_extra (zir_msg,
671                              name_len + rd_ser_len,
672                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
673   zir_msg->gns_header.r_id = htonl (request_id);
674   zir_msg->name_len = htons (name_len);
675   zir_msg->rd_count = htons (res_count);
676   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
677   zir_msg->private_key = *zone_key;
678   name_tmp = (char *) &zir_msg[1];
679   GNUNET_memcpy (name_tmp,
680                  name,
681                  name_len);
682   rd_ser = &name_tmp[name_len];
683   GNUNET_assert (rd_ser_len ==
684                  GNUNET_GNSRECORD_records_serialize (res_count,
685                                                      res,
686                                                      rd_ser_len,
687                                                      rd_ser));
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "Sending RECORD_RESULT message with %u records\n",
690               res_count);
691   GNUNET_STATISTICS_update (statistics,
692                             "Record sets sent to clients",
693                             1,
694                             GNUNET_NO);
695   GNUNET_MQ_send (nc->mq,
696                   env);
697   if (rd != res)
698     GNUNET_free (res);
699 }
700
701
702 /**
703  * Send response to the store request to the client.
704  *
705  * @param client client to talk to
706  * @param res status of the operation
707  * @param rid client's request ID
708  */
709 static void
710 send_store_response (struct NamestoreClient *nc,
711                      int res,
712                      uint32_t rid)
713 {
714   struct GNUNET_MQ_Envelope *env;
715   struct RecordStoreResponseMessage *rcr_msg;
716
717   GNUNET_assert (NULL != nc);
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719               "Sending RECORD_STORE_RESPONSE message\n");
720   GNUNET_STATISTICS_update (statistics,
721                             "Store requests completed",
722                             1,
723                             GNUNET_NO);
724   env = GNUNET_MQ_msg (rcr_msg,
725                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
726   rcr_msg->gns_header.r_id = htonl (rid);
727   rcr_msg->op_result = htonl (res);
728   GNUNET_MQ_send (nc->mq,
729                   env);
730 }
731
732
733 /**
734  * Cache operation complete, clean up.
735  *
736  * @param cls the `struct CacheOperation`
737  * @param success success
738  * @param emsg error messages
739  */
740 static void
741 finish_cache_operation (void *cls,
742                         int32_t success,
743                         const char *emsg)
744 {
745   struct CacheOperation *cop = cls;
746
747   if (NULL != emsg)
748     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
749                 _("Failed to replicate block in namecache: %s\n"),
750                 emsg);
751   else
752     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
753                 "CACHE operation completed\n");
754   GNUNET_CONTAINER_DLL_remove (cop_head,
755                                cop_tail,
756                                cop);
757   if (NULL != cop->nc)
758     send_store_response (cop->nc,
759                          success,
760                          cop->rid);
761   GNUNET_free (cop);
762 }
763
764
765 /**
766  * We just touched the plaintext information about a name in our zone;
767  * refresh the corresponding (encrypted) block in the namecache.
768  *
769  * @param nc client responsible for the request, can be NULL
770  * @param rid request ID of the client
771  * @param zone_key private key of the zone
772  * @param name label for the records
773  * @param rd_count number of records
774  * @param rd records stored under the given @a name
775  */
776 static void
777 refresh_block (struct NamestoreClient *nc,
778                uint32_t rid,
779                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
780                const char *name,
781                unsigned int rd_count,
782                const struct GNUNET_GNSRECORD_Data *rd)
783 {
784   struct GNUNET_GNSRECORD_Block *block;
785   struct CacheOperation *cop;
786   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
787   struct GNUNET_GNSRECORD_Data *nick;
788   struct GNUNET_GNSRECORD_Data *res;
789   unsigned int res_count;
790   struct GNUNET_TIME_Absolute exp_time;
791
792   nick = get_nick_record (zone_key);
793   res_count = rd_count;
794   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
795   if (NULL != nick)
796   {
797     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
798     merge_with_nick_records (nick,
799                              rd_count,rd,
800                              &res_count,
801                              &res);
802     GNUNET_free (nick);
803   }
804   if (0 == res_count)
805   {
806     if (NULL != nc)
807       send_store_response (nc,
808                            GNUNET_OK,
809                            rid);
810     return; /* no data, no need to update cache */
811   }
812   if (GNUNET_YES == disable_namecache)
813   {
814     GNUNET_STATISTICS_update (statistics,
815                               "Namecache updates skipped (NC disabled)",
816                               1,
817                               GNUNET_NO);
818     if (NULL != nc)
819       send_store_response (nc,
820                            GNUNET_OK,
821                            rid);
822     return;
823   }
824   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
825                                                           res);
826   if (cache_keys)
827     block = GNUNET_GNSRECORD_block_create2 (zone_key,
828                                             exp_time,
829                                             name,
830                                             res,
831                                             res_count);
832   else
833     block = GNUNET_GNSRECORD_block_create (zone_key,
834                                            exp_time,
835                                            name,
836                                            res,
837                                            res_count);
838   GNUNET_assert (NULL != block);
839   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
840                                       &pkey);
841   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
843               name,
844               res_count,
845               GNUNET_STRINGS_absolute_time_to_string (exp_time),
846               GNUNET_GNSRECORD_z2s (&pkey));
847   GNUNET_STATISTICS_update (statistics,
848                             "Namecache updates pushed",
849                             1,
850                             GNUNET_NO);
851   cop = GNUNET_new (struct CacheOperation);
852   cop->nc = nc;
853   cop->rid = rid;
854   GNUNET_CONTAINER_DLL_insert (cop_head,
855                                cop_tail,
856                                cop);
857   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
858                                           block,
859                                           &finish_cache_operation,
860                                           cop);
861   GNUNET_free (block);
862 }
863
864
865 /**
866  * Print a warning that one of our monitors is no longer reacting.
867  *
868  * @param cls a `struct ZoneMonitor` to warn about
869  */
870 static void
871 warn_monitor_slow (void *cls)
872 {
873   struct ZoneMonitor *zm = cls;
874
875   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
876               "No response from monitor since %s\n",
877               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
878   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
879                                                       &warn_monitor_slow,
880                                                       zm);
881 }
882
883
884 /**
885  * Continue processing the @a sa.
886  *
887  * @param sa store activity to process
888  */
889 static void
890 continue_store_activity (struct StoreActivity *sa)
891 {
892   const struct RecordStoreMessage *rp_msg = sa->rsm;
893   unsigned int rd_count;
894   size_t name_len;
895   size_t rd_ser_len;
896   uint32_t rid;
897   const char *name_tmp;
898   const char *rd_ser;
899
900   rid = ntohl (rp_msg->gns_header.r_id);
901   name_len = ntohs (rp_msg->name_len);
902   rd_count = ntohs (rp_msg->rd_count);
903   rd_ser_len = ntohs (rp_msg->rd_len);
904   name_tmp = (const char *) &rp_msg[1];
905   rd_ser = &name_tmp[name_len];
906   {
907     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
908
909     /* We did this before, must succeed again */
910     GNUNET_assert (GNUNET_OK ==
911                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
912                                                          rd_ser,
913                                                          rd_count,
914                                                          rd));
915
916     for (struct ZoneMonitor *zm = sa->zm_pos;
917          NULL != zm;
918          zm = sa->zm_pos)
919     {
920       if ( (0 != memcmp (&rp_msg->private_key,
921                          &zm->zone,
922                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
923            (0 != memcmp (&zm->zone,
924                          &zero,
925                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
926         {
927           sa->zm_pos = zm->next; /* not interesting to this monitor */
928           continue; // -- fails tests, but why not here?
929         }
930       if (zm->limit == zm->iteration_cnt)
931       {
932         zm->sa_waiting = GNUNET_YES;
933         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
934         if (NULL != zm->sa_wait_warning)
935           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
936         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
937                                                             &warn_monitor_slow,
938                                                             zm);
939         return; /* blocked on zone monitor */
940       }
941       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942                   "Notifying monitor about changes under label `%s'\n",
943                   sa->conv_name);
944       zm->limit--;
945       send_lookup_response (zm->nc,
946                             0,
947                             &rp_msg->private_key,
948                             sa->conv_name,
949                             rd_count,
950                             rd);
951       sa->zm_pos = zm->next;
952     }
953     /* great, done with the monitors, unpack (again) for refresh_block operation */
954     refresh_block (sa->nc,
955                    rid,
956                    &rp_msg->private_key,
957                    sa->conv_name,
958                    rd_count,
959                    rd);
960   }
961   GNUNET_SERVICE_client_continue (sa->nc->client);
962   free_store_activity (sa);
963 }
964
965
966 /**
967  * Called whenever a client is disconnected.
968  * Frees our resources associated with that client.
969  *
970  * @param cls closure
971  * @param client identification of the client
972  * @param app_ctx the `struct NamestoreClient` of @a client
973  */
974 static void
975 client_disconnect_cb (void *cls,
976                       struct GNUNET_SERVICE_Client *client,
977                       void *app_ctx)
978 {
979   struct NamestoreClient *nc = app_ctx;
980   struct ZoneIteration *no;
981   struct CacheOperation *cop;
982
983   (void) cls;
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985               "Client %p disconnected\n",
986               client);
987   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
988   {
989     struct StoreActivity *san;
990
991     if (nc != zm->nc)
992       continue;
993     GNUNET_CONTAINER_DLL_remove (monitor_head,
994                                  monitor_tail,
995                                  zm);
996     if (NULL != zm->task)
997     {
998       GNUNET_SCHEDULER_cancel (zm->task);
999       zm->task = NULL;
1000     }
1001     if (NULL != zm->sa_wait_warning)
1002     {
1003       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1004       zm->sa_wait_warning = NULL;
1005     }
1006     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1007     {
1008       san = sa->next;
1009       if (zm == sa->zm_pos)
1010       {
1011         sa->zm_pos = zm->next;
1012         /* this may free sa */
1013         continue_store_activity (sa);
1014       }
1015     }
1016     GNUNET_free (zm);
1017     break;
1018   }
1019   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1020   {
1021     if (sa->nc == nc)
1022     {
1023       /* this may free sa */
1024       free_store_activity (sa);
1025       break; /* there can only be one per nc */
1026     }
1027   }
1028   while (NULL != (no = nc->op_head))
1029   {
1030     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1031                                  nc->op_tail,
1032                                  no);
1033     GNUNET_free (no);
1034   }
1035   for (cop = cop_head; NULL != cop; cop = cop->next)
1036     if (nc == cop->nc)
1037       cop->nc = NULL;
1038   GNUNET_free (nc);
1039 }
1040
1041
1042 /**
1043  * Add a client to our list of active clients.
1044  *
1045  * @param cls NULL
1046  * @param client client to add
1047  * @param mq message queue for @a client
1048  * @return internal namestore client structure for this client
1049  */
1050 static void *
1051 client_connect_cb (void *cls,
1052                    struct GNUNET_SERVICE_Client *client,
1053                    struct GNUNET_MQ_Handle *mq)
1054 {
1055   struct NamestoreClient *nc;
1056
1057   (void) cls;
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "Client %p connected\n",
1060               client);
1061   nc = GNUNET_new (struct NamestoreClient);
1062   nc->client = client;
1063   nc->mq = mq;
1064   return nc;
1065 }
1066
1067
1068 /**
1069  * Closure for #lookup_it().
1070  */
1071 struct RecordLookupContext
1072 {
1073
1074   /**
1075    * FIXME.
1076    */
1077   const char *label;
1078
1079   /**
1080    * FIXME.
1081    */
1082   char *res_rd;
1083
1084   /**
1085    * FIXME.
1086    */
1087   struct GNUNET_GNSRECORD_Data *nick;
1088
1089   /**
1090    * FIXME.
1091    */
1092   int found;
1093
1094   /**
1095    * FIXME.
1096    */
1097   unsigned int res_rd_count;
1098
1099   /**
1100    * FIXME.
1101    */
1102   ssize_t rd_ser_len;
1103 };
1104
1105
1106 /**
1107  * FIXME.
1108  *
1109  * @param seq sequence number of the record
1110  */
1111 static void
1112 lookup_it (void *cls,
1113            uint64_t seq,
1114            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1115            const char *label,
1116            unsigned int rd_count,
1117            const struct GNUNET_GNSRECORD_Data *rd)
1118 {
1119   struct RecordLookupContext *rlc = cls;
1120
1121   (void) private_key;
1122   (void) seq;
1123   if (0 != strcmp (label,
1124                    rlc->label))
1125     return;
1126   rlc->found = GNUNET_YES;
1127   if (0 == rd_count)
1128   {
1129     rlc->rd_ser_len = 0;
1130     rlc->res_rd_count = 0;
1131     rlc->res_rd = NULL;
1132     return;
1133   }
1134   if ( (NULL != rlc->nick) &&
1135        (0 != strcmp (label,
1136                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1137   {
1138     /* Merge */
1139     struct GNUNET_GNSRECORD_Data *rd_res;
1140     unsigned int rdc_res;
1141
1142     rd_res = NULL;
1143     rdc_res = 0;
1144     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1145     merge_with_nick_records (rlc->nick,
1146                              rd_count,
1147                              rd,
1148                              &rdc_res,
1149                              &rd_res);
1150     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1151                                                          rd_res);
1152     if (rlc->rd_ser_len < 0)
1153     {
1154       GNUNET_break (0);
1155       GNUNET_free  (rd_res);
1156       rlc->found = GNUNET_NO;
1157       rlc->rd_ser_len = 0;
1158       return;
1159     }
1160     rlc->res_rd_count = rdc_res;
1161     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1162     if (rlc->rd_ser_len !=
1163         GNUNET_GNSRECORD_records_serialize (rdc_res,
1164                                             rd_res,
1165                                             rlc->rd_ser_len,
1166                                             rlc->res_rd))
1167     {
1168       GNUNET_break (0);
1169       GNUNET_free  (rlc->res_rd);
1170       rlc->res_rd = NULL;
1171       rlc->res_rd_count = 0;
1172       rlc->rd_ser_len = 0;
1173       GNUNET_free  (rd_res);
1174       rlc->found = GNUNET_NO;
1175       return;
1176     }
1177     GNUNET_free (rd_res);
1178     GNUNET_free (rlc->nick);
1179     rlc->nick = NULL;
1180   }
1181   else
1182   {
1183     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1184                                                          rd);
1185     if (rlc->rd_ser_len < 0)
1186     {
1187       GNUNET_break (0);
1188       rlc->found = GNUNET_NO;
1189       rlc->rd_ser_len = 0;
1190       return;
1191     }
1192     rlc->res_rd_count = rd_count;
1193     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1194     if (rlc->rd_ser_len !=
1195         GNUNET_GNSRECORD_records_serialize (rd_count,
1196                                             rd,
1197                                             rlc->rd_ser_len,
1198                                             rlc->res_rd))
1199     {
1200       GNUNET_break (0);
1201       GNUNET_free  (rlc->res_rd);
1202       rlc->res_rd = NULL;
1203       rlc->res_rd_count = 0;
1204       rlc->rd_ser_len = 0;
1205       rlc->found = GNUNET_NO;
1206       return;
1207     }
1208   }
1209 }
1210
1211
1212 /**
1213  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1214  *
1215  * @param cls client sending the message
1216  * @param ll_msg message of type `struct LabelLookupMessage`
1217  * @return #GNUNET_OK if @a ll_msg is well-formed
1218  */
1219 static int
1220 check_record_lookup (void *cls,
1221                      const struct LabelLookupMessage *ll_msg)
1222 {
1223   uint32_t name_len;
1224   size_t src_size;
1225   const char *name_tmp;
1226
1227   (void) cls;
1228   name_len = ntohl (ll_msg->label_len);
1229   src_size = ntohs (ll_msg->gns_header.header.size);
1230   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1231   {
1232     GNUNET_break (0);
1233     return GNUNET_SYSERR;
1234   }
1235
1236   name_tmp = (const char *) &ll_msg[1];
1237   if ('\0' != name_tmp[name_len -1])
1238   {
1239     GNUNET_break (0);
1240     return GNUNET_SYSERR;
1241   }
1242   return GNUNET_OK;
1243 }
1244
1245
1246 /**
1247  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1248  *
1249  * @param cls client sending the message
1250  * @param ll_msg message of type `struct LabelLookupMessage`
1251  */
1252 static void
1253 handle_record_lookup (void *cls,
1254                       const struct LabelLookupMessage *ll_msg)
1255 {
1256   struct NamestoreClient *nc = cls;
1257   struct GNUNET_MQ_Envelope *env;
1258   struct LabelLookupResponseMessage *llr_msg;
1259   struct RecordLookupContext rlc;
1260   const char *name_tmp;
1261   char *res_name;
1262   char *conv_name;
1263   uint32_t name_len;
1264   int res;
1265
1266   name_len = ntohl (ll_msg->label_len);
1267   name_tmp = (const char *) &ll_msg[1];
1268   GNUNET_SERVICE_client_continue (nc->client);
1269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1271               name_tmp);
1272
1273   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1274   if (NULL == conv_name)
1275   {
1276     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277                 "Error converting name `%s'\n",
1278                 name_tmp);
1279     GNUNET_SERVICE_client_drop (nc->client);
1280     return;
1281   }
1282   rlc.label = conv_name;
1283   rlc.found = GNUNET_NO;
1284   rlc.res_rd_count = 0;
1285   rlc.res_rd = NULL;
1286   rlc.rd_ser_len = 0;
1287   rlc.nick = get_nick_record (&ll_msg->zone);
1288   res = GSN_database->lookup_records (GSN_database->cls,
1289                                       &ll_msg->zone,
1290                                       conv_name,
1291                                       &lookup_it,
1292                                       &rlc);
1293   GNUNET_free (conv_name);
1294   env = GNUNET_MQ_msg_extra (llr_msg,
1295                              name_len + rlc.rd_ser_len,
1296                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1297   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1298   llr_msg->private_key = ll_msg->zone;
1299   llr_msg->name_len = htons (name_len);
1300   llr_msg->rd_count = htons (rlc.res_rd_count);
1301   llr_msg->rd_len = htons (rlc.rd_ser_len);
1302   res_name = (char *) &llr_msg[1];
1303   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1304     llr_msg->found = ntohs (GNUNET_YES);
1305   else
1306     llr_msg->found = ntohs (GNUNET_NO);
1307   GNUNET_memcpy (&llr_msg[1],
1308                  name_tmp,
1309                  name_len);
1310   GNUNET_memcpy (&res_name[name_len],
1311                  rlc.res_rd,
1312                  rlc.rd_ser_len);
1313   GNUNET_MQ_send (nc->mq,
1314                   env);
1315   GNUNET_free_non_null (rlc.res_rd);
1316 }
1317
1318
1319 /**
1320  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1321  *
1322  * @param cls client sending the message
1323  * @param rp_msg message of type `struct RecordStoreMessage`
1324  * @return #GNUNET_OK if @a rp_msg is well-formed
1325  */
1326 static int
1327 check_record_store (void *cls,
1328                     const struct RecordStoreMessage *rp_msg)
1329 {
1330   size_t name_len;
1331   size_t msg_size;
1332   size_t msg_size_exp;
1333   size_t rd_ser_len;
1334   const char *name_tmp;
1335
1336   (void) cls;
1337   name_len = ntohs (rp_msg->name_len);
1338   msg_size = ntohs (rp_msg->gns_header.header.size);
1339   rd_ser_len = ntohs (rp_msg->rd_len);
1340   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1341   if (msg_size != msg_size_exp)
1342   {
1343     GNUNET_break (0);
1344     return GNUNET_SYSERR;
1345   }
1346   if ( (0 == name_len) ||
1347        (name_len > MAX_NAME_LEN) )
1348   {
1349     GNUNET_break (0);
1350     return GNUNET_SYSERR;
1351   }
1352   name_tmp = (const char *) &rp_msg[1];
1353   if ('\0' != name_tmp[name_len -1])
1354   {
1355     GNUNET_break (0);
1356     return GNUNET_SYSERR;
1357   }
1358   return GNUNET_OK;
1359 }
1360
1361
1362 /**
1363  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1364  *
1365  * @param cls client sending the message
1366  * @param rp_msg message of type `struct RecordStoreMessage`
1367  */
1368 static void
1369 handle_record_store (void *cls,
1370                      const struct RecordStoreMessage *rp_msg)
1371 {
1372   struct NamestoreClient *nc = cls;
1373   size_t name_len;
1374   size_t rd_ser_len;
1375   uint32_t rid;
1376   const char *name_tmp;
1377   char *conv_name;
1378   const char *rd_ser;
1379   unsigned int rd_count;
1380   int res;
1381   struct StoreActivity *sa;
1382
1383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384               "Received NAMESTORE_RECORD_STORE message\n");
1385   rid = ntohl (rp_msg->gns_header.r_id);
1386   name_len = ntohs (rp_msg->name_len);
1387   rd_count = ntohs (rp_msg->rd_count);
1388   rd_ser_len = ntohs (rp_msg->rd_len);
1389   GNUNET_break (0 == ntohs (rp_msg->reserved));
1390   name_tmp = (const char *) &rp_msg[1];
1391   rd_ser = &name_tmp[name_len];
1392   {
1393     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1394
1395     if (GNUNET_OK !=
1396         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1397                                               rd_ser,
1398                                               rd_count,
1399                                               rd))
1400     {
1401       GNUNET_break (0);
1402       GNUNET_SERVICE_client_drop (nc->client);
1403       return;
1404     }
1405
1406     /* Extracting and converting private key */
1407     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1408     if (NULL == conv_name)
1409     {
1410       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1411                   "Error converting name `%s'\n",
1412                   name_tmp);
1413       GNUNET_SERVICE_client_drop (nc->client);
1414       return;
1415     }
1416     GNUNET_STATISTICS_update (statistics,
1417                               "Well-formed store requests received",
1418                               1,
1419                               GNUNET_NO);
1420     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421                 "Creating %u records for name `%s'\n",
1422                 (unsigned int) rd_count,
1423                 conv_name);
1424     if ( (0 == rd_count) &&
1425          (GNUNET_NO ==
1426           GSN_database->lookup_records (GSN_database->cls,
1427                                         &rp_msg->private_key,
1428                                         conv_name,
1429                                         NULL,
1430                                         0)) )
1431     {
1432       /* This name does not exist, so cannot be removed */
1433       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1434                   "Name `%s' does not exist, no deletion required\n",
1435                   conv_name);
1436       res = GNUNET_NO;
1437     }
1438     else
1439     {
1440       /* remove "NICK" records, unless this is for the
1441          #GNUNET_GNS_EMPTY_LABEL_AT label */
1442       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1443       unsigned int rd_clean_off;
1444
1445       rd_clean_off = 0;
1446       for (unsigned int i=0;i<rd_count;i++)
1447       {
1448         rd_clean[rd_clean_off] = rd[i];
1449         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1450                            conv_name)) ||
1451              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1452           rd_clean_off++;
1453       }
1454       res = GSN_database->store_records (GSN_database->cls,
1455                                          &rp_msg->private_key,
1456                                          conv_name,
1457                                          rd_clean_off,
1458                                          rd_clean);
1459     }
1460
1461     if (GNUNET_OK != res)
1462     {
1463       /* store not successful, not need to tell monitors */
1464       send_store_response (nc,
1465                            res,
1466                            rid);
1467       GNUNET_SERVICE_client_continue (nc->client);
1468       GNUNET_free (conv_name);
1469       return;
1470     }
1471
1472     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1473                         ntohs (rp_msg->gns_header.header.size));
1474     GNUNET_CONTAINER_DLL_insert (sa_head,
1475                                  sa_tail,
1476                                  sa);
1477     sa->nc = nc;
1478     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1479     GNUNET_memcpy (&sa[1],
1480                    rp_msg,
1481                    ntohs (rp_msg->gns_header.header.size));
1482     sa->zm_pos = monitor_head;
1483     sa->conv_name = conv_name;
1484     continue_store_activity (sa);
1485   }
1486 }
1487
1488
1489 /**
1490  * Context for record remove operations passed from #handle_zone_to_name to
1491  * #handle_zone_to_name_it as closure
1492  */
1493 struct ZoneToNameCtx
1494 {
1495   /**
1496    * Namestore client
1497    */
1498   struct NamestoreClient *nc;
1499
1500   /**
1501    * Request id (to be used in the response to the client).
1502    */
1503   uint32_t rid;
1504
1505   /**
1506    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1507    * not finding a name for the zone still counts as a 'success' here,
1508    * as this field is about the success of executing the IPC protocol.
1509    */
1510   int success;
1511 };
1512
1513
1514 /**
1515  * Zone to name iterator
1516  *
1517  * @param cls struct ZoneToNameCtx *
1518  * @param seq sequence number of the record
1519  * @param zone_key the zone key
1520  * @param name name
1521  * @param rd_count number of records in @a rd
1522  * @param rd record data
1523  */
1524 static void
1525 handle_zone_to_name_it (void *cls,
1526                         uint64_t seq,
1527                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1528                         const char *name,
1529                         unsigned int rd_count,
1530                         const struct GNUNET_GNSRECORD_Data *rd)
1531 {
1532   struct ZoneToNameCtx *ztn_ctx = cls;
1533   struct GNUNET_MQ_Envelope *env;
1534   struct ZoneToNameResponseMessage *ztnr_msg;
1535   int16_t res;
1536   size_t name_len;
1537   ssize_t rd_ser_len;
1538   size_t msg_size;
1539   char *name_tmp;
1540   char *rd_tmp;
1541
1542   (void) seq;
1543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1544               "Found result for zone-to-name lookup: `%s'\n",
1545               name);
1546   res = GNUNET_YES;
1547   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1548   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1549                                                   rd);
1550   if (rd_ser_len < 0)
1551   {
1552     GNUNET_break (0);
1553     ztn_ctx->success = GNUNET_SYSERR;
1554     return;
1555   }
1556   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1557   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1558   {
1559     GNUNET_break (0);
1560     ztn_ctx->success = GNUNET_SYSERR;
1561     return;
1562   }
1563   env = GNUNET_MQ_msg_extra (ztnr_msg,
1564                              name_len + rd_ser_len,
1565                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1566   ztnr_msg->gns_header.header.size = htons (msg_size);
1567   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1568   ztnr_msg->res = htons (res);
1569   ztnr_msg->rd_len = htons (rd_ser_len);
1570   ztnr_msg->rd_count = htons (rd_count);
1571   ztnr_msg->name_len = htons (name_len);
1572   ztnr_msg->zone = *zone_key;
1573   name_tmp = (char *) &ztnr_msg[1];
1574   GNUNET_memcpy (name_tmp,
1575                  name,
1576                  name_len);
1577   rd_tmp = &name_tmp[name_len];
1578   GNUNET_assert (rd_ser_len ==
1579                  GNUNET_GNSRECORD_records_serialize (rd_count,
1580                                                      rd,
1581                                                      rd_ser_len,
1582                                                      rd_tmp));
1583   ztn_ctx->success = GNUNET_OK;
1584   GNUNET_MQ_send (ztn_ctx->nc->mq,
1585                   env);
1586 }
1587
1588
1589 /**
1590  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1591  *
1592  * @param cls client client sending the message
1593  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1594  */
1595 static void
1596 handle_zone_to_name (void *cls,
1597                      const struct ZoneToNameMessage *ztn_msg)
1598 {
1599   struct NamestoreClient *nc = cls;
1600   struct ZoneToNameCtx ztn_ctx;
1601   struct GNUNET_MQ_Envelope *env;
1602   struct ZoneToNameResponseMessage *ztnr_msg;
1603
1604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1605               "Received ZONE_TO_NAME message\n");
1606   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1607   ztn_ctx.nc = nc;
1608   ztn_ctx.success = GNUNET_NO;
1609   if (GNUNET_SYSERR ==
1610       GSN_database->zone_to_name (GSN_database->cls,
1611                                   &ztn_msg->zone,
1612                                   &ztn_msg->value_zone,
1613                                   &handle_zone_to_name_it, &ztn_ctx))
1614   {
1615     /* internal error, hang up instead of signalling something
1616        that might be wrong */
1617     GNUNET_break (0);
1618     GNUNET_SERVICE_client_drop (nc->client);
1619     return;
1620   }
1621   if (GNUNET_NO == ztn_ctx.success)
1622   {
1623     /* no result found, send empty response */
1624     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625                 "Found no result for zone-to-name lookup.\n");
1626     env = GNUNET_MQ_msg (ztnr_msg,
1627                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1628     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1629     ztnr_msg->res = htons (GNUNET_NO);
1630     GNUNET_MQ_send (nc->mq,
1631                     env);
1632   }
1633   GNUNET_SERVICE_client_continue (nc->client);
1634 }
1635
1636
1637 /**
1638  * Context for record remove operations passed from
1639  * #run_zone_iteration_round to #zone_iterate_proc as closure
1640  */
1641 struct ZoneIterationProcResult
1642 {
1643   /**
1644    * The zone iteration handle
1645    */
1646   struct ZoneIteration *zi;
1647
1648   /**
1649    * Number of results left to be returned in this iteration.
1650    */
1651   uint64_t limit;
1652
1653 };
1654
1655
1656 /**
1657  * Process results for zone iteration from database
1658  *
1659  * @param cls struct ZoneIterationProcResult
1660  * @param seq sequence number of the record
1661  * @param zone_key the zone key
1662  * @param name name
1663  * @param rd_count number of records for this name
1664  * @param rd record data
1665  */
1666 static void
1667 zone_iterate_proc (void *cls,
1668                    uint64_t seq,
1669                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1670                    const char *name,
1671                    unsigned int rd_count,
1672                    const struct GNUNET_GNSRECORD_Data *rd)
1673 {
1674   struct ZoneIterationProcResult *proc = cls;
1675   int do_refresh_block;
1676
1677   if ( (NULL == zone_key) &&
1678        (NULL == name) )
1679   {
1680     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1681                 "Iteration done\n");
1682     return;
1683   }
1684   if ( (NULL == zone_key) ||
1685        (NULL == name) )
1686   {
1687     /* what is this!? should never happen */
1688     GNUNET_break (0);
1689     return;
1690   }
1691   if (0 == proc->limit)
1692   {
1693     /* what is this!? should never happen */
1694     GNUNET_break (0);
1695     return;
1696   }
1697   proc->limit--;
1698   proc->zi->seq = seq;
1699   send_lookup_response (proc->zi->nc,
1700                         proc->zi->request_id,
1701                         zone_key,
1702                         name,
1703                         rd_count,
1704                         rd);
1705
1706
1707   do_refresh_block = GNUNET_NO;
1708   for (unsigned int i=0;i<rd_count;i++)
1709     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1710     {
1711       do_refresh_block = GNUNET_YES;
1712       break;
1713     }
1714   if (GNUNET_YES == do_refresh_block)
1715     refresh_block (NULL,
1716                    0,
1717                    zone_key,
1718                    name,
1719                    rd_count,
1720                    rd);
1721 }
1722
1723
1724 /**
1725  * Perform the next round of the zone iteration.
1726  *
1727  * @param zi zone iterator to process
1728  * @param limit number of results to return in one pass
1729  */
1730 static void
1731 run_zone_iteration_round (struct ZoneIteration *zi,
1732                           uint64_t limit)
1733 {
1734   struct ZoneIterationProcResult proc;
1735   struct GNUNET_MQ_Envelope *env;
1736   struct RecordResultMessage *rrm;
1737   struct GNUNET_TIME_Absolute start;
1738   struct GNUNET_TIME_Relative duration;
1739
1740   memset (&proc,
1741           0,
1742           sizeof (proc));
1743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1744               "Asked to return up to %llu records at position %llu\n",
1745               (unsigned long long) limit,
1746               (unsigned long long) zi->seq);
1747   proc.zi = zi;
1748   proc.limit = limit;
1749   start = GNUNET_TIME_absolute_get ();
1750   GNUNET_break (GNUNET_SYSERR !=
1751                 GSN_database->iterate_records (GSN_database->cls,
1752                                                (0 == memcmp (&zi->zone,
1753                                                              &zero,
1754                                                              sizeof (zero)))
1755                                                ? NULL
1756                                                : &zi->zone,
1757                                                zi->seq,
1758                                                limit,
1759                                                &zone_iterate_proc,
1760                                                &proc));
1761   duration = GNUNET_TIME_absolute_get_duration (start);
1762   duration = GNUNET_TIME_relative_divide (duration,
1763                                           limit - proc.limit);
1764   GNUNET_STATISTICS_set (statistics,
1765                          "NAMESTORE iteration delay (μs/record)",
1766                          duration.rel_value_us,
1767                          GNUNET_NO);
1768   if (0 == proc.limit)
1769   {
1770     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771                 "Returned %llu results, more results available\n",
1772                 (unsigned long long) limit);
1773     return; /* more results later after we get the
1774 #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message */
1775   }
1776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1777               "Completed iteration after %llu/%llu results\n",
1778               (unsigned long long) (limit - proc.limit),
1779               (unsigned long long) limit);
1780   /* send empty response to indicate end of list */
1781   env = GNUNET_MQ_msg (rrm,
1782                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
1783   rrm->gns_header.r_id = htonl (zi->request_id);
1784   GNUNET_MQ_send (zi->nc->mq,
1785                   env);
1786   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
1787                                zi->nc->op_tail,
1788                                zi);
1789   GNUNET_free (zi);
1790 }
1791
1792
1793 /**
1794  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1795  *
1796  * @param cls the client sending the message
1797  * @param zis_msg message from the client
1798  */
1799 static void
1800 handle_iteration_start (void *cls,
1801                         const struct ZoneIterationStartMessage *zis_msg)
1802 {
1803   struct NamestoreClient *nc = cls;
1804   struct ZoneIteration *zi;
1805
1806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1807               "Received ZONE_ITERATION_START message\n");
1808   zi = GNUNET_new (struct ZoneIteration);
1809   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1810   zi->offset = 0;
1811   zi->nc = nc;
1812   zi->zone = zis_msg->zone;
1813
1814   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1815                                nc->op_tail,
1816                                zi);
1817   run_zone_iteration_round (zi,
1818                             1);
1819   GNUNET_SERVICE_client_continue (nc->client);
1820 }
1821
1822
1823 /**
1824  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1825  *
1826  * @param cls the client sending the message
1827  * @param zis_msg message from the client
1828  */
1829 static void
1830 handle_iteration_stop (void *cls,
1831                        const struct ZoneIterationStopMessage *zis_msg)
1832 {
1833   struct NamestoreClient *nc = cls;
1834   struct ZoneIteration *zi;
1835   uint32_t rid;
1836
1837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1838               "Received ZONE_ITERATION_STOP message\n");
1839   rid = ntohl (zis_msg->gns_header.r_id);
1840   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1841     if (zi->request_id == rid)
1842       break;
1843   if (NULL == zi)
1844   {
1845     GNUNET_break (0);
1846     GNUNET_SERVICE_client_drop (nc->client);
1847     return;
1848   }
1849   GNUNET_CONTAINER_DLL_remove (nc->op_head,
1850                                nc->op_tail,
1851                                zi);
1852   GNUNET_free (zi);
1853   GNUNET_SERVICE_client_continue (nc->client);
1854 }
1855
1856
1857 /**
1858  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
1859  *
1860  * @param cls the client sending the message
1861  * @param message message from the client
1862  */
1863 static void
1864 handle_iteration_next (void *cls,
1865                        const struct ZoneIterationNextMessage *zis_msg)
1866 {
1867   struct NamestoreClient *nc = cls;
1868   struct ZoneIteration *zi;
1869   uint32_t rid;
1870   uint64_t limit;
1871
1872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1873               "Received ZONE_ITERATION_NEXT message\n");
1874   GNUNET_STATISTICS_update (statistics,
1875                             "Iteration NEXT messages received",
1876                             1,
1877                             GNUNET_NO);
1878   rid = ntohl (zis_msg->gns_header.r_id);
1879   limit = GNUNET_ntohll (zis_msg->limit);
1880   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1881     if (zi->request_id == rid)
1882       break;
1883   if (NULL == zi)
1884   {
1885     GNUNET_break (0);
1886     GNUNET_SERVICE_client_drop (nc->client);
1887     return;
1888   }
1889   run_zone_iteration_round (zi,
1890                             limit);
1891   GNUNET_SERVICE_client_continue (nc->client);
1892 }
1893
1894
1895 /**
1896  * Function called when the monitor is ready for more data, and we
1897  * should thus unblock PUT operations that were blocked on the
1898  * monitor not being ready.
1899  */
1900 static void
1901 monitor_unblock (struct ZoneMonitor *zm)
1902 {
1903   struct StoreActivity *sa = sa_head;
1904
1905   while ( (NULL != sa) &&
1906           (zm->limit > zm->iteration_cnt) )
1907   {
1908     struct StoreActivity *sn = sa->next;
1909
1910     if (sa->zm_pos == zm)
1911       continue_store_activity (sa);
1912     sa = sn;
1913   }
1914   if (zm->limit > zm->iteration_cnt)
1915   {
1916     zm->sa_waiting = GNUNET_NO;
1917     if (NULL != zm->sa_wait_warning)
1918     {
1919       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1920       zm->sa_wait_warning = NULL;
1921     }
1922   }
1923   else if (GNUNET_YES == zm->sa_waiting)
1924   {
1925     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1926     if (NULL != zm->sa_wait_warning)
1927       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1928     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1929                                                         &warn_monitor_slow,
1930                                                         zm);
1931   }
1932 }
1933
1934
1935 /**
1936  * Send 'sync' message to zone monitor, we're now in sync.
1937  *
1938  * @param zm monitor that is now in sync
1939  */
1940 static void
1941 monitor_sync (struct ZoneMonitor *zm)
1942 {
1943   struct GNUNET_MQ_Envelope *env;
1944   struct GNUNET_MessageHeader *sync;
1945
1946   env = GNUNET_MQ_msg (sync,
1947                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
1948   GNUNET_MQ_send (zm->nc->mq,
1949                   env);
1950   /* mark iteration done */
1951   zm->in_first_iteration = GNUNET_NO;
1952   zm->iteration_cnt = 0;
1953   if ( (zm->limit > 0) &&
1954        (zm->sa_waiting) )
1955     monitor_unblock (zm);
1956 }
1957
1958
1959 /**
1960  * Obtain the next datum during the zone monitor's zone initial iteration.
1961  *
1962  * @param cls zone monitor that does its initial iteration
1963  */
1964 static void
1965 monitor_iteration_next (void *cls);
1966
1967
1968 /**
1969  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
1970  *
1971  * @param cls a 'struct ZoneMonitor *' with information about the monitor
1972  * @param seq sequence number of the record
1973  * @param zone_key zone key of the zone
1974  * @param name name
1975  * @param rd_count number of records in @a rd
1976  * @param rd array of records
1977  */
1978 static void
1979 monitor_iterate_cb (void *cls,
1980                     uint64_t seq,
1981                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1982                     const char *name,
1983                     unsigned int rd_count,
1984                     const struct GNUNET_GNSRECORD_Data *rd)
1985 {
1986   struct ZoneMonitor *zm = cls;
1987
1988   zm->seq = seq;
1989   GNUNET_assert (NULL != name);
1990   GNUNET_STATISTICS_update (statistics,
1991                             "Monitor notifications sent",
1992                             1,
1993                             GNUNET_NO);
1994   zm->limit--;
1995   zm->iteration_cnt--;
1996   send_lookup_response (zm->nc,
1997                         0,
1998                         zone_key,
1999                         name,
2000                         rd_count,
2001                         rd);
2002   if ( (0 == zm->iteration_cnt) &&
2003        (0 != zm->limit) )
2004   {
2005     /* We are done with the current iteration batch, AND the
2006        client would right now accept more, so go again! */
2007     GNUNET_assert (NULL == zm->task);
2008     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2009                                          zm);
2010   }
2011 }
2012
2013
2014 /**
2015  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2016  *
2017  * @param cls the client sending the message
2018  * @param zis_msg message from the client
2019  */
2020 static void
2021 handle_monitor_start (void *cls,
2022                       const struct ZoneMonitorStartMessage *zis_msg)
2023 {
2024   struct NamestoreClient *nc = cls;
2025   struct ZoneMonitor *zm;
2026
2027   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2028               "Received ZONE_MONITOR_START message\n");
2029   zm = GNUNET_new (struct ZoneMonitor);
2030   zm->nc = nc;
2031   zm->zone = zis_msg->zone;
2032   zm->limit = 1;
2033   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2034   GNUNET_CONTAINER_DLL_insert (monitor_head,
2035                                monitor_tail,
2036                                zm);
2037   GNUNET_SERVICE_client_mark_monitor (nc->client);
2038   GNUNET_SERVICE_client_continue (nc->client);
2039   GNUNET_notification_context_add (monitor_nc,
2040                                    nc->mq);
2041   if (zm->in_first_iteration)
2042     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2043                                          zm);
2044   else
2045     monitor_sync (zm);
2046 }
2047
2048
2049 /**
2050  * Obtain the next datum during the zone monitor's zone initial iteration.
2051  *
2052  * @param cls zone monitor that does its initial iteration
2053  */
2054 static void
2055 monitor_iteration_next (void *cls)
2056 {
2057   struct ZoneMonitor *zm = cls;
2058   int ret;
2059
2060   zm->task = NULL;
2061   GNUNET_assert (0 == zm->iteration_cnt);
2062   if (zm->limit > 16)
2063     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2064   else
2065     zm->iteration_cnt = zm->limit; /* use it all */
2066   ret = GSN_database->iterate_records (GSN_database->cls,
2067                                        (0 == memcmp (&zm->zone,
2068                                                      &zero,
2069                                                      sizeof (zero)))
2070                                        ? NULL
2071                                        : &zm->zone,
2072                                        zm->seq,
2073                                        zm->iteration_cnt,
2074                                        &monitor_iterate_cb,
2075                                        zm);
2076   if (GNUNET_SYSERR == ret)
2077   {
2078     GNUNET_SERVICE_client_drop (zm->nc->client);
2079     return;
2080   }
2081   if (GNUNET_NO == ret)
2082   {
2083     /* empty zone */
2084     monitor_sync (zm);
2085     return;
2086   }
2087 }
2088
2089
2090 /**
2091  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2092  *
2093  * @param cls the client sending the message
2094  * @param nm message from the client
2095  */
2096 static void
2097 handle_monitor_next (void *cls,
2098                      const struct ZoneMonitorNextMessage *nm)
2099 {
2100   struct NamestoreClient *nc = cls;
2101   struct ZoneMonitor *zm;
2102   uint64_t inc;
2103
2104   inc = GNUNET_ntohll (nm->limit);
2105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2106               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2107               (unsigned long long) inc);
2108   for (zm = monitor_head; NULL != zm; zm = zm->next)
2109     if (zm->nc == nc)
2110       break;
2111   if (NULL == zm)
2112   {
2113     GNUNET_break (0);
2114     GNUNET_SERVICE_client_drop (nc->client);
2115     return;
2116   }
2117   GNUNET_SERVICE_client_continue (nc->client);
2118   if (zm->limit + inc < zm->limit)
2119   {
2120     GNUNET_break (0);
2121     GNUNET_SERVICE_client_drop (nc->client);
2122     return;
2123   }
2124   zm->limit += inc;
2125   if ( (zm->in_first_iteration) &&
2126        (zm->limit == inc) )
2127   {
2128     /* We are still iterating, and the previous iteration must
2129        have stopped due to the client's limit, so continue it! */
2130     GNUNET_assert (NULL == zm->task);
2131     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2132                                          zm);
2133   }
2134   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2135   if ( (zm->limit > zm->iteration_cnt) &&
2136        (zm->sa_waiting) )
2137   {
2138     monitor_unblock (zm);
2139   }
2140   else if (GNUNET_YES == zm->sa_waiting)
2141   {
2142     if (NULL != zm->sa_wait_warning)
2143       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2144     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2145     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2146                                                         &warn_monitor_slow,
2147                                                         zm);
2148   }
2149 }
2150
2151
2152 /**
2153  * Process namestore requests.
2154  *
2155  * @param cls closure
2156  * @param cfg configuration to use
2157  * @param service the initialized service
2158  */
2159 static void
2160 run (void *cls,
2161      const struct GNUNET_CONFIGURATION_Handle *cfg,
2162      struct GNUNET_SERVICE_Handle *service)
2163 {
2164   char *database;
2165
2166   (void) cls;
2167   (void) service;
2168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2169               "Starting namestore service\n");
2170   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2171                                                      "namestore",
2172                                                      "CACHE_KEYS");
2173   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2174                                                             "namecache",
2175                                                             "DISABLE");
2176   GSN_cfg = cfg;
2177   monitor_nc = GNUNET_notification_context_create (1);
2178   if (GNUNET_YES != disable_namecache)
2179   {
2180     namecache = GNUNET_NAMECACHE_connect (cfg);
2181     GNUNET_assert (NULL != namecache);
2182   }
2183   /* Loading database plugin */
2184   if (GNUNET_OK !=
2185       GNUNET_CONFIGURATION_get_value_string (cfg,
2186                                              "namestore",
2187                                              "database",
2188                                              &database))
2189     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2190                 "No database backend configured\n");
2191
2192   GNUNET_asprintf (&db_lib_name,
2193                    "libgnunet_plugin_namestore_%s",
2194                    database);
2195   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2196                                      (void *) GSN_cfg);
2197   GNUNET_free (database);
2198   statistics = GNUNET_STATISTICS_create ("namestore",
2199                                          cfg);
2200   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2201                                  NULL);
2202   if (NULL == GSN_database)
2203   {
2204     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2205                 "Could not load database backend `%s'\n",
2206                 db_lib_name);
2207     GNUNET_SCHEDULER_shutdown ();
2208     return;
2209   }
2210 }
2211
2212
2213 /**
2214  * Define "main" method using service macro.
2215  */
2216 GNUNET_SERVICE_MAIN
2217 ("namestore",
2218  GNUNET_SERVICE_OPTION_NONE,
2219  &run,
2220  &client_connect_cb,
2221  &client_disconnect_cb,
2222  NULL,
2223  GNUNET_MQ_hd_var_size (record_store,
2224                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2225                         struct RecordStoreMessage,
2226                         NULL),
2227  GNUNET_MQ_hd_var_size (record_lookup,
2228                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2229                         struct LabelLookupMessage,
2230                         NULL),
2231  GNUNET_MQ_hd_fixed_size (zone_to_name,
2232                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2233                           struct ZoneToNameMessage,
2234                           NULL),
2235  GNUNET_MQ_hd_fixed_size (iteration_start,
2236                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2237                           struct ZoneIterationStartMessage,
2238                           NULL),
2239  GNUNET_MQ_hd_fixed_size (iteration_next,
2240                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2241                           struct ZoneIterationNextMessage,
2242                           NULL),
2243  GNUNET_MQ_hd_fixed_size (iteration_stop,
2244                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2245                           struct ZoneIterationStopMessage,
2246                           NULL),
2247  GNUNET_MQ_hd_fixed_size (monitor_start,
2248                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2249                           struct ZoneMonitorStartMessage,
2250                           NULL),
2251  GNUNET_MQ_hd_fixed_size (monitor_next,
2252                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2253                           struct ZoneMonitorNextMessage,
2254                           NULL),
2255  GNUNET_MQ_handler_end ());
2256
2257
2258 /* end of gnunet-service-namestore.c */