cache NICK results in namestore to avoid unnecessary load on the DB; improve sqlite...
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file namestore/gnunet-service-namestore.c
21  * @brief namestore for the GNUnet naming system
22  * @author Matthias Wachs
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - "get_nick_record" is a bottleneck, introduce a cache to
27  *   avoid looking it up again and again (for the same few
28  *   zones that the user will typically manage!)
29  * - run testcases, make sure everything works!
30  */
31 #include "platform.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_dnsparser_lib.h"
34 #include "gnunet_gns_service.h"
35 #include "gnunet_namecache_service.h"
36 #include "gnunet_namestore_service.h"
37 #include "gnunet_namestore_plugin.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_signatures.h"
40 #include "namestore.h"
41
42 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
43
44 /**
45  * If a monitor takes more than 1 minute to process an event, print a warning.
46  */
47 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
48
49 /**
50  * Size of the cache used by #get_nick_record()
51  */
52 #define NC_SIZE 16
53
54 /**
55  * A namestore client
56  */
57 struct NamestoreClient;
58
59
60 /**
61  * A namestore iteration operation.
62  */
63 struct ZoneIteration
64 {
65   /**
66    * Next element in the DLL
67    */
68   struct ZoneIteration *next;
69
70   /**
71    * Previous element in the DLL
72    */
73   struct ZoneIteration *prev;
74
75   /**
76    * Namestore client which intiated this zone iteration
77    */
78   struct NamestoreClient *nc;
79
80   /**
81    * The nick to add to the records
82    */
83   struct GNUNET_GNSRECORD_Data *nick;
84
85   /**
86    * Key of the zone we are iterating over.
87    */
88   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
89
90   /**
91    * Last sequence number in the zone iteration used to address next
92    * result of the zone iteration in the store
93    *
94    * Initialy set to 0.
95    * Updated in #zone_iterate_proc()
96    */
97   uint64_t seq;
98
99   /**
100    * The operation id fot the zone iteration in the response for the client
101    */
102   uint32_t request_id;
103
104   /**
105    * Offset of the zone iteration used to address next result of the zone
106    * iteration in the store
107    *
108    * Initialy set to 0 in #handle_iteration_start
109    * Incremented with by every call to #handle_iteration_next
110    */
111   uint32_t offset;
112
113 };
114
115
116 /**
117  * A namestore client
118  */
119 struct NamestoreClient
120 {
121
122   /**
123    * The client
124    */
125   struct GNUNET_SERVICE_Client *client;
126
127   /**
128    * Message queue for transmission to @e client
129    */
130   struct GNUNET_MQ_Handle *mq;
131
132   /**
133    * Head of the DLL of
134    * Zone iteration operations in progress initiated by this client
135    */
136   struct ZoneIteration *op_head;
137
138   /**
139    * Tail of the DLL of
140    * Zone iteration operations in progress initiated by this client
141    */
142   struct ZoneIteration *op_tail;
143 };
144
145
146 /**
147  * A namestore monitor.
148  */
149 struct ZoneMonitor
150 {
151   /**
152    * Next element in the DLL
153    */
154   struct ZoneMonitor *next;
155
156   /**
157    * Previous element in the DLL
158    */
159   struct ZoneMonitor *prev;
160
161   /**
162    * Namestore client which intiated this zone monitor
163    */
164   struct NamestoreClient *nc;
165
166   /**
167    * Private key of the zone.
168    */
169   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
170
171   /**
172    * Task active during initial iteration.
173    */
174   struct GNUNET_SCHEDULER_Task *task;
175
176   /**
177    * Task to warn about slow monitors.
178    */
179   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
180
181   /**
182    * Since when are we blocked on this monitor?
183    */
184   struct GNUNET_TIME_Absolute sa_waiting_start;
185
186   /**
187    * Last sequence number in the zone iteration used to address next
188    * result of the zone iteration in the store
189    *
190    * Initialy set to 0.
191    * Updated in #monitor_iterate_cb()
192    */
193   uint64_t seq;
194
195   /**
196    * Current limit of how many more messages we are allowed
197    * to queue to this monitor.
198    */
199   uint64_t limit;
200
201   /**
202    * How many more requests may we receive from the iterator
203    * before it is at the limit we gave it?  Will be below or
204    * equal to @e limit.  The effective limit for monitor
205    * events is thus @e iteration_cnt - @e limit!
206    */
207   uint64_t iteration_cnt;
208
209   /**
210    * Are we (still) in the initial iteration pass?
211    */
212   int in_first_iteration;
213
214   /**
215    * Is there a store activity waiting for this monitor?  We only raise the
216    * flag when it happens and search the DLL for the store activity when we
217    * had a limit increase.  If we cannot find any waiting store activity at
218    * that time, we clear the flag again.
219    */
220   int sa_waiting;
221
222 };
223
224
225 /**
226  * Pending operation on the namecache.
227  */
228 struct CacheOperation
229 {
230
231   /**
232    * Kept in a DLL.
233    */
234   struct CacheOperation *prev;
235
236   /**
237    * Kept in a DLL.
238    */
239   struct CacheOperation *next;
240
241   /**
242    * Handle to namecache queue.
243    */
244   struct GNUNET_NAMECACHE_QueueEntry *qe;
245
246   /**
247    * Client to notify about the result.
248    */
249   struct NamestoreClient *nc;
250
251   /**
252    * Client's request ID.
253    */
254   uint32_t rid;
255 };
256
257
258 /**
259  * Information for an ongoing #handle_record_store() operation.
260  * Needed as we may wait for monitors to be ready for the notification.
261  */
262 struct StoreActivity
263 {
264   /**
265    * Kept in a DLL.
266    */
267   struct StoreActivity *next;
268
269   /**
270    * Kept in a DLL.
271    */
272   struct StoreActivity *prev;
273
274   /**
275    * Which client triggered the store activity?
276    */
277   struct NamestoreClient *nc;
278
279   /**
280    * Copy of the original store message (as data fields in @e rd will
281    * point into it!).
282    */
283   const struct RecordStoreMessage *rsm;
284
285   /**
286    * Next zone monitor that still needs to be notified about this PUT.
287    */
288   struct ZoneMonitor *zm_pos;
289
290   /**
291    * Label nicely canonicalized (lower case).
292    */
293   char *conv_name;
294
295 };
296
297
298 /**
299  * Entry in list of cached nick resolutions.
300  */ 
301 struct NickCache
302 {
303   /**
304    * Zone the cache entry is for.
305    */ 
306   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
307
308   /**
309    * Cached record data.
310    */
311   struct GNUNET_GNSRECORD_Data *rd;
312
313   /**
314    * Timestamp when this cache entry was used last.
315    */
316   struct GNUNET_TIME_Absolute last_used;
317 };
318
319
320 /**
321  * We cache nick records to reduce DB load. 
322  */
323 static struct NickCache nick_cache[NC_SIZE];
324
325 /**
326  * Public key of all zeros.
327  */
328 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
329
330 /**
331  * Configuration handle.
332  */
333 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
334
335 /**
336  * Handle to the statistics service
337  */
338 static struct GNUNET_STATISTICS_Handle *statistics;
339
340 /**
341  * Namecache handle.
342  */
343 static struct GNUNET_NAMECACHE_Handle *namecache;
344
345 /**
346  * Database handle
347  */
348 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
349
350 /**
351  * Name of the database plugin
352  */
353 static char *db_lib_name;
354
355 /**
356  * Head of cop DLL.
357  */
358 static struct CacheOperation *cop_head;
359
360 /**
361  * Tail of cop DLL.
362  */
363 static struct CacheOperation *cop_tail;
364
365 /**
366  * First active zone monitor.
367  */
368 static struct ZoneMonitor *monitor_head;
369
370 /**
371  * Last active zone monitor.
372  */
373 static struct ZoneMonitor *monitor_tail;
374
375 /**
376  * Head of DLL of monitor-blocked store activities.
377  */
378 static struct StoreActivity *sa_head;
379
380 /**
381  * Tail of DLL of monitor-blocked store activities.
382  */
383 static struct StoreActivity *sa_tail;
384
385 /**
386  * Notification context shared by all monitors.
387  */
388 static struct GNUNET_NotificationContext *monitor_nc;
389
390 /**
391  * Optimize block insertion by caching map of private keys to
392  * public keys in memory?
393  */
394 static int cache_keys;
395
396 /**
397  * Use the namecache? Doing so creates additional cryptographic
398  * operations whenever we touch a record.
399  */
400 static int disable_namecache;
401
402
403 /**
404  * Task run during shutdown.
405  *
406  * @param cls unused
407  */
408 static void
409 cleanup_task (void *cls)
410 {
411   struct CacheOperation *cop;
412
413   (void) cls;
414   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
415               "Stopping namestore service\n");
416   while (NULL != (cop = cop_head))
417   {
418     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
419                 "Aborting incomplete namecache operation\n");
420     GNUNET_NAMECACHE_cancel (cop->qe);
421     GNUNET_CONTAINER_DLL_remove (cop_head,
422                                  cop_tail,
423                                  cop);
424     GNUNET_free (cop);
425   }
426   if (NULL != namecache)
427   {
428     GNUNET_NAMECACHE_disconnect (namecache);
429     namecache = NULL;
430   }
431   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
432                                               GSN_database));
433   GNUNET_free (db_lib_name);
434   db_lib_name = NULL;
435   if (NULL != monitor_nc)
436   {
437     GNUNET_notification_context_destroy (monitor_nc);
438     monitor_nc = NULL;
439   }
440   if (NULL != statistics)
441   {
442     GNUNET_STATISTICS_destroy (statistics,
443                                GNUNET_NO);
444     statistics = NULL;
445   }
446 }
447
448
449 /**
450  * Release memory used by @a sa.
451  *
452  * @param sa activity to free
453  */
454 static void
455 free_store_activity (struct StoreActivity *sa)
456 {
457   GNUNET_CONTAINER_DLL_remove (sa_head,
458                                sa_tail,
459                                sa);
460   GNUNET_free (sa->conv_name);
461   GNUNET_free (sa);
462 }
463
464
465 /**
466  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
467  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
468  * record, which (if found) is then copied to @a cls for future use.
469  *
470  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
471  * @param seq sequence number of the record
472  * @param private_key the private key of the zone (unused)
473  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
474  * @param rd_count number of records in @a rd
475  * @param rd records stored under @a label in the zone
476  */
477 static void
478 lookup_nick_it (void *cls,
479                 uint64_t seq,
480                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
481                 const char *label,
482                 unsigned int rd_count,
483                 const struct GNUNET_GNSRECORD_Data *rd)
484 {
485   struct GNUNET_GNSRECORD_Data **res = cls;
486
487   (void) private_key;
488   (void) seq;
489   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
490   {
491     GNUNET_break (0);
492     return;
493   }
494   for (unsigned int c = 0; c < rd_count; c++)
495   {
496     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
497     {
498       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
499       (*res)->data = &(*res)[1];
500       GNUNET_memcpy ((void *) (*res)->data,
501                      rd[c].data,
502                      rd[c].data_size);
503       (*res)->data_size = rd[c].data_size;
504       (*res)->expiration_time = rd[c].expiration_time;
505       (*res)->flags = rd[c].flags;
506       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
507       return;
508     }
509   }
510   (*res) = NULL;
511 }
512
513
514 /**
515  * Add entry to the cache for @a zone and @a nick
516  *
517  * @param zone zone key to cache under
518  * @param nick nick entry to cache
519  */
520 static void
521 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
522             const struct GNUNET_GNSRECORD_Data *nick)
523 {
524   struct NickCache *oldest;
525
526   oldest = NULL;
527   for (unsigned int i=0;i<NC_SIZE;i++)
528   {
529     struct NickCache *pos = &nick_cache[i];
530
531     if ( (NULL == oldest) ||
532          (oldest->last_used.abs_value_us >
533           pos->last_used.abs_value_us) )
534       oldest = pos;
535     if (0 == memcmp (zone,
536                      &pos->zone,
537                      sizeof (*zone)))
538     {
539       oldest = pos;
540       break;
541     }
542   }
543   GNUNET_free_non_null (oldest->rd);
544   oldest->zone = *zone;
545   oldest->rd = GNUNET_malloc (sizeof (*nick) +
546                               nick->data_size);
547   *oldest->rd = *nick;
548   oldest->rd->data = &oldest->rd[1];
549   memcpy (&oldest->rd[1],
550           nick->data,
551           nick->data_size);
552   oldest->last_used = GNUNET_TIME_absolute_get ();
553 }
554
555
556 /**
557  * Return the NICK record for the zone (if it exists).
558  *
559  * @param zone private key for the zone to look for nick
560  * @return NULL if no NICK record was found
561  */
562 static struct GNUNET_GNSRECORD_Data *
563 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
564 {
565   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
566   struct GNUNET_GNSRECORD_Data *nick;
567   int res;
568
569   /* check cache first */
570   for (unsigned int i=0;i<NC_SIZE;i++)
571   {
572     struct NickCache *pos = &nick_cache[i];
573     if ( (NULL != pos->rd) &&
574          (0 == memcmp (zone,
575                        &pos->zone,
576                        sizeof (*zone))) )
577     {
578       nick = GNUNET_malloc (sizeof (*nick) +
579                             pos->rd->data_size);
580       *nick = *pos->rd;
581       nick->data = &nick[1];
582       memcpy (&nick[1],
583               pos->rd->data,
584               pos->rd->data_size);
585       pos->last_used = GNUNET_TIME_absolute_get ();
586       return nick;
587     }
588   }
589   
590   nick = NULL;
591   res = GSN_database->lookup_records (GSN_database->cls,
592                                       zone,
593                                       GNUNET_GNS_EMPTY_LABEL_AT,
594                                       &lookup_nick_it,
595                                       &nick);
596   if ( (GNUNET_OK != res) ||
597        (NULL == nick) )
598   {
599     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
600     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
601                 "No nick name set for zone `%s'\n",
602                 GNUNET_GNSRECORD_z2s (&pub));
603     return NULL;
604   }
605
606   /* update cache */
607   cache_nick (zone,
608               nick);
609   return nick;
610 }
611
612
613 /**
614  * Merge the nick record @a nick_rd with the rest of the
615  * record set given in @a rd2.  Store the result in @a rdc_res
616  * and @a rd_res.  The @a nick_rd's expiration time is set to
617  * the maximum expiration time of all of the records in @a rd2.
618  *
619  * @param nick_rd the nick record to integrate
620  * @param rd2_length length of the @a rd2 array
621  * @param rd2 array of records
622  * @param rdc_res[out] length of the resulting @a rd_res array
623  * @param rd_res[out] set to an array of records,
624  *                    including @a nick_rd and @a rd2;
625  *           all of the variable-size 'data' fields in @a rd2 are
626  *           allocated in the same chunk of memory!
627  */
628 static void
629 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
630                          unsigned int rd2_length,
631                          const struct GNUNET_GNSRECORD_Data *rd2,
632                          unsigned int *rdc_res,
633                          struct GNUNET_GNSRECORD_Data **rd_res)
634 {
635   uint64_t latest_expiration;
636   size_t req;
637   char *data;
638   size_t data_offset;
639   struct GNUNET_GNSRECORD_Data *target;
640
641   (*rdc_res) = 1 + rd2_length;
642   if (0 == 1 + rd2_length)
643   {
644     GNUNET_break (0);
645     (*rd_res) = NULL;
646     return;
647   }
648   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
649   for (unsigned int i=0; i<rd2_length; i++)
650   {
651     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
652
653     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
654     {
655       GNUNET_break (0);
656       (*rd_res) = NULL;
657       return;
658     }
659     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
660   }
661   target = GNUNET_malloc (req);
662   (*rd_res) = target;
663   data = (char *) &target[1 + rd2_length];
664   data_offset = 0;
665   latest_expiration = 0;
666   for (unsigned int i=0;i<rd2_length;i++)
667   {
668     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
669
670     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
671     {
672       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
673           latest_expiration)
674         latest_expiration = orig->expiration_time;
675     }
676     else if (orig->expiration_time > latest_expiration)
677       latest_expiration = orig->expiration_time;
678     target[i] = *orig;
679     target[i].data = (void *) &data[data_offset];
680     GNUNET_memcpy (&data[data_offset],
681                    orig->data,
682                    orig->data_size);
683     data_offset += orig->data_size;
684   }
685   /* append nick */
686   target[rd2_length] = *nick_rd;
687   target[rd2_length].expiration_time = latest_expiration;
688   target[rd2_length].data = (void *) &data[data_offset];
689   GNUNET_memcpy (&data[data_offset],
690                  nick_rd->data,
691                  nick_rd->data_size);
692   data_offset += nick_rd->data_size;
693   GNUNET_assert (req ==
694                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
695 }
696
697
698 /**
699  * Generate a `struct LookupNameResponseMessage` and send it to the
700  * given client using the given notification context.
701  *
702  * @param nc client to unicast to
703  * @param request_id request ID to use
704  * @param zone_key zone key of the zone
705  * @param name name
706  * @param rd_count number of records in @a rd
707  * @param rd array of records
708  */
709 static void
710 send_lookup_response (struct NamestoreClient *nc,
711                       uint32_t request_id,
712                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
713                       const char *name,
714                       unsigned int rd_count,
715                       const struct GNUNET_GNSRECORD_Data *rd)
716 {
717   struct GNUNET_MQ_Envelope *env;
718   struct RecordResultMessage *zir_msg;
719   struct GNUNET_GNSRECORD_Data *nick;
720   struct GNUNET_GNSRECORD_Data *res;
721   unsigned int res_count;
722   size_t name_len;
723   ssize_t rd_ser_len;
724   char *name_tmp;
725   char *rd_ser;
726
727   nick = get_nick_record (zone_key);
728   GNUNET_assert (-1 !=
729                  GNUNET_GNSRECORD_records_get_size (rd_count,
730                                                     rd));
731
732   if ( (NULL != nick) &&
733        (0 != strcmp (name,
734                      GNUNET_GNS_EMPTY_LABEL_AT)))
735   {
736     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
737     merge_with_nick_records (nick,
738                              rd_count,
739                              rd,
740                              &res_count,
741                              &res);
742     GNUNET_free (nick);
743   }
744   else
745   {
746     res_count = rd_count;
747     res = (struct GNUNET_GNSRECORD_Data *) rd;
748   }
749
750   GNUNET_assert (-1 !=
751                  GNUNET_GNSRECORD_records_get_size (res_count,
752                                                     res));
753
754
755   name_len = strlen (name) + 1;
756   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
757                                                   res);
758   if (rd_ser_len < 0)
759   {
760     GNUNET_break (0);
761     GNUNET_SERVICE_client_drop (nc->client);
762     return;
763   }
764   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof (*zir_msg))
765   {
766     GNUNET_break (0);
767     GNUNET_SERVICE_client_drop (nc->client);
768     return;
769   }
770   env = GNUNET_MQ_msg_extra (zir_msg,
771                              name_len + rd_ser_len,
772                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
773   zir_msg->gns_header.r_id = htonl (request_id);
774   zir_msg->name_len = htons (name_len);
775   zir_msg->rd_count = htons (res_count);
776   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
777   zir_msg->private_key = *zone_key;
778   name_tmp = (char *) &zir_msg[1];
779   GNUNET_memcpy (name_tmp,
780                  name,
781                  name_len);
782   rd_ser = &name_tmp[name_len];
783   GNUNET_assert (rd_ser_len ==
784                  GNUNET_GNSRECORD_records_serialize (res_count,
785                                                      res,
786                                                      rd_ser_len,
787                                                      rd_ser));
788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789               "Sending RECORD_RESULT message with %u records\n",
790               res_count);
791   GNUNET_STATISTICS_update (statistics,
792                             "Record sets sent to clients",
793                             1,
794                             GNUNET_NO);
795   GNUNET_MQ_send (nc->mq,
796                   env);
797   if (rd != res)
798     GNUNET_free (res);
799 }
800
801
802 /**
803  * Send response to the store request to the client.
804  *
805  * @param client client to talk to
806  * @param res status of the operation
807  * @param rid client's request ID
808  */
809 static void
810 send_store_response (struct NamestoreClient *nc,
811                      int res,
812                      uint32_t rid)
813 {
814   struct GNUNET_MQ_Envelope *env;
815   struct RecordStoreResponseMessage *rcr_msg;
816
817   GNUNET_assert (NULL != nc);
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "Sending RECORD_STORE_RESPONSE message\n");
820   GNUNET_STATISTICS_update (statistics,
821                             "Store requests completed",
822                             1,
823                             GNUNET_NO);
824   env = GNUNET_MQ_msg (rcr_msg,
825                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
826   rcr_msg->gns_header.r_id = htonl (rid);
827   rcr_msg->op_result = htonl (res);
828   GNUNET_MQ_send (nc->mq,
829                   env);
830 }
831
832
833 /**
834  * Cache operation complete, clean up.
835  *
836  * @param cls the `struct CacheOperation`
837  * @param success success
838  * @param emsg error messages
839  */
840 static void
841 finish_cache_operation (void *cls,
842                         int32_t success,
843                         const char *emsg)
844 {
845   struct CacheOperation *cop = cls;
846
847   if (NULL != emsg)
848     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
849                 _("Failed to replicate block in namecache: %s\n"),
850                 emsg);
851   else
852     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853                 "CACHE operation completed\n");
854   GNUNET_CONTAINER_DLL_remove (cop_head,
855                                cop_tail,
856                                cop);
857   if (NULL != cop->nc)
858     send_store_response (cop->nc,
859                          success,
860                          cop->rid);
861   GNUNET_free (cop);
862 }
863
864
865 /**
866  * We just touched the plaintext information about a name in our zone;
867  * refresh the corresponding (encrypted) block in the namecache.
868  *
869  * @param nc client responsible for the request, can be NULL
870  * @param rid request ID of the client
871  * @param zone_key private key of the zone
872  * @param name label for the records
873  * @param rd_count number of records
874  * @param rd records stored under the given @a name
875  */
876 static void
877 refresh_block (struct NamestoreClient *nc,
878                uint32_t rid,
879                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
880                const char *name,
881                unsigned int rd_count,
882                const struct GNUNET_GNSRECORD_Data *rd)
883 {
884   struct GNUNET_GNSRECORD_Block *block;
885   struct CacheOperation *cop;
886   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
887   struct GNUNET_GNSRECORD_Data *nick;
888   struct GNUNET_GNSRECORD_Data *res;
889   unsigned int res_count;
890   struct GNUNET_TIME_Absolute exp_time;
891
892   nick = get_nick_record (zone_key);
893   res_count = rd_count;
894   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
895   if (NULL != nick)
896   {
897     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
898     merge_with_nick_records (nick,
899                              rd_count,rd,
900                              &res_count,
901                              &res);
902     GNUNET_free (nick);
903   }
904   if (0 == res_count)
905   {
906     if (NULL != nc)
907       send_store_response (nc,
908                            GNUNET_OK,
909                            rid);
910     return; /* no data, no need to update cache */
911   }
912   if (GNUNET_YES == disable_namecache)
913   {
914     GNUNET_STATISTICS_update (statistics,
915                               "Namecache updates skipped (NC disabled)",
916                               1,
917                               GNUNET_NO);
918     if (NULL != nc)
919       send_store_response (nc,
920                            GNUNET_OK,
921                            rid);
922     return;
923   }
924   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
925                                                           res);
926   if (cache_keys)
927     block = GNUNET_GNSRECORD_block_create2 (zone_key,
928                                             exp_time,
929                                             name,
930                                             res,
931                                             res_count);
932   else
933     block = GNUNET_GNSRECORD_block_create (zone_key,
934                                            exp_time,
935                                            name,
936                                            res,
937                                            res_count);
938   GNUNET_assert (NULL != block);
939   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
940                                       &pkey);
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
943               name,
944               res_count,
945               GNUNET_STRINGS_absolute_time_to_string (exp_time),
946               GNUNET_GNSRECORD_z2s (&pkey));
947   GNUNET_STATISTICS_update (statistics,
948                             "Namecache updates pushed",
949                             1,
950                             GNUNET_NO);
951   cop = GNUNET_new (struct CacheOperation);
952   cop->nc = nc;
953   cop->rid = rid;
954   GNUNET_CONTAINER_DLL_insert (cop_head,
955                                cop_tail,
956                                cop);
957   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
958                                           block,
959                                           &finish_cache_operation,
960                                           cop);
961   GNUNET_free (block);
962 }
963
964
965 /**
966  * Print a warning that one of our monitors is no longer reacting.
967  *
968  * @param cls a `struct ZoneMonitor` to warn about
969  */
970 static void
971 warn_monitor_slow (void *cls)
972 {
973   struct ZoneMonitor *zm = cls;
974
975   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
976               "No response from monitor since %s\n",
977               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
978   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
979                                                       &warn_monitor_slow,
980                                                       zm);
981 }
982
983
984 /**
985  * Continue processing the @a sa.
986  *
987  * @param sa store activity to process
988  */
989 static void
990 continue_store_activity (struct StoreActivity *sa)
991 {
992   const struct RecordStoreMessage *rp_msg = sa->rsm;
993   unsigned int rd_count;
994   size_t name_len;
995   size_t rd_ser_len;
996   uint32_t rid;
997   const char *name_tmp;
998   const char *rd_ser;
999
1000   rid = ntohl (rp_msg->gns_header.r_id);
1001   name_len = ntohs (rp_msg->name_len);
1002   rd_count = ntohs (rp_msg->rd_count);
1003   rd_ser_len = ntohs (rp_msg->rd_len);
1004   name_tmp = (const char *) &rp_msg[1];
1005   rd_ser = &name_tmp[name_len];
1006   {
1007     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1008
1009     /* We did this before, must succeed again */
1010     GNUNET_assert (GNUNET_OK ==
1011                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1012                                                          rd_ser,
1013                                                          rd_count,
1014                                                          rd));
1015
1016     for (struct ZoneMonitor *zm = sa->zm_pos;
1017          NULL != zm;
1018          zm = sa->zm_pos)
1019     {
1020       if ( (0 != memcmp (&rp_msg->private_key,
1021                          &zm->zone,
1022                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
1023            (0 != memcmp (&zm->zone,
1024                          &zero,
1025                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
1026         {
1027           sa->zm_pos = zm->next; /* not interesting to this monitor */
1028           continue;
1029         }
1030       if (zm->limit == zm->iteration_cnt)
1031       {
1032         zm->sa_waiting = GNUNET_YES;
1033         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1034         if (NULL != zm->sa_wait_warning)
1035           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1036         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1037                                                             &warn_monitor_slow,
1038                                                             zm);
1039         return; /* blocked on zone monitor */
1040       }
1041       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042                   "Notifying monitor about changes under label `%s'\n",
1043                   sa->conv_name);
1044       zm->limit--;
1045       send_lookup_response (zm->nc,
1046                             0,
1047                             &rp_msg->private_key,
1048                             sa->conv_name,
1049                             rd_count,
1050                             rd);
1051       sa->zm_pos = zm->next;
1052     }
1053     /* great, done with the monitors, unpack (again) for refresh_block operation */
1054     refresh_block (sa->nc,
1055                    rid,
1056                    &rp_msg->private_key,
1057                    sa->conv_name,
1058                    rd_count,
1059                    rd);
1060   }
1061   GNUNET_SERVICE_client_continue (sa->nc->client);
1062   free_store_activity (sa);
1063 }
1064
1065
1066 /**
1067  * Called whenever a client is disconnected.
1068  * Frees our resources associated with that client.
1069  *
1070  * @param cls closure
1071  * @param client identification of the client
1072  * @param app_ctx the `struct NamestoreClient` of @a client
1073  */
1074 static void
1075 client_disconnect_cb (void *cls,
1076                       struct GNUNET_SERVICE_Client *client,
1077                       void *app_ctx)
1078 {
1079   struct NamestoreClient *nc = app_ctx;
1080   struct ZoneIteration *no;
1081   struct CacheOperation *cop;
1082
1083   (void) cls;
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "Client %p disconnected\n",
1086               client);
1087   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1088   {
1089     struct StoreActivity *san;
1090
1091     if (nc != zm->nc)
1092       continue;
1093     GNUNET_CONTAINER_DLL_remove (monitor_head,
1094                                  monitor_tail,
1095                                  zm);
1096     if (NULL != zm->task)
1097     {
1098       GNUNET_SCHEDULER_cancel (zm->task);
1099       zm->task = NULL;
1100     }
1101     if (NULL != zm->sa_wait_warning)
1102     {
1103       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1104       zm->sa_wait_warning = NULL;
1105     }
1106     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1107     {
1108       san = sa->next;
1109       if (zm == sa->zm_pos)
1110       {
1111         sa->zm_pos = zm->next;
1112         /* this may free sa */
1113         continue_store_activity (sa);
1114       }
1115     }
1116     GNUNET_free (zm);
1117     break;
1118   }
1119   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1120   {
1121     if (sa->nc == nc)
1122     {
1123       /* this may free sa */
1124       free_store_activity (sa);
1125       break; /* there can only be one per nc */
1126     }
1127   }
1128   while (NULL != (no = nc->op_head))
1129   {
1130     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1131                                  nc->op_tail,
1132                                  no);
1133     GNUNET_free (no);
1134   }
1135   for (cop = cop_head; NULL != cop; cop = cop->next)
1136     if (nc == cop->nc)
1137       cop->nc = NULL;
1138   GNUNET_free (nc);
1139 }
1140
1141
1142 /**
1143  * Add a client to our list of active clients.
1144  *
1145  * @param cls NULL
1146  * @param client client to add
1147  * @param mq message queue for @a client
1148  * @return internal namestore client structure for this client
1149  */
1150 static void *
1151 client_connect_cb (void *cls,
1152                    struct GNUNET_SERVICE_Client *client,
1153                    struct GNUNET_MQ_Handle *mq)
1154 {
1155   struct NamestoreClient *nc;
1156
1157   (void) cls;
1158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159               "Client %p connected\n",
1160               client);
1161   nc = GNUNET_new (struct NamestoreClient);
1162   nc->client = client;
1163   nc->mq = mq;
1164   return nc;
1165 }
1166
1167
1168 /**
1169  * Closure for #lookup_it().
1170  */
1171 struct RecordLookupContext
1172 {
1173
1174   /**
1175    * FIXME.
1176    */
1177   const char *label;
1178
1179   /**
1180    * FIXME.
1181    */
1182   char *res_rd;
1183
1184   /**
1185    * FIXME.
1186    */
1187   struct GNUNET_GNSRECORD_Data *nick;
1188
1189   /**
1190    * FIXME.
1191    */
1192   int found;
1193
1194   /**
1195    * FIXME.
1196    */
1197   unsigned int res_rd_count;
1198
1199   /**
1200    * FIXME.
1201    */
1202   ssize_t rd_ser_len;
1203 };
1204
1205
1206 /**
1207  * FIXME.
1208  *
1209  * @param seq sequence number of the record
1210  */
1211 static void
1212 lookup_it (void *cls,
1213            uint64_t seq,
1214            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1215            const char *label,
1216            unsigned int rd_count,
1217            const struct GNUNET_GNSRECORD_Data *rd)
1218 {
1219   struct RecordLookupContext *rlc = cls;
1220
1221   (void) private_key;
1222   (void) seq;
1223   if (0 != strcmp (label,
1224                    rlc->label))
1225     return;
1226   rlc->found = GNUNET_YES;
1227   if (0 == rd_count)
1228   {
1229     rlc->rd_ser_len = 0;
1230     rlc->res_rd_count = 0;
1231     rlc->res_rd = NULL;
1232     return;
1233   }
1234   if ( (NULL != rlc->nick) &&
1235        (0 != strcmp (label,
1236                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1237   {
1238     /* Merge */
1239     struct GNUNET_GNSRECORD_Data *rd_res;
1240     unsigned int rdc_res;
1241
1242     rd_res = NULL;
1243     rdc_res = 0;
1244     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1245     merge_with_nick_records (rlc->nick,
1246                              rd_count,
1247                              rd,
1248                              &rdc_res,
1249                              &rd_res);
1250     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1251                                                          rd_res);
1252     if (rlc->rd_ser_len < 0)
1253     {
1254       GNUNET_break (0);
1255       GNUNET_free  (rd_res);
1256       rlc->found = GNUNET_NO;
1257       rlc->rd_ser_len = 0;
1258       return;
1259     }
1260     rlc->res_rd_count = rdc_res;
1261     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1262     if (rlc->rd_ser_len !=
1263         GNUNET_GNSRECORD_records_serialize (rdc_res,
1264                                             rd_res,
1265                                             rlc->rd_ser_len,
1266                                             rlc->res_rd))
1267     {
1268       GNUNET_break (0);
1269       GNUNET_free  (rlc->res_rd);
1270       rlc->res_rd = NULL;
1271       rlc->res_rd_count = 0;
1272       rlc->rd_ser_len = 0;
1273       GNUNET_free  (rd_res);
1274       rlc->found = GNUNET_NO;
1275       return;
1276     }
1277     GNUNET_free (rd_res);
1278     GNUNET_free (rlc->nick);
1279     rlc->nick = NULL;
1280   }
1281   else
1282   {
1283     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1284                                                          rd);
1285     if (rlc->rd_ser_len < 0)
1286     {
1287       GNUNET_break (0);
1288       rlc->found = GNUNET_NO;
1289       rlc->rd_ser_len = 0;
1290       return;
1291     }
1292     rlc->res_rd_count = rd_count;
1293     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1294     if (rlc->rd_ser_len !=
1295         GNUNET_GNSRECORD_records_serialize (rd_count,
1296                                             rd,
1297                                             rlc->rd_ser_len,
1298                                             rlc->res_rd))
1299     {
1300       GNUNET_break (0);
1301       GNUNET_free  (rlc->res_rd);
1302       rlc->res_rd = NULL;
1303       rlc->res_rd_count = 0;
1304       rlc->rd_ser_len = 0;
1305       rlc->found = GNUNET_NO;
1306       return;
1307     }
1308   }
1309 }
1310
1311
1312 /**
1313  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1314  *
1315  * @param cls client sending the message
1316  * @param ll_msg message of type `struct LabelLookupMessage`
1317  * @return #GNUNET_OK if @a ll_msg is well-formed
1318  */
1319 static int
1320 check_record_lookup (void *cls,
1321                      const struct LabelLookupMessage *ll_msg)
1322 {
1323   uint32_t name_len;
1324   size_t src_size;
1325
1326   (void) cls;
1327   name_len = ntohl (ll_msg->label_len);
1328   src_size = ntohs (ll_msg->gns_header.header.size);
1329   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1330   {
1331     GNUNET_break (0);
1332     return GNUNET_SYSERR;
1333   }
1334   GNUNET_MQ_check_zero_termination (ll_msg);
1335   return GNUNET_OK;
1336 }
1337
1338
1339 /**
1340  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1341  *
1342  * @param cls client sending the message
1343  * @param ll_msg message of type `struct LabelLookupMessage`
1344  */
1345 static void
1346 handle_record_lookup (void *cls,
1347                       const struct LabelLookupMessage *ll_msg)
1348 {
1349   struct NamestoreClient *nc = cls;
1350   struct GNUNET_MQ_Envelope *env;
1351   struct LabelLookupResponseMessage *llr_msg;
1352   struct RecordLookupContext rlc;
1353   const char *name_tmp;
1354   char *res_name;
1355   char *conv_name;
1356   uint32_t name_len;
1357   int res;
1358
1359   name_len = ntohl (ll_msg->label_len);
1360   name_tmp = (const char *) &ll_msg[1];
1361   GNUNET_SERVICE_client_continue (nc->client);
1362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1363               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1364               name_tmp);
1365
1366   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1367   if (NULL == conv_name)
1368   {
1369     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1370                 "Error converting name `%s'\n",
1371                 name_tmp);
1372     GNUNET_SERVICE_client_drop (nc->client);
1373     return;
1374   }
1375   rlc.label = conv_name;
1376   rlc.found = GNUNET_NO;
1377   rlc.res_rd_count = 0;
1378   rlc.res_rd = NULL;
1379   rlc.rd_ser_len = 0;
1380   rlc.nick = get_nick_record (&ll_msg->zone);
1381   res = GSN_database->lookup_records (GSN_database->cls,
1382                                       &ll_msg->zone,
1383                                       conv_name,
1384                                       &lookup_it,
1385                                       &rlc);
1386   GNUNET_free (conv_name);
1387   env = GNUNET_MQ_msg_extra (llr_msg,
1388                              name_len + rlc.rd_ser_len,
1389                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1390   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1391   llr_msg->private_key = ll_msg->zone;
1392   llr_msg->name_len = htons (name_len);
1393   llr_msg->rd_count = htons (rlc.res_rd_count);
1394   llr_msg->rd_len = htons (rlc.rd_ser_len);
1395   res_name = (char *) &llr_msg[1];
1396   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1397     llr_msg->found = ntohs (GNUNET_YES);
1398   else
1399     llr_msg->found = ntohs (GNUNET_NO);
1400   GNUNET_memcpy (&llr_msg[1],
1401                  name_tmp,
1402                  name_len);
1403   GNUNET_memcpy (&res_name[name_len],
1404                  rlc.res_rd,
1405                  rlc.rd_ser_len);
1406   GNUNET_MQ_send (nc->mq,
1407                   env);
1408   GNUNET_free_non_null (rlc.res_rd);
1409 }
1410
1411
1412 /**
1413  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1414  *
1415  * @param cls client sending the message
1416  * @param rp_msg message of type `struct RecordStoreMessage`
1417  * @return #GNUNET_OK if @a rp_msg is well-formed
1418  */
1419 static int
1420 check_record_store (void *cls,
1421                     const struct RecordStoreMessage *rp_msg)
1422 {
1423   size_t name_len;
1424   size_t msg_size;
1425   size_t msg_size_exp;
1426   size_t rd_ser_len;
1427   const char *name_tmp;
1428
1429   (void) cls;
1430   name_len = ntohs (rp_msg->name_len);
1431   msg_size = ntohs (rp_msg->gns_header.header.size);
1432   rd_ser_len = ntohs (rp_msg->rd_len);
1433   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1434   if (msg_size != msg_size_exp)
1435   {
1436     GNUNET_break (0);
1437     return GNUNET_SYSERR;
1438   }
1439   if ( (0 == name_len) ||
1440        (name_len > MAX_NAME_LEN) )
1441   {
1442     GNUNET_break (0);
1443     return GNUNET_SYSERR;
1444   }
1445   name_tmp = (const char *) &rp_msg[1];
1446   if ('\0' != name_tmp[name_len -1])
1447   {
1448     GNUNET_break (0);
1449     return GNUNET_SYSERR;
1450   }
1451   return GNUNET_OK;
1452 }
1453
1454
1455 /**
1456  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1457  *
1458  * @param cls client sending the message
1459  * @param rp_msg message of type `struct RecordStoreMessage`
1460  */
1461 static void
1462 handle_record_store (void *cls,
1463                      const struct RecordStoreMessage *rp_msg)
1464 {
1465   struct NamestoreClient *nc = cls;
1466   size_t name_len;
1467   size_t rd_ser_len;
1468   uint32_t rid;
1469   const char *name_tmp;
1470   char *conv_name;
1471   const char *rd_ser;
1472   unsigned int rd_count;
1473   int res;
1474   struct StoreActivity *sa;
1475
1476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477               "Received NAMESTORE_RECORD_STORE message\n");
1478   rid = ntohl (rp_msg->gns_header.r_id);
1479   name_len = ntohs (rp_msg->name_len);
1480   rd_count = ntohs (rp_msg->rd_count);
1481   rd_ser_len = ntohs (rp_msg->rd_len);
1482   GNUNET_break (0 == ntohs (rp_msg->reserved));
1483   name_tmp = (const char *) &rp_msg[1];
1484   rd_ser = &name_tmp[name_len];
1485   {
1486     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1487
1488     if (GNUNET_OK !=
1489         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1490                                               rd_ser,
1491                                               rd_count,
1492                                               rd))
1493     {
1494       GNUNET_break (0);
1495       GNUNET_SERVICE_client_drop (nc->client);
1496       return;
1497     }
1498
1499     /* Extracting and converting private key */
1500     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1501     if (NULL == conv_name)
1502     {
1503       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1504                   "Error converting name `%s'\n",
1505                   name_tmp);
1506       GNUNET_SERVICE_client_drop (nc->client);
1507       return;
1508     }
1509     GNUNET_STATISTICS_update (statistics,
1510                               "Well-formed store requests received",
1511                               1,
1512                               GNUNET_NO);
1513     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1514                 "Creating %u records for name `%s'\n",
1515                 (unsigned int) rd_count,
1516                 conv_name);
1517     if ( (0 == rd_count) &&
1518          (GNUNET_NO ==
1519           GSN_database->lookup_records (GSN_database->cls,
1520                                         &rp_msg->private_key,
1521                                         conv_name,
1522                                         NULL,
1523                                         0)) )
1524     {
1525       /* This name does not exist, so cannot be removed */
1526       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1527                   "Name `%s' does not exist, no deletion required\n",
1528                   conv_name);
1529       res = GNUNET_NO;
1530     }
1531     else
1532     {
1533       /* remove "NICK" records, unless this is for the
1534          #GNUNET_GNS_EMPTY_LABEL_AT label */
1535       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1536       unsigned int rd_clean_off;
1537
1538       rd_clean_off = 0;
1539       for (unsigned int i=0;i<rd_count;i++)
1540       {
1541         rd_clean[rd_clean_off] = rd[i];
1542         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1543                            conv_name)) ||
1544              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1545           rd_clean_off++;
1546         
1547         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1548                            conv_name)) &&
1549              (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type) )
1550           cache_nick (&rp_msg->private_key,
1551                       &rd[i]);
1552       }
1553       res = GSN_database->store_records (GSN_database->cls,
1554                                          &rp_msg->private_key,
1555                                          conv_name,
1556                                          rd_clean_off,
1557                                          rd_clean);
1558     }
1559
1560     if (GNUNET_OK != res)
1561     {
1562       /* store not successful, not need to tell monitors */
1563       send_store_response (nc,
1564                            res,
1565                            rid);
1566       GNUNET_SERVICE_client_continue (nc->client);
1567       GNUNET_free (conv_name);
1568       return;
1569     }
1570
1571     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1572                         ntohs (rp_msg->gns_header.header.size));
1573     GNUNET_CONTAINER_DLL_insert (sa_head,
1574                                  sa_tail,
1575                                  sa);
1576     sa->nc = nc;
1577     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1578     GNUNET_memcpy (&sa[1],
1579                    rp_msg,
1580                    ntohs (rp_msg->gns_header.header.size));
1581     sa->zm_pos = monitor_head;
1582     sa->conv_name = conv_name;
1583     continue_store_activity (sa);
1584   }
1585 }
1586
1587
1588 /**
1589  * Context for record remove operations passed from #handle_zone_to_name to
1590  * #handle_zone_to_name_it as closure
1591  */
1592 struct ZoneToNameCtx
1593 {
1594   /**
1595    * Namestore client
1596    */
1597   struct NamestoreClient *nc;
1598
1599   /**
1600    * Request id (to be used in the response to the client).
1601    */
1602   uint32_t rid;
1603
1604   /**
1605    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1606    * not finding a name for the zone still counts as a 'success' here,
1607    * as this field is about the success of executing the IPC protocol.
1608    */
1609   int success;
1610 };
1611
1612
1613 /**
1614  * Zone to name iterator
1615  *
1616  * @param cls struct ZoneToNameCtx *
1617  * @param seq sequence number of the record
1618  * @param zone_key the zone key
1619  * @param name name
1620  * @param rd_count number of records in @a rd
1621  * @param rd record data
1622  */
1623 static void
1624 handle_zone_to_name_it (void *cls,
1625                         uint64_t seq,
1626                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1627                         const char *name,
1628                         unsigned int rd_count,
1629                         const struct GNUNET_GNSRECORD_Data *rd)
1630 {
1631   struct ZoneToNameCtx *ztn_ctx = cls;
1632   struct GNUNET_MQ_Envelope *env;
1633   struct ZoneToNameResponseMessage *ztnr_msg;
1634   int16_t res;
1635   size_t name_len;
1636   ssize_t rd_ser_len;
1637   size_t msg_size;
1638   char *name_tmp;
1639   char *rd_tmp;
1640
1641   (void) seq;
1642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1643               "Found result for zone-to-name lookup: `%s'\n",
1644               name);
1645   res = GNUNET_YES;
1646   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1647   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1648                                                   rd);
1649   if (rd_ser_len < 0)
1650   {
1651     GNUNET_break (0);
1652     ztn_ctx->success = GNUNET_SYSERR;
1653     return;
1654   }
1655   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1656   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1657   {
1658     GNUNET_break (0);
1659     ztn_ctx->success = GNUNET_SYSERR;
1660     return;
1661   }
1662   env = GNUNET_MQ_msg_extra (ztnr_msg,
1663                              name_len + rd_ser_len,
1664                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1665   ztnr_msg->gns_header.header.size = htons (msg_size);
1666   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1667   ztnr_msg->res = htons (res);
1668   ztnr_msg->rd_len = htons (rd_ser_len);
1669   ztnr_msg->rd_count = htons (rd_count);
1670   ztnr_msg->name_len = htons (name_len);
1671   ztnr_msg->zone = *zone_key;
1672   name_tmp = (char *) &ztnr_msg[1];
1673   GNUNET_memcpy (name_tmp,
1674                  name,
1675                  name_len);
1676   rd_tmp = &name_tmp[name_len];
1677   GNUNET_assert (rd_ser_len ==
1678                  GNUNET_GNSRECORD_records_serialize (rd_count,
1679                                                      rd,
1680                                                      rd_ser_len,
1681                                                      rd_tmp));
1682   ztn_ctx->success = GNUNET_OK;
1683   GNUNET_MQ_send (ztn_ctx->nc->mq,
1684                   env);
1685 }
1686
1687
1688 /**
1689  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1690  *
1691  * @param cls client client sending the message
1692  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1693  */
1694 static void
1695 handle_zone_to_name (void *cls,
1696                      const struct ZoneToNameMessage *ztn_msg)
1697 {
1698   struct NamestoreClient *nc = cls;
1699   struct ZoneToNameCtx ztn_ctx;
1700   struct GNUNET_MQ_Envelope *env;
1701   struct ZoneToNameResponseMessage *ztnr_msg;
1702
1703   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1704               "Received ZONE_TO_NAME message\n");
1705   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1706   ztn_ctx.nc = nc;
1707   ztn_ctx.success = GNUNET_NO;
1708   if (GNUNET_SYSERR ==
1709       GSN_database->zone_to_name (GSN_database->cls,
1710                                   &ztn_msg->zone,
1711                                   &ztn_msg->value_zone,
1712                                   &handle_zone_to_name_it, &ztn_ctx))
1713   {
1714     /* internal error, hang up instead of signalling something
1715        that might be wrong */
1716     GNUNET_break (0);
1717     GNUNET_SERVICE_client_drop (nc->client);
1718     return;
1719   }
1720   if (GNUNET_NO == ztn_ctx.success)
1721   {
1722     /* no result found, send empty response */
1723     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1724                 "Found no result for zone-to-name lookup.\n");
1725     env = GNUNET_MQ_msg (ztnr_msg,
1726                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1727     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1728     ztnr_msg->res = htons (GNUNET_NO);
1729     GNUNET_MQ_send (nc->mq,
1730                     env);
1731   }
1732   GNUNET_SERVICE_client_continue (nc->client);
1733 }
1734
1735
1736 /**
1737  * Context for record remove operations passed from
1738  * #run_zone_iteration_round to #zone_iterate_proc as closure
1739  */
1740 struct ZoneIterationProcResult
1741 {
1742   /**
1743    * The zone iteration handle
1744    */
1745   struct ZoneIteration *zi;
1746
1747   /**
1748    * Number of results left to be returned in this iteration.
1749    */
1750   uint64_t limit;
1751
1752 };
1753
1754
1755 /**
1756  * Process results for zone iteration from database
1757  *
1758  * @param cls struct ZoneIterationProcResult
1759  * @param seq sequence number of the record
1760  * @param zone_key the zone key
1761  * @param name name
1762  * @param rd_count number of records for this name
1763  * @param rd record data
1764  */
1765 static void
1766 zone_iterate_proc (void *cls,
1767                    uint64_t seq,
1768                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1769                    const char *name,
1770                    unsigned int rd_count,
1771                    const struct GNUNET_GNSRECORD_Data *rd)
1772 {
1773   struct ZoneIterationProcResult *proc = cls;
1774   int do_refresh_block;
1775
1776   if ( (NULL == zone_key) &&
1777        (NULL == name) )
1778   {
1779     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1780                 "Iteration done\n");
1781     return;
1782   }
1783   if ( (NULL == zone_key) ||
1784        (NULL == name) )
1785   {
1786     /* what is this!? should never happen */
1787     GNUNET_break (0);
1788     return;
1789   }
1790   if (0 == proc->limit)
1791   {
1792     /* what is this!? should never happen */
1793     GNUNET_break (0);
1794     return;
1795   }
1796   proc->limit--;
1797   proc->zi->seq = seq;
1798   send_lookup_response (proc->zi->nc,
1799                         proc->zi->request_id,
1800                         zone_key,
1801                         name,
1802                         rd_count,
1803                         rd);
1804
1805
1806   do_refresh_block = GNUNET_NO;
1807   for (unsigned int i=0;i<rd_count;i++)
1808     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1809     {
1810       do_refresh_block = GNUNET_YES;
1811       break;
1812     }
1813   if (GNUNET_YES == do_refresh_block)
1814     refresh_block (NULL,
1815                    0,
1816                    zone_key,
1817                    name,
1818                    rd_count,
1819                    rd);
1820 }
1821
1822
1823 /**
1824  * Perform the next round of the zone iteration.
1825  *
1826  * @param zi zone iterator to process
1827  * @param limit number of results to return in one pass
1828  */
1829 static void
1830 run_zone_iteration_round (struct ZoneIteration *zi,
1831                           uint64_t limit)
1832 {
1833   struct ZoneIterationProcResult proc;
1834   struct GNUNET_MQ_Envelope *env;
1835   struct GNUNET_NAMESTORE_Header *em;
1836   struct GNUNET_TIME_Absolute start;
1837   struct GNUNET_TIME_Relative duration;
1838
1839   memset (&proc,
1840           0,
1841           sizeof (proc));
1842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1843               "Asked to return up to %llu records at position %llu\n",
1844               (unsigned long long) limit,
1845               (unsigned long long) zi->seq);
1846   proc.zi = zi;
1847   proc.limit = limit;
1848   start = GNUNET_TIME_absolute_get ();
1849   GNUNET_break (GNUNET_SYSERR !=
1850                 GSN_database->iterate_records (GSN_database->cls,
1851                                                (0 == memcmp (&zi->zone,
1852                                                              &zero,
1853                                                              sizeof (zero)))
1854                                                ? NULL
1855                                                : &zi->zone,
1856                                                zi->seq,
1857                                                limit,
1858                                                &zone_iterate_proc,
1859                                                &proc));
1860   duration = GNUNET_TIME_absolute_get_duration (start);
1861   duration = GNUNET_TIME_relative_divide (duration,
1862                                           limit - proc.limit);
1863   GNUNET_STATISTICS_set (statistics,
1864                          "NAMESTORE iteration delay (μs/record)",
1865                          duration.rel_value_us,
1866                          GNUNET_NO);
1867   if (0 == proc.limit)
1868   {
1869     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1870                 "Returned %llu results, more results available\n",
1871                 (unsigned long long) limit);
1872     return; /* more results later after we get the
1873                #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message */
1874   }
1875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1876               "Completed iteration after %llu/%llu results\n",
1877               (unsigned long long) (limit - proc.limit),
1878               (unsigned long long) limit);
1879   /* send empty response to indicate end of list */
1880   env = GNUNET_MQ_msg (em,
1881                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
1882   em->r_id = htonl (zi->request_id);
1883   GNUNET_MQ_send (zi->nc->mq,
1884                   env);
1885   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
1886                                zi->nc->op_tail,
1887                                zi);
1888   GNUNET_free (zi);
1889 }
1890
1891
1892 /**
1893  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1894  *
1895  * @param cls the client sending the message
1896  * @param zis_msg message from the client
1897  */
1898 static void
1899 handle_iteration_start (void *cls,
1900                         const struct ZoneIterationStartMessage *zis_msg)
1901 {
1902   struct NamestoreClient *nc = cls;
1903   struct ZoneIteration *zi;
1904
1905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906               "Received ZONE_ITERATION_START message\n");
1907   zi = GNUNET_new (struct ZoneIteration);
1908   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1909   zi->offset = 0;
1910   zi->nc = nc;
1911   zi->zone = zis_msg->zone;
1912
1913   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1914                                nc->op_tail,
1915                                zi);
1916   run_zone_iteration_round (zi,
1917                             1);
1918   GNUNET_SERVICE_client_continue (nc->client);
1919 }
1920
1921
1922 /**
1923  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1924  *
1925  * @param cls the client sending the message
1926  * @param zis_msg message from the client
1927  */
1928 static void
1929 handle_iteration_stop (void *cls,
1930                        const struct ZoneIterationStopMessage *zis_msg)
1931 {
1932   struct NamestoreClient *nc = cls;
1933   struct ZoneIteration *zi;
1934   uint32_t rid;
1935
1936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937               "Received ZONE_ITERATION_STOP message\n");
1938   rid = ntohl (zis_msg->gns_header.r_id);
1939   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1940     if (zi->request_id == rid)
1941       break;
1942   if (NULL == zi)
1943   {
1944     GNUNET_break (0);
1945     GNUNET_SERVICE_client_drop (nc->client);
1946     return;
1947   }
1948   GNUNET_CONTAINER_DLL_remove (nc->op_head,
1949                                nc->op_tail,
1950                                zi);
1951   GNUNET_free (zi);
1952   GNUNET_SERVICE_client_continue (nc->client);
1953 }
1954
1955
1956 /**
1957  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
1958  *
1959  * @param cls the client sending the message
1960  * @param message message from the client
1961  */
1962 static void
1963 handle_iteration_next (void *cls,
1964                        const struct ZoneIterationNextMessage *zis_msg)
1965 {
1966   struct NamestoreClient *nc = cls;
1967   struct ZoneIteration *zi;
1968   uint32_t rid;
1969   uint64_t limit;
1970
1971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1972               "Received ZONE_ITERATION_NEXT message\n");
1973   GNUNET_STATISTICS_update (statistics,
1974                             "Iteration NEXT messages received",
1975                             1,
1976                             GNUNET_NO);
1977   rid = ntohl (zis_msg->gns_header.r_id);
1978   limit = GNUNET_ntohll (zis_msg->limit);
1979   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1980     if (zi->request_id == rid)
1981       break;
1982   if (NULL == zi)
1983   {
1984     GNUNET_break (0);
1985     GNUNET_SERVICE_client_drop (nc->client);
1986     return;
1987   }
1988   run_zone_iteration_round (zi,
1989                             limit);
1990   GNUNET_SERVICE_client_continue (nc->client);
1991 }
1992
1993
1994 /**
1995  * Function called when the monitor is ready for more data, and we
1996  * should thus unblock PUT operations that were blocked on the
1997  * monitor not being ready.
1998  */
1999 static void
2000 monitor_unblock (struct ZoneMonitor *zm)
2001 {
2002   struct StoreActivity *sa = sa_head;
2003
2004   while ( (NULL != sa) &&
2005           (zm->limit > zm->iteration_cnt) )
2006   {
2007     struct StoreActivity *sn = sa->next;
2008
2009     if (sa->zm_pos == zm)
2010       continue_store_activity (sa);
2011     sa = sn;
2012   }
2013   if (zm->limit > zm->iteration_cnt)
2014   {
2015     zm->sa_waiting = GNUNET_NO;
2016     if (NULL != zm->sa_wait_warning)
2017     {
2018       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2019       zm->sa_wait_warning = NULL;
2020     }
2021   }
2022   else if (GNUNET_YES == zm->sa_waiting)
2023   {
2024     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2025     if (NULL != zm->sa_wait_warning)
2026       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2027     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2028                                                         &warn_monitor_slow,
2029                                                         zm);
2030   }
2031 }
2032
2033
2034 /**
2035  * Send 'sync' message to zone monitor, we're now in sync.
2036  *
2037  * @param zm monitor that is now in sync
2038  */
2039 static void
2040 monitor_sync (struct ZoneMonitor *zm)
2041 {
2042   struct GNUNET_MQ_Envelope *env;
2043   struct GNUNET_MessageHeader *sync;
2044
2045   env = GNUNET_MQ_msg (sync,
2046                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
2047   GNUNET_MQ_send (zm->nc->mq,
2048                   env);
2049   /* mark iteration done */
2050   zm->in_first_iteration = GNUNET_NO;
2051   zm->iteration_cnt = 0;
2052   if ( (zm->limit > 0) &&
2053        (zm->sa_waiting) )
2054     monitor_unblock (zm);
2055 }
2056
2057
2058 /**
2059  * Obtain the next datum during the zone monitor's zone initial iteration.
2060  *
2061  * @param cls zone monitor that does its initial iteration
2062  */
2063 static void
2064 monitor_iteration_next (void *cls);
2065
2066
2067 /**
2068  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
2069  *
2070  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2071  * @param seq sequence number of the record
2072  * @param zone_key zone key of the zone
2073  * @param name name
2074  * @param rd_count number of records in @a rd
2075  * @param rd array of records
2076  */
2077 static void
2078 monitor_iterate_cb (void *cls,
2079                     uint64_t seq,
2080                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2081                     const char *name,
2082                     unsigned int rd_count,
2083                     const struct GNUNET_GNSRECORD_Data *rd)
2084 {
2085   struct ZoneMonitor *zm = cls;
2086
2087   zm->seq = seq;
2088   GNUNET_assert (NULL != name);
2089   GNUNET_STATISTICS_update (statistics,
2090                             "Monitor notifications sent",
2091                             1,
2092                             GNUNET_NO);
2093   zm->limit--;
2094   zm->iteration_cnt--;
2095   send_lookup_response (zm->nc,
2096                         0,
2097                         zone_key,
2098                         name,
2099                         rd_count,
2100                         rd);
2101   if ( (0 == zm->iteration_cnt) &&
2102        (0 != zm->limit) )
2103   {
2104     /* We are done with the current iteration batch, AND the
2105        client would right now accept more, so go again! */
2106     GNUNET_assert (NULL == zm->task);
2107     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2108                                          zm);
2109   }
2110 }
2111
2112
2113 /**
2114  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2115  *
2116  * @param cls the client sending the message
2117  * @param zis_msg message from the client
2118  */
2119 static void
2120 handle_monitor_start (void *cls,
2121                       const struct ZoneMonitorStartMessage *zis_msg)
2122 {
2123   struct NamestoreClient *nc = cls;
2124   struct ZoneMonitor *zm;
2125
2126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2127               "Received ZONE_MONITOR_START message\n");
2128   zm = GNUNET_new (struct ZoneMonitor);
2129   zm->nc = nc;
2130   zm->zone = zis_msg->zone;
2131   zm->limit = 1;
2132   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2133   GNUNET_CONTAINER_DLL_insert (monitor_head,
2134                                monitor_tail,
2135                                zm);
2136   GNUNET_SERVICE_client_mark_monitor (nc->client);
2137   GNUNET_SERVICE_client_continue (nc->client);
2138   GNUNET_notification_context_add (monitor_nc,
2139                                    nc->mq);
2140   if (zm->in_first_iteration)
2141     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2142                                          zm);
2143   else
2144     monitor_sync (zm);
2145 }
2146
2147
2148 /**
2149  * Obtain the next datum during the zone monitor's zone initial iteration.
2150  *
2151  * @param cls zone monitor that does its initial iteration
2152  */
2153 static void
2154 monitor_iteration_next (void *cls)
2155 {
2156   struct ZoneMonitor *zm = cls;
2157   int ret;
2158
2159   zm->task = NULL;
2160   GNUNET_assert (0 == zm->iteration_cnt);
2161   if (zm->limit > 16)
2162     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2163   else
2164     zm->iteration_cnt = zm->limit; /* use it all */
2165   ret = GSN_database->iterate_records (GSN_database->cls,
2166                                        (0 == memcmp (&zm->zone,
2167                                                      &zero,
2168                                                      sizeof (zero)))
2169                                        ? NULL
2170                                        : &zm->zone,
2171                                        zm->seq,
2172                                        zm->iteration_cnt,
2173                                        &monitor_iterate_cb,
2174                                        zm);
2175   if (GNUNET_SYSERR == ret)
2176   {
2177     GNUNET_SERVICE_client_drop (zm->nc->client);
2178     return;
2179   }
2180   if (GNUNET_NO == ret)
2181   {
2182     /* empty zone */
2183     monitor_sync (zm);
2184     return;
2185   }
2186 }
2187
2188
2189 /**
2190  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2191  *
2192  * @param cls the client sending the message
2193  * @param nm message from the client
2194  */
2195 static void
2196 handle_monitor_next (void *cls,
2197                      const struct ZoneMonitorNextMessage *nm)
2198 {
2199   struct NamestoreClient *nc = cls;
2200   struct ZoneMonitor *zm;
2201   uint64_t inc;
2202
2203   inc = GNUNET_ntohll (nm->limit);
2204   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2205               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2206               (unsigned long long) inc);
2207   for (zm = monitor_head; NULL != zm; zm = zm->next)
2208     if (zm->nc == nc)
2209       break;
2210   if (NULL == zm)
2211   {
2212     GNUNET_break (0);
2213     GNUNET_SERVICE_client_drop (nc->client);
2214     return;
2215   }
2216   GNUNET_SERVICE_client_continue (nc->client);
2217   if (zm->limit + inc < zm->limit)
2218   {
2219     GNUNET_break (0);
2220     GNUNET_SERVICE_client_drop (nc->client);
2221     return;
2222   }
2223   zm->limit += inc;
2224   if ( (zm->in_first_iteration) &&
2225        (zm->limit == inc) )
2226   {
2227     /* We are still iterating, and the previous iteration must
2228        have stopped due to the client's limit, so continue it! */
2229     GNUNET_assert (NULL == zm->task);
2230     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2231                                          zm);
2232   }
2233   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2234   if ( (zm->limit > zm->iteration_cnt) &&
2235        (zm->sa_waiting) )
2236   {
2237     monitor_unblock (zm);
2238   }
2239   else if (GNUNET_YES == zm->sa_waiting)
2240   {
2241     if (NULL != zm->sa_wait_warning)
2242       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2243     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2244     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2245                                                         &warn_monitor_slow,
2246                                                         zm);
2247   }
2248 }
2249
2250
2251 /**
2252  * Process namestore requests.
2253  *
2254  * @param cls closure
2255  * @param cfg configuration to use
2256  * @param service the initialized service
2257  */
2258 static void
2259 run (void *cls,
2260      const struct GNUNET_CONFIGURATION_Handle *cfg,
2261      struct GNUNET_SERVICE_Handle *service)
2262 {
2263   char *database;
2264
2265   (void) cls;
2266   (void) service;
2267   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2268               "Starting namestore service\n");
2269   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2270                                                      "namestore",
2271                                                      "CACHE_KEYS");
2272   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2273                                                             "namecache",
2274                                                             "DISABLE");
2275   GSN_cfg = cfg;
2276   monitor_nc = GNUNET_notification_context_create (1);
2277   if (GNUNET_YES != disable_namecache)
2278   {
2279     namecache = GNUNET_NAMECACHE_connect (cfg);
2280     GNUNET_assert (NULL != namecache);
2281   }
2282   /* Loading database plugin */
2283   if (GNUNET_OK !=
2284       GNUNET_CONFIGURATION_get_value_string (cfg,
2285                                              "namestore",
2286                                              "database",
2287                                              &database))
2288     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2289                 "No database backend configured\n");
2290
2291   GNUNET_asprintf (&db_lib_name,
2292                    "libgnunet_plugin_namestore_%s",
2293                    database);
2294   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2295                                      (void *) GSN_cfg);
2296   GNUNET_free (database);
2297   statistics = GNUNET_STATISTICS_create ("namestore",
2298                                          cfg);
2299   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2300                                  NULL);
2301   if (NULL == GSN_database)
2302   {
2303     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2304                 "Could not load database backend `%s'\n",
2305                 db_lib_name);
2306     GNUNET_SCHEDULER_shutdown ();
2307     return;
2308   }
2309 }
2310
2311
2312 /**
2313  * Define "main" method using service macro.
2314  */
2315 GNUNET_SERVICE_MAIN
2316 ("namestore",
2317  GNUNET_SERVICE_OPTION_NONE,
2318  &run,
2319  &client_connect_cb,
2320  &client_disconnect_cb,
2321  NULL,
2322  GNUNET_MQ_hd_var_size (record_store,
2323                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2324                         struct RecordStoreMessage,
2325                         NULL),
2326  GNUNET_MQ_hd_var_size (record_lookup,
2327                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2328                         struct LabelLookupMessage,
2329                         NULL),
2330  GNUNET_MQ_hd_fixed_size (zone_to_name,
2331                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2332                           struct ZoneToNameMessage,
2333                           NULL),
2334  GNUNET_MQ_hd_fixed_size (iteration_start,
2335                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2336                           struct ZoneIterationStartMessage,
2337                           NULL),
2338  GNUNET_MQ_hd_fixed_size (iteration_next,
2339                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2340                           struct ZoneIterationNextMessage,
2341                           NULL),
2342  GNUNET_MQ_hd_fixed_size (iteration_stop,
2343                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2344                           struct ZoneIterationStopMessage,
2345                           NULL),
2346  GNUNET_MQ_hd_fixed_size (monitor_start,
2347                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2348                           struct ZoneMonitorStartMessage,
2349                           NULL),
2350  GNUNET_MQ_hd_fixed_size (monitor_next,
2351                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2352                           struct ZoneMonitorNextMessage,
2353                           NULL),
2354  GNUNET_MQ_handler_end ());
2355
2356
2357 /* end of gnunet-service-namestore.c */