NAMESTORE/JSON: fix parsing exp and flags
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file namestore/gnunet-service-namestore.c
23  * @brief namestore for the GNUnet naming system
24  * @author Matthias Wachs
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - "get_nick_record" is a bottleneck, introduce a cache to
29  *   avoid looking it up again and again (for the same few
30  *   zones that the user will typically manage!)
31  * - run testcases, make sure everything works!
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "gnunet_dnsparser_lib.h"
36 #include "gnunet_gns_service.h"
37 #include "gnunet_namecache_service.h"
38 #include "gnunet_namestore_service.h"
39 #include "gnunet_namestore_plugin.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_signatures.h"
42 #include "namestore.h"
43
44 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
45
46 /**
47  * If a monitor takes more than 1 minute to process an event, print a warning.
48  */
49 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
50
51 /**
52  * Size of the cache used by #get_nick_record()
53  */
54 #define NC_SIZE 16
55
56 /**
57  * A namestore client
58  */
59 struct NamestoreClient;
60
61
62 /**
63  * A namestore iteration operation.
64  */
65 struct ZoneIteration
66 {
67   /**
68    * Next element in the DLL
69    */
70   struct ZoneIteration *next;
71
72   /**
73    * Previous element in the DLL
74    */
75   struct ZoneIteration *prev;
76
77   /**
78    * Namestore client which intiated this zone iteration
79    */
80   struct NamestoreClient *nc;
81
82   /**
83    * The nick to add to the records
84    */
85   struct GNUNET_GNSRECORD_Data *nick;
86
87   /**
88    * Key of the zone we are iterating over.
89    */
90   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
91
92   /**
93    * Last sequence number in the zone iteration used to address next
94    * result of the zone iteration in the store
95    *
96    * Initialy set to 0.
97    * Updated in #zone_iterate_proc()
98    */
99   uint64_t seq;
100
101   /**
102    * The operation id fot the zone iteration in the response for the client
103    */
104   uint32_t request_id;
105
106   /**
107    * Offset of the zone iteration used to address next result of the zone
108    * iteration in the store
109    *
110    * Initialy set to 0 in #handle_iteration_start
111    * Incremented with by every call to #handle_iteration_next
112    */
113   uint32_t offset;
114
115   /**
116    * Number of pending cache operations triggered by this zone iteration which we
117    * need to wait for before allowing the client to continue.
118    */
119   unsigned int cache_ops;
120
121   /**
122    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
123    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
124    * message and free the data structure once @e cache_ops is zero.
125    */
126   int send_end;
127
128 };
129
130
131 /**
132  * A namestore client
133  */
134 struct NamestoreClient
135 {
136
137   /**
138    * The client
139    */
140   struct GNUNET_SERVICE_Client *client;
141
142   /**
143    * Message queue for transmission to @e client
144    */
145   struct GNUNET_MQ_Handle *mq;
146
147   /**
148    * Head of the DLL of
149    * Zone iteration operations in progress initiated by this client
150    */
151   struct ZoneIteration *op_head;
152
153   /**
154    * Tail of the DLL of
155    * Zone iteration operations in progress initiated by this client
156    */
157   struct ZoneIteration *op_tail;
158 };
159
160
161 /**
162  * A namestore monitor.
163  */
164 struct ZoneMonitor
165 {
166   /**
167    * Next element in the DLL
168    */
169   struct ZoneMonitor *next;
170
171   /**
172    * Previous element in the DLL
173    */
174   struct ZoneMonitor *prev;
175
176   /**
177    * Namestore client which intiated this zone monitor
178    */
179   struct NamestoreClient *nc;
180
181   /**
182    * Private key of the zone.
183    */
184   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
185
186   /**
187    * Task active during initial iteration.
188    */
189   struct GNUNET_SCHEDULER_Task *task;
190
191   /**
192    * Task to warn about slow monitors.
193    */
194   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
195
196   /**
197    * Since when are we blocked on this monitor?
198    */
199   struct GNUNET_TIME_Absolute sa_waiting_start;
200
201   /**
202    * Last sequence number in the zone iteration used to address next
203    * result of the zone iteration in the store
204    *
205    * Initialy set to 0.
206    * Updated in #monitor_iterate_cb()
207    */
208   uint64_t seq;
209
210   /**
211    * Current limit of how many more messages we are allowed
212    * to queue to this monitor.
213    */
214   uint64_t limit;
215
216   /**
217    * How many more requests may we receive from the iterator
218    * before it is at the limit we gave it?  Will be below or
219    * equal to @e limit.  The effective limit for monitor
220    * events is thus @e iteration_cnt - @e limit!
221    */
222   uint64_t iteration_cnt;
223
224   /**
225    * Are we (still) in the initial iteration pass?
226    */
227   int in_first_iteration;
228
229   /**
230    * Is there a store activity waiting for this monitor?  We only raise the
231    * flag when it happens and search the DLL for the store activity when we
232    * had a limit increase.  If we cannot find any waiting store activity at
233    * that time, we clear the flag again.
234    */
235   int sa_waiting;
236
237 };
238
239
240 /**
241  * Pending operation on the namecache.
242  */
243 struct CacheOperation
244 {
245
246   /**
247    * Kept in a DLL.
248    */
249   struct CacheOperation *prev;
250
251   /**
252    * Kept in a DLL.
253    */
254   struct CacheOperation *next;
255
256   /**
257    * Handle to namecache queue.
258    */
259   struct GNUNET_NAMECACHE_QueueEntry *qe;
260
261   /**
262    * Client to notify about the result, can be NULL.
263    */
264   struct NamestoreClient *nc;
265
266   /**
267    * Zone iteration to call #zone_iteration_done_client_continue()
268    * for if applicable, can be NULL.
269    */
270   struct ZoneIteration *zi;
271
272   /**
273    * Client's request ID.
274    */
275   uint32_t rid;
276 };
277
278
279 /**
280  * Information for an ongoing #handle_record_store() operation.
281  * Needed as we may wait for monitors to be ready for the notification.
282  */
283 struct StoreActivity
284 {
285   /**
286    * Kept in a DLL.
287    */
288   struct StoreActivity *next;
289
290   /**
291    * Kept in a DLL.
292    */
293   struct StoreActivity *prev;
294
295   /**
296    * Which client triggered the store activity?
297    */
298   struct NamestoreClient *nc;
299
300   /**
301    * Copy of the original store message (as data fields in @e rd will
302    * point into it!).
303    */
304   const struct RecordStoreMessage *rsm;
305
306   /**
307    * Next zone monitor that still needs to be notified about this PUT.
308    */
309   struct ZoneMonitor *zm_pos;
310
311   /**
312    * Label nicely canonicalized (lower case).
313    */
314   char *conv_name;
315
316 };
317
318
319 /**
320  * Entry in list of cached nick resolutions.
321  */
322 struct NickCache
323 {
324   /**
325    * Zone the cache entry is for.
326    */
327   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
328
329   /**
330    * Cached record data.
331    */
332   struct GNUNET_GNSRECORD_Data *rd;
333
334   /**
335    * Timestamp when this cache entry was used last.
336    */
337   struct GNUNET_TIME_Absolute last_used;
338 };
339
340
341 /**
342  * We cache nick records to reduce DB load.
343  */
344 static struct NickCache nick_cache[NC_SIZE];
345
346 /**
347  * Public key of all zeros.
348  */
349 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
350
351 /**
352  * Configuration handle.
353  */
354 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
355
356 /**
357  * Handle to the statistics service
358  */
359 static struct GNUNET_STATISTICS_Handle *statistics;
360
361 /**
362  * Namecache handle.
363  */
364 static struct GNUNET_NAMECACHE_Handle *namecache;
365
366 /**
367  * Database handle
368  */
369 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
370
371 /**
372  * Name of the database plugin
373  */
374 static char *db_lib_name;
375
376 /**
377  * Head of cop DLL.
378  */
379 static struct CacheOperation *cop_head;
380
381 /**
382  * Tail of cop DLL.
383  */
384 static struct CacheOperation *cop_tail;
385
386 /**
387  * First active zone monitor.
388  */
389 static struct ZoneMonitor *monitor_head;
390
391 /**
392  * Last active zone monitor.
393  */
394 static struct ZoneMonitor *monitor_tail;
395
396 /**
397  * Head of DLL of monitor-blocked store activities.
398  */
399 static struct StoreActivity *sa_head;
400
401 /**
402  * Tail of DLL of monitor-blocked store activities.
403  */
404 static struct StoreActivity *sa_tail;
405
406 /**
407  * Notification context shared by all monitors.
408  */
409 static struct GNUNET_NotificationContext *monitor_nc;
410
411 /**
412  * Optimize block insertion by caching map of private keys to
413  * public keys in memory?
414  */
415 static int cache_keys;
416
417 /**
418  * Use the namecache? Doing so creates additional cryptographic
419  * operations whenever we touch a record.
420  */
421 static int disable_namecache;
422
423
424 /**
425  * Task run during shutdown.
426  *
427  * @param cls unused
428  */
429 static void
430 cleanup_task (void *cls)
431 {
432   struct CacheOperation *cop;
433
434   (void) cls;
435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436               "Stopping namestore service\n");
437   while (NULL != (cop = cop_head))
438   {
439     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
440                 "Aborting incomplete namecache operation\n");
441     GNUNET_NAMECACHE_cancel (cop->qe);
442     GNUNET_CONTAINER_DLL_remove (cop_head,
443                                  cop_tail,
444                                  cop);
445     GNUNET_free (cop);
446   }
447   if (NULL != namecache)
448   {
449     GNUNET_NAMECACHE_disconnect (namecache);
450     namecache = NULL;
451   }
452   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
453                                               GSN_database));
454   GNUNET_free (db_lib_name);
455   db_lib_name = NULL;
456   if (NULL != monitor_nc)
457   {
458     GNUNET_notification_context_destroy (monitor_nc);
459     monitor_nc = NULL;
460   }
461   if (NULL != statistics)
462   {
463     GNUNET_STATISTICS_destroy (statistics,
464                                GNUNET_NO);
465     statistics = NULL;
466   }
467 }
468
469
470 /**
471  * Release memory used by @a sa.
472  *
473  * @param sa activity to free
474  */
475 static void
476 free_store_activity (struct StoreActivity *sa)
477 {
478   GNUNET_CONTAINER_DLL_remove (sa_head,
479                                sa_tail,
480                                sa);
481   GNUNET_free (sa->conv_name);
482   GNUNET_free (sa);
483 }
484
485
486 /**
487  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
488  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
489  * record, which (if found) is then copied to @a cls for future use.
490  *
491  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
492  * @param seq sequence number of the record, MUST NOT BE ZERO
493  * @param private_key the private key of the zone (unused)
494  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
495  * @param rd_count number of records in @a rd
496  * @param rd records stored under @a label in the zone
497  */
498 static void
499 lookup_nick_it (void *cls,
500                 uint64_t seq,
501                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
502                 const char *label,
503                 unsigned int rd_count,
504                 const struct GNUNET_GNSRECORD_Data *rd)
505 {
506   struct GNUNET_GNSRECORD_Data **res = cls;
507
508   (void) private_key;
509   GNUNET_assert (0 != seq);
510   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
511   {
512     GNUNET_break (0);
513     return;
514   }
515   for (unsigned int c = 0; c < rd_count; c++)
516   {
517     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
518     {
519       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
520       (*res)->data = &(*res)[1];
521       GNUNET_memcpy ((void *) (*res)->data,
522                      rd[c].data,
523                      rd[c].data_size);
524       (*res)->data_size = rd[c].data_size;
525       (*res)->expiration_time = rd[c].expiration_time;
526       (*res)->flags = rd[c].flags;
527       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
528       return;
529     }
530   }
531   (*res) = NULL;
532 }
533
534
535 /**
536  * Add entry to the cache for @a zone and @a nick
537  *
538  * @param zone zone key to cache under
539  * @param nick nick entry to cache
540  */
541 static void
542 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
543             const struct GNUNET_GNSRECORD_Data *nick)
544 {
545   struct NickCache *oldest;
546
547   oldest = NULL;
548   for (unsigned int i=0;i<NC_SIZE;i++)
549   {
550     struct NickCache *pos = &nick_cache[i];
551
552     if ( (NULL == oldest) ||
553          (oldest->last_used.abs_value_us >
554           pos->last_used.abs_value_us) )
555       oldest = pos;
556     if (0 == memcmp (zone,
557                      &pos->zone,
558                      sizeof (*zone)))
559     {
560       oldest = pos;
561       break;
562     }
563   }
564   GNUNET_free_non_null (oldest->rd);
565   oldest->zone = *zone;
566   oldest->rd = GNUNET_malloc (sizeof (*nick) +
567                               nick->data_size);
568   *oldest->rd = *nick;
569   oldest->rd->data = &oldest->rd[1];
570   memcpy (&oldest->rd[1],
571           nick->data,
572           nick->data_size);
573   oldest->last_used = GNUNET_TIME_absolute_get ();
574 }
575
576
577 /**
578  * Return the NICK record for the zone (if it exists).
579  *
580  * @param zone private key for the zone to look for nick
581  * @return NULL if no NICK record was found
582  */
583 static struct GNUNET_GNSRECORD_Data *
584 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
585 {
586   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
587   struct GNUNET_GNSRECORD_Data *nick;
588   int res;
589
590   /* check cache first */
591   for (unsigned int i=0;i<NC_SIZE;i++)
592   {
593     struct NickCache *pos = &nick_cache[i];
594     if ( (NULL != pos->rd) &&
595          (0 == memcmp (zone,
596                        &pos->zone,
597                        sizeof (*zone))) )
598     {
599       nick = GNUNET_malloc (sizeof (*nick) +
600                             pos->rd->data_size);
601       *nick = *pos->rd;
602       nick->data = &nick[1];
603       memcpy (&nick[1],
604               pos->rd->data,
605               pos->rd->data_size);
606       pos->last_used = GNUNET_TIME_absolute_get ();
607       return nick;
608     }
609   }
610
611   nick = NULL;
612   res = GSN_database->lookup_records (GSN_database->cls,
613                                       zone,
614                                       GNUNET_GNS_EMPTY_LABEL_AT,
615                                       &lookup_nick_it,
616                                       &nick);
617   if ( (GNUNET_OK != res) ||
618        (NULL == nick) )
619   {
620     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
622                 "No nick name set for zone `%s'\n",
623                 GNUNET_GNSRECORD_z2s (&pub));
624     return NULL;
625   }
626
627   /* update cache */
628   cache_nick (zone,
629               nick);
630   return nick;
631 }
632
633
634 /**
635  * Merge the nick record @a nick_rd with the rest of the
636  * record set given in @a rd2.  Store the result in @a rdc_res
637  * and @a rd_res.  The @a nick_rd's expiration time is set to
638  * the maximum expiration time of all of the records in @a rd2.
639  *
640  * @param nick_rd the nick record to integrate
641  * @param rd2_length length of the @a rd2 array
642  * @param rd2 array of records
643  * @param rdc_res[out] length of the resulting @a rd_res array
644  * @param rd_res[out] set to an array of records,
645  *                    including @a nick_rd and @a rd2;
646  *           all of the variable-size 'data' fields in @a rd2 are
647  *           allocated in the same chunk of memory!
648  */
649 static void
650 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
651                          unsigned int rd2_length,
652                          const struct GNUNET_GNSRECORD_Data *rd2,
653                          unsigned int *rdc_res,
654                          struct GNUNET_GNSRECORD_Data **rd_res)
655 {
656   uint64_t latest_expiration;
657   size_t req;
658   char *data;
659   size_t data_offset;
660   struct GNUNET_GNSRECORD_Data *target;
661
662   (*rdc_res) = 1 + rd2_length;
663   if (0 == 1 + rd2_length)
664   {
665     GNUNET_break (0);
666     (*rd_res) = NULL;
667     return;
668   }
669   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
670   for (unsigned int i=0; i<rd2_length; i++)
671   {
672     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
673
674     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
675     {
676       GNUNET_break (0);
677       (*rd_res) = NULL;
678       return;
679     }
680     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
681   }
682   target = GNUNET_malloc (req);
683   (*rd_res) = target;
684   data = (char *) &target[1 + rd2_length];
685   data_offset = 0;
686   latest_expiration = 0;
687   for (unsigned int i=0;i<rd2_length;i++)
688   {
689     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
690
691     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
692     {
693       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
694           latest_expiration)
695         latest_expiration = orig->expiration_time;
696     }
697     else if (orig->expiration_time > latest_expiration)
698       latest_expiration = orig->expiration_time;
699     target[i] = *orig;
700     target[i].data = (void *) &data[data_offset];
701     GNUNET_memcpy (&data[data_offset],
702                    orig->data,
703                    orig->data_size);
704     data_offset += orig->data_size;
705   }
706   /* append nick */
707   target[rd2_length] = *nick_rd;
708   target[rd2_length].expiration_time = latest_expiration;
709   target[rd2_length].data = (void *) &data[data_offset];
710   GNUNET_memcpy (&data[data_offset],
711                  nick_rd->data,
712                  nick_rd->data_size);
713   data_offset += nick_rd->data_size;
714   GNUNET_assert (req ==
715                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
716 }
717
718
719 /**
720  * Generate a `struct LookupNameResponseMessage` and send it to the
721  * given client using the given notification context.
722  *
723  * @param nc client to unicast to
724  * @param request_id request ID to use
725  * @param zone_key zone key of the zone
726  * @param name name
727  * @param rd_count number of records in @a rd
728  * @param rd array of records
729  */
730 static void
731 send_lookup_response (struct NamestoreClient *nc,
732                       uint32_t request_id,
733                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
734                       const char *name,
735                       unsigned int rd_count,
736                       const struct GNUNET_GNSRECORD_Data *rd)
737 {
738   struct GNUNET_MQ_Envelope *env;
739   struct RecordResultMessage *zir_msg;
740   struct GNUNET_GNSRECORD_Data *nick;
741   struct GNUNET_GNSRECORD_Data *res;
742   unsigned int res_count;
743   size_t name_len;
744   ssize_t rd_ser_len;
745   char *name_tmp;
746   char *rd_ser;
747
748   nick = get_nick_record (zone_key);
749   GNUNET_assert (-1 !=
750                  GNUNET_GNSRECORD_records_get_size (rd_count,
751                                                     rd));
752
753   if ( (NULL != nick) &&
754        (0 != strcmp (name,
755                      GNUNET_GNS_EMPTY_LABEL_AT)))
756   {
757     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
758     merge_with_nick_records (nick,
759                              rd_count,
760                              rd,
761                              &res_count,
762                              &res);
763     GNUNET_free (nick);
764   }
765   else
766   {
767     res_count = rd_count;
768     res = (struct GNUNET_GNSRECORD_Data *) rd;
769   }
770
771   GNUNET_assert (-1 !=
772                  GNUNET_GNSRECORD_records_get_size (res_count,
773                                                     res));
774
775
776   name_len = strlen (name) + 1;
777   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
778                                                   res);
779   if (rd_ser_len < 0)
780   {
781     GNUNET_break (0);
782     GNUNET_SERVICE_client_drop (nc->client);
783     return;
784   }
785   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof (*zir_msg))
786   {
787     GNUNET_break (0);
788     GNUNET_SERVICE_client_drop (nc->client);
789     return;
790   }
791   env = GNUNET_MQ_msg_extra (zir_msg,
792                              name_len + rd_ser_len,
793                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
794   zir_msg->gns_header.r_id = htonl (request_id);
795   zir_msg->name_len = htons (name_len);
796   zir_msg->rd_count = htons (res_count);
797   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
798   zir_msg->private_key = *zone_key;
799   name_tmp = (char *) &zir_msg[1];
800   GNUNET_memcpy (name_tmp,
801                  name,
802                  name_len);
803   rd_ser = &name_tmp[name_len];
804   GNUNET_assert (rd_ser_len ==
805                  GNUNET_GNSRECORD_records_serialize (res_count,
806                                                      res,
807                                                      rd_ser_len,
808                                                      rd_ser));
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Sending RECORD_RESULT message with %u records\n",
811               res_count);
812   GNUNET_STATISTICS_update (statistics,
813                             "Record sets sent to clients",
814                             1,
815                             GNUNET_NO);
816   GNUNET_MQ_send (nc->mq,
817                   env);
818   if (rd != res)
819     GNUNET_free (res);
820 }
821
822
823 /**
824  * Send response to the store request to the client.
825  *
826  * @param client client to talk to
827  * @param res status of the operation
828  * @param rid client's request ID
829  */
830 static void
831 send_store_response (struct NamestoreClient *nc,
832                      int res,
833                      uint32_t rid)
834 {
835   struct GNUNET_MQ_Envelope *env;
836   struct RecordStoreResponseMessage *rcr_msg;
837
838   GNUNET_assert (NULL != nc);
839   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840               "Sending RECORD_STORE_RESPONSE message\n");
841   GNUNET_STATISTICS_update (statistics,
842                             "Store requests completed",
843                             1,
844                             GNUNET_NO);
845   env = GNUNET_MQ_msg (rcr_msg,
846                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
847   rcr_msg->gns_header.r_id = htonl (rid);
848   rcr_msg->op_result = htonl (res);
849   GNUNET_MQ_send (nc->mq,
850                   env);
851 }
852
853
854 /**
855  * Function called once we are done with the zone iteration and
856  * allow the zone iteration client to send us more messages.
857  *
858  * @param zi zone iteration we are processing
859  */
860 static void
861 zone_iteration_done_client_continue (struct ZoneIteration *zi)
862 {
863   struct GNUNET_MQ_Envelope *env;
864   struct GNUNET_NAMESTORE_Header *em;
865
866   GNUNET_SERVICE_client_continue (zi->nc->client);
867   if (! zi->send_end)
868     return;
869   /* send empty response to indicate end of list */
870   env = GNUNET_MQ_msg (em,
871                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
872   em->r_id = htonl (zi->request_id);
873   GNUNET_MQ_send (zi->nc->mq,
874                   env);
875
876   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
877                                zi->nc->op_tail,
878                                zi);
879   GNUNET_free (zi);
880 }
881
882
883 /**
884  * Cache operation complete, clean up.
885  *
886  * @param cls the `struct CacheOperation`
887  * @param success success
888  * @param emsg error messages
889  */
890 static void
891 finish_cache_operation (void *cls,
892                         int32_t success,
893                         const char *emsg)
894 {
895   struct CacheOperation *cop = cls;
896   struct ZoneIteration *zi;
897
898   if (NULL != emsg)
899     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
900                 _("Failed to replicate block in namecache: %s\n"),
901                 emsg);
902   else
903     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                 "CACHE operation completed\n");
905   GNUNET_CONTAINER_DLL_remove (cop_head,
906                                cop_tail,
907                                cop);
908   if (NULL != cop->nc)
909     send_store_response (cop->nc,
910                          success,
911                          cop->rid);
912   if (NULL != (zi = cop->zi))
913     {
914       zi->cache_ops--;
915       if (0 == zi->cache_ops)
916       {
917         /* unchoke zone iteration, cache has caught up */
918         zone_iteration_done_client_continue (zi);
919       }
920     }
921   GNUNET_free (cop);
922 }
923
924
925 /**
926  * We just touched the plaintext information about a name in our zone;
927  * refresh the corresponding (encrypted) block in the namecache.
928  *
929  * @param nc client responsible for the request, can be NULL
930  * @param zi zone iteration response for the request, can be NULL
931  * @param rid request ID of the client
932  * @param zone_key private key of the zone
933  * @param name label for the records
934  * @param rd_count number of records
935  * @param rd records stored under the given @a name
936  */
937 static void
938 refresh_block (struct NamestoreClient *nc,
939                struct ZoneIteration *zi,
940                uint32_t rid,
941                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
942                const char *name,
943                unsigned int rd_count,
944                const struct GNUNET_GNSRECORD_Data *rd)
945 {
946   struct GNUNET_GNSRECORD_Block *block;
947   struct CacheOperation *cop;
948   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
949   struct GNUNET_GNSRECORD_Data *nick;
950   struct GNUNET_GNSRECORD_Data *res;
951   unsigned int res_count;
952   struct GNUNET_TIME_Absolute exp_time;
953
954   nick = get_nick_record (zone_key);
955   res_count = rd_count;
956   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
957   if (NULL != nick)
958   {
959     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
960     merge_with_nick_records (nick,
961                              rd_count,rd,
962                              &res_count,
963                              &res);
964     GNUNET_free (nick);
965   }
966   if (0 == res_count)
967   {
968     if (NULL != nc)
969       send_store_response (nc,
970                            GNUNET_OK,
971                            rid);
972     return; /* no data, no need to update cache */
973   }
974   if (GNUNET_YES == disable_namecache)
975   {
976     GNUNET_STATISTICS_update (statistics,
977                               "Namecache updates skipped (NC disabled)",
978                               1,
979                               GNUNET_NO);
980     if (NULL != nc)
981       send_store_response (nc,
982                            GNUNET_OK,
983                            rid);
984     return;
985   }
986   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
987                                                           res);
988   if (cache_keys)
989     block = GNUNET_GNSRECORD_block_create2 (zone_key,
990                                             exp_time,
991                                             name,
992                                             res,
993                                             res_count);
994   else
995     block = GNUNET_GNSRECORD_block_create (zone_key,
996                                            exp_time,
997                                            name,
998                                            res,
999                                            res_count);
1000   GNUNET_assert (NULL != block);
1001   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
1002                                       &pkey);
1003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
1005               name,
1006               res_count,
1007               GNUNET_STRINGS_absolute_time_to_string (exp_time),
1008               GNUNET_GNSRECORD_z2s (&pkey));
1009   GNUNET_STATISTICS_update (statistics,
1010                             "Namecache updates pushed",
1011                             1,
1012                             GNUNET_NO);
1013   cop = GNUNET_new (struct CacheOperation);
1014   cop->nc = nc;
1015   cop->zi = zi;
1016   if (NULL != zi)
1017     zi->cache_ops++;
1018   cop->rid = rid;
1019   GNUNET_CONTAINER_DLL_insert (cop_head,
1020                                cop_tail,
1021                                cop);
1022   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
1023                                           block,
1024                                           &finish_cache_operation,
1025                                           cop);
1026   GNUNET_free (block);
1027 }
1028
1029
1030 /**
1031  * Print a warning that one of our monitors is no longer reacting.
1032  *
1033  * @param cls a `struct ZoneMonitor` to warn about
1034  */
1035 static void
1036 warn_monitor_slow (void *cls)
1037 {
1038   struct ZoneMonitor *zm = cls;
1039
1040   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1041               "No response from monitor since %s\n",
1042               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
1043   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1044                                                       &warn_monitor_slow,
1045                                                       zm);
1046 }
1047
1048
1049 /**
1050  * Continue processing the @a sa.
1051  *
1052  * @param sa store activity to process
1053  */
1054 static void
1055 continue_store_activity (struct StoreActivity *sa)
1056 {
1057   const struct RecordStoreMessage *rp_msg = sa->rsm;
1058   unsigned int rd_count;
1059   size_t name_len;
1060   size_t rd_ser_len;
1061   uint32_t rid;
1062   const char *name_tmp;
1063   const char *rd_ser;
1064
1065   rid = ntohl (rp_msg->gns_header.r_id);
1066   name_len = ntohs (rp_msg->name_len);
1067   rd_count = ntohs (rp_msg->rd_count);
1068   rd_ser_len = ntohs (rp_msg->rd_len);
1069   name_tmp = (const char *) &rp_msg[1];
1070   rd_ser = &name_tmp[name_len];
1071   {
1072     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1073
1074     /* We did this before, must succeed again */
1075     GNUNET_assert (GNUNET_OK ==
1076                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1077                                                          rd_ser,
1078                                                          rd_count,
1079                                                          rd));
1080
1081     for (struct ZoneMonitor *zm = sa->zm_pos;
1082          NULL != zm;
1083          zm = sa->zm_pos)
1084     {
1085       if ( (0 != memcmp (&rp_msg->private_key,
1086                          &zm->zone,
1087                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
1088            (0 != memcmp (&zm->zone,
1089                          &zero,
1090                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
1091         {
1092           sa->zm_pos = zm->next; /* not interesting to this monitor */
1093           continue;
1094         }
1095       if (zm->limit == zm->iteration_cnt)
1096       {
1097         zm->sa_waiting = GNUNET_YES;
1098         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1099         if (NULL != zm->sa_wait_warning)
1100           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1101         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1102                                                             &warn_monitor_slow,
1103                                                             zm);
1104         return; /* blocked on zone monitor */
1105       }
1106       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                   "Notifying monitor about changes under label `%s'\n",
1108                   sa->conv_name);
1109       zm->limit--;
1110       send_lookup_response (zm->nc,
1111                             0,
1112                             &rp_msg->private_key,
1113                             sa->conv_name,
1114                             rd_count,
1115                             rd);
1116       sa->zm_pos = zm->next;
1117     }
1118     /* great, done with the monitors, unpack (again) for refresh_block operation */
1119     refresh_block (sa->nc,
1120                    NULL,
1121                    rid,
1122                    &rp_msg->private_key,
1123                    sa->conv_name,
1124                    rd_count,
1125                    rd);
1126   }
1127   GNUNET_SERVICE_client_continue (sa->nc->client);
1128   free_store_activity (sa);
1129 }
1130
1131
1132 /**
1133  * Called whenever a client is disconnected.
1134  * Frees our resources associated with that client.
1135  *
1136  * @param cls closure
1137  * @param client identification of the client
1138  * @param app_ctx the `struct NamestoreClient` of @a client
1139  */
1140 static void
1141 client_disconnect_cb (void *cls,
1142                       struct GNUNET_SERVICE_Client *client,
1143                       void *app_ctx)
1144 {
1145   struct NamestoreClient *nc = app_ctx;
1146   struct ZoneIteration *no;
1147   struct CacheOperation *cop;
1148
1149   (void) cls;
1150   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1151               "Client %p disconnected\n",
1152               client);
1153   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1154   {
1155     struct StoreActivity *san;
1156
1157     if (nc != zm->nc)
1158       continue;
1159     GNUNET_CONTAINER_DLL_remove (monitor_head,
1160                                  monitor_tail,
1161                                  zm);
1162     if (NULL != zm->task)
1163     {
1164       GNUNET_SCHEDULER_cancel (zm->task);
1165       zm->task = NULL;
1166     }
1167     if (NULL != zm->sa_wait_warning)
1168     {
1169       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1170       zm->sa_wait_warning = NULL;
1171     }
1172     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1173     {
1174       san = sa->next;
1175       if (zm == sa->zm_pos)
1176       {
1177         sa->zm_pos = zm->next;
1178         /* this may free sa */
1179         continue_store_activity (sa);
1180       }
1181     }
1182     GNUNET_free (zm);
1183     break;
1184   }
1185   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1186   {
1187     if (sa->nc == nc)
1188     {
1189       /* this may free sa */
1190       free_store_activity (sa);
1191       break; /* there can only be one per nc */
1192     }
1193   }
1194   while (NULL != (no = nc->op_head))
1195   {
1196     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1197                                  nc->op_tail,
1198                                  no);
1199     GNUNET_free (no);
1200   }
1201   for (cop = cop_head; NULL != cop; cop = cop->next)
1202     if (nc == cop->nc)
1203       cop->nc = NULL;
1204   GNUNET_free (nc);
1205 }
1206
1207
1208 /**
1209  * Add a client to our list of active clients.
1210  *
1211  * @param cls NULL
1212  * @param client client to add
1213  * @param mq message queue for @a client
1214  * @return internal namestore client structure for this client
1215  */
1216 static void *
1217 client_connect_cb (void *cls,
1218                    struct GNUNET_SERVICE_Client *client,
1219                    struct GNUNET_MQ_Handle *mq)
1220 {
1221   struct NamestoreClient *nc;
1222
1223   (void) cls;
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225               "Client %p connected\n",
1226               client);
1227   nc = GNUNET_new (struct NamestoreClient);
1228   nc->client = client;
1229   nc->mq = mq;
1230   return nc;
1231 }
1232
1233
1234 /**
1235  * Closure for #lookup_it().
1236  */
1237 struct RecordLookupContext
1238 {
1239
1240   /**
1241    * FIXME.
1242    */
1243   const char *label;
1244
1245   /**
1246    * FIXME.
1247    */
1248   char *res_rd;
1249
1250   /**
1251    * FIXME.
1252    */
1253   struct GNUNET_GNSRECORD_Data *nick;
1254
1255   /**
1256    * FIXME.
1257    */
1258   int found;
1259
1260   /**
1261    * FIXME.
1262    */
1263   unsigned int res_rd_count;
1264
1265   /**
1266    * FIXME.
1267    */
1268   ssize_t rd_ser_len;
1269 };
1270
1271
1272 /**
1273  * Function called by the namestore plugin when we are trying to lookup
1274  * a record as part of #handle_record_lookup().  Merges all results into
1275  * the context.
1276  *
1277  * @param cls closure with a `struct RecordLookupContext`
1278  * @param seq unique serial number of the record, MUST NOT BE ZERO
1279  * @param zone_key private key of the zone
1280  * @param label name that is being mapped (at most 255 characters long)
1281  * @param rd_count number of entries in @a rd array
1282  * @param rd array of records with data to store
1283  */
1284 static void
1285 lookup_it (void *cls,
1286            uint64_t seq,
1287            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1288            const char *label,
1289            unsigned int rd_count,
1290            const struct GNUNET_GNSRECORD_Data *rd)
1291 {
1292   struct RecordLookupContext *rlc = cls;
1293
1294   (void) private_key;
1295   GNUNET_assert (0 != seq);
1296   if (0 != strcmp (label,
1297                    rlc->label))
1298     return;
1299   rlc->found = GNUNET_YES;
1300   if (0 == rd_count)
1301   {
1302     rlc->rd_ser_len = 0;
1303     rlc->res_rd_count = 0;
1304     rlc->res_rd = NULL;
1305     return;
1306   }
1307   if ( (NULL != rlc->nick) &&
1308        (0 != strcmp (label,
1309                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1310   {
1311     /* Merge */
1312     struct GNUNET_GNSRECORD_Data *rd_res;
1313     unsigned int rdc_res;
1314
1315     rd_res = NULL;
1316     rdc_res = 0;
1317     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1318     merge_with_nick_records (rlc->nick,
1319                              rd_count,
1320                              rd,
1321                              &rdc_res,
1322                              &rd_res);
1323     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1324                                                          rd_res);
1325     if (rlc->rd_ser_len < 0)
1326     {
1327       GNUNET_break (0);
1328       GNUNET_free  (rd_res);
1329       rlc->found = GNUNET_NO;
1330       rlc->rd_ser_len = 0;
1331       return;
1332     }
1333     rlc->res_rd_count = rdc_res;
1334     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1335     if (rlc->rd_ser_len !=
1336         GNUNET_GNSRECORD_records_serialize (rdc_res,
1337                                             rd_res,
1338                                             rlc->rd_ser_len,
1339                                             rlc->res_rd))
1340     {
1341       GNUNET_break (0);
1342       GNUNET_free  (rlc->res_rd);
1343       rlc->res_rd = NULL;
1344       rlc->res_rd_count = 0;
1345       rlc->rd_ser_len = 0;
1346       GNUNET_free  (rd_res);
1347       rlc->found = GNUNET_NO;
1348       return;
1349     }
1350     GNUNET_free (rd_res);
1351     GNUNET_free (rlc->nick);
1352     rlc->nick = NULL;
1353   }
1354   else
1355   {
1356     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1357                                                          rd);
1358     if (rlc->rd_ser_len < 0)
1359     {
1360       GNUNET_break (0);
1361       rlc->found = GNUNET_NO;
1362       rlc->rd_ser_len = 0;
1363       return;
1364     }
1365     rlc->res_rd_count = rd_count;
1366     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1367     if (rlc->rd_ser_len !=
1368         GNUNET_GNSRECORD_records_serialize (rd_count,
1369                                             rd,
1370                                             rlc->rd_ser_len,
1371                                             rlc->res_rd))
1372     {
1373       GNUNET_break (0);
1374       GNUNET_free  (rlc->res_rd);
1375       rlc->res_rd = NULL;
1376       rlc->res_rd_count = 0;
1377       rlc->rd_ser_len = 0;
1378       rlc->found = GNUNET_NO;
1379       return;
1380     }
1381   }
1382 }
1383
1384
1385 /**
1386  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1387  *
1388  * @param cls client sending the message
1389  * @param ll_msg message of type `struct LabelLookupMessage`
1390  * @return #GNUNET_OK if @a ll_msg is well-formed
1391  */
1392 static int
1393 check_record_lookup (void *cls,
1394                      const struct LabelLookupMessage *ll_msg)
1395 {
1396   uint32_t name_len;
1397   size_t src_size;
1398
1399   (void) cls;
1400   name_len = ntohl (ll_msg->label_len);
1401   src_size = ntohs (ll_msg->gns_header.header.size);
1402   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1403   {
1404     GNUNET_break (0);
1405     return GNUNET_SYSERR;
1406   }
1407   GNUNET_MQ_check_zero_termination (ll_msg);
1408   return GNUNET_OK;
1409 }
1410
1411
1412 /**
1413  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1414  *
1415  * @param cls client sending the message
1416  * @param ll_msg message of type `struct LabelLookupMessage`
1417  */
1418 static void
1419 handle_record_lookup (void *cls,
1420                       const struct LabelLookupMessage *ll_msg)
1421 {
1422   struct NamestoreClient *nc = cls;
1423   struct GNUNET_MQ_Envelope *env;
1424   struct LabelLookupResponseMessage *llr_msg;
1425   struct RecordLookupContext rlc;
1426   const char *name_tmp;
1427   char *res_name;
1428   char *conv_name;
1429   uint32_t name_len;
1430   int res;
1431
1432   name_len = ntohl (ll_msg->label_len);
1433   name_tmp = (const char *) &ll_msg[1];
1434   GNUNET_SERVICE_client_continue (nc->client);
1435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1437               name_tmp);
1438
1439   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1440   if (NULL == conv_name)
1441   {
1442     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1443                 "Error converting name `%s'\n",
1444                 name_tmp);
1445     GNUNET_SERVICE_client_drop (nc->client);
1446     return;
1447   }
1448   rlc.label = conv_name;
1449   rlc.found = GNUNET_NO;
1450   rlc.res_rd_count = 0;
1451   rlc.res_rd = NULL;
1452   rlc.rd_ser_len = 0;
1453   rlc.nick = get_nick_record (&ll_msg->zone);
1454   res = GSN_database->lookup_records (GSN_database->cls,
1455                                       &ll_msg->zone,
1456                                       conv_name,
1457                                       &lookup_it,
1458                                       &rlc);
1459   GNUNET_free (conv_name);
1460   env = GNUNET_MQ_msg_extra (llr_msg,
1461                              name_len + rlc.rd_ser_len,
1462                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1463   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1464   llr_msg->private_key = ll_msg->zone;
1465   llr_msg->name_len = htons (name_len);
1466   llr_msg->rd_count = htons (rlc.res_rd_count);
1467   llr_msg->rd_len = htons (rlc.rd_ser_len);
1468   res_name = (char *) &llr_msg[1];
1469   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1470     llr_msg->found = ntohs (GNUNET_YES);
1471   else
1472     llr_msg->found = ntohs (GNUNET_NO);
1473   GNUNET_memcpy (&llr_msg[1],
1474                  name_tmp,
1475                  name_len);
1476   GNUNET_memcpy (&res_name[name_len],
1477                  rlc.res_rd,
1478                  rlc.rd_ser_len);
1479   GNUNET_MQ_send (nc->mq,
1480                   env);
1481   GNUNET_free_non_null (rlc.res_rd);
1482 }
1483
1484
1485 /**
1486  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1487  *
1488  * @param cls client sending the message
1489  * @param rp_msg message of type `struct RecordStoreMessage`
1490  * @return #GNUNET_OK if @a rp_msg is well-formed
1491  */
1492 static int
1493 check_record_store (void *cls,
1494                     const struct RecordStoreMessage *rp_msg)
1495 {
1496   size_t name_len;
1497   size_t msg_size;
1498   size_t msg_size_exp;
1499   size_t rd_ser_len;
1500   const char *name_tmp;
1501
1502   (void) cls;
1503   name_len = ntohs (rp_msg->name_len);
1504   msg_size = ntohs (rp_msg->gns_header.header.size);
1505   rd_ser_len = ntohs (rp_msg->rd_len);
1506   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1507   if (msg_size != msg_size_exp)
1508   {
1509     GNUNET_break (0);
1510     return GNUNET_SYSERR;
1511   }
1512   if ( (0 == name_len) ||
1513        (name_len > MAX_NAME_LEN) )
1514   {
1515     GNUNET_break (0);
1516     return GNUNET_SYSERR;
1517   }
1518   name_tmp = (const char *) &rp_msg[1];
1519   if ('\0' != name_tmp[name_len -1])
1520   {
1521     GNUNET_break (0);
1522     return GNUNET_SYSERR;
1523   }
1524   return GNUNET_OK;
1525 }
1526
1527
1528 /**
1529  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1530  *
1531  * @param cls client sending the message
1532  * @param rp_msg message of type `struct RecordStoreMessage`
1533  */
1534 static void
1535 handle_record_store (void *cls,
1536                      const struct RecordStoreMessage *rp_msg)
1537 {
1538   struct NamestoreClient *nc = cls;
1539   size_t name_len;
1540   size_t rd_ser_len;
1541   uint32_t rid;
1542   const char *name_tmp;
1543   char *conv_name;
1544   const char *rd_ser;
1545   unsigned int rd_count;
1546   int res;
1547   struct StoreActivity *sa;
1548
1549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550               "Received NAMESTORE_RECORD_STORE message\n");
1551   rid = ntohl (rp_msg->gns_header.r_id);
1552   name_len = ntohs (rp_msg->name_len);
1553   rd_count = ntohs (rp_msg->rd_count);
1554   rd_ser_len = ntohs (rp_msg->rd_len);
1555   GNUNET_break (0 == ntohs (rp_msg->reserved));
1556   name_tmp = (const char *) &rp_msg[1];
1557   rd_ser = &name_tmp[name_len];
1558   {
1559     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1560
1561     if (GNUNET_OK !=
1562         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1563                                               rd_ser,
1564                                               rd_count,
1565                                               rd))
1566     {
1567       GNUNET_break (0);
1568       GNUNET_SERVICE_client_drop (nc->client);
1569       return;
1570     }
1571
1572     /* Extracting and converting private key */
1573     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1574     if (NULL == conv_name)
1575     {
1576       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1577                   "Error converting name `%s'\n",
1578                   name_tmp);
1579       GNUNET_SERVICE_client_drop (nc->client);
1580       return;
1581     }
1582     GNUNET_STATISTICS_update (statistics,
1583                               "Well-formed store requests received",
1584                               1,
1585                               GNUNET_NO);
1586     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1587                 "Creating %u records for name `%s'\n",
1588                 (unsigned int) rd_count,
1589                 conv_name);
1590     if ( (0 == rd_count) &&
1591          (GNUNET_NO ==
1592           GSN_database->lookup_records (GSN_database->cls,
1593                                         &rp_msg->private_key,
1594                                         conv_name,
1595                                         NULL,
1596                                         0)) )
1597     {
1598       /* This name does not exist, so cannot be removed */
1599       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600                   "Name `%s' does not exist, no deletion required\n",
1601                   conv_name);
1602       res = GNUNET_NO;
1603     }
1604     else
1605     {
1606       /* remove "NICK" records, unless this is for the
1607          #GNUNET_GNS_EMPTY_LABEL_AT label */
1608       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1609       unsigned int rd_clean_off;
1610
1611       rd_clean_off = 0;
1612       for (unsigned int i=0;i<rd_count;i++)
1613       {
1614         rd_clean[rd_clean_off] = rd[i];
1615         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1616                            conv_name)) ||
1617              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1618           rd_clean_off++;
1619
1620         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1621                            conv_name)) &&
1622              (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type) )
1623           cache_nick (&rp_msg->private_key,
1624                       &rd[i]);
1625       }
1626       res = GSN_database->store_records (GSN_database->cls,
1627                                          &rp_msg->private_key,
1628                                          conv_name,
1629                                          rd_clean_off,
1630                                          rd_clean);
1631     }
1632
1633     if (GNUNET_OK != res)
1634     {
1635       /* store not successful, not need to tell monitors */
1636       send_store_response (nc,
1637                            res,
1638                            rid);
1639       GNUNET_SERVICE_client_continue (nc->client);
1640       GNUNET_free (conv_name);
1641       return;
1642     }
1643
1644     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1645                         ntohs (rp_msg->gns_header.header.size));
1646     GNUNET_CONTAINER_DLL_insert (sa_head,
1647                                  sa_tail,
1648                                  sa);
1649     sa->nc = nc;
1650     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1651     GNUNET_memcpy (&sa[1],
1652                    rp_msg,
1653                    ntohs (rp_msg->gns_header.header.size));
1654     sa->zm_pos = monitor_head;
1655     sa->conv_name = conv_name;
1656     continue_store_activity (sa);
1657   }
1658 }
1659
1660
1661 /**
1662  * Context for record remove operations passed from #handle_zone_to_name to
1663  * #handle_zone_to_name_it as closure
1664  */
1665 struct ZoneToNameCtx
1666 {
1667   /**
1668    * Namestore client
1669    */
1670   struct NamestoreClient *nc;
1671
1672   /**
1673    * Request id (to be used in the response to the client).
1674    */
1675   uint32_t rid;
1676
1677   /**
1678    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1679    * not finding a name for the zone still counts as a 'success' here,
1680    * as this field is about the success of executing the IPC protocol.
1681    */
1682   int success;
1683 };
1684
1685
1686 /**
1687  * Zone to name iterator
1688  *
1689  * @param cls struct ZoneToNameCtx *
1690  * @param seq sequence number of the record, MUST NOT BE ZERO
1691  * @param zone_key the zone key
1692  * @param name name
1693  * @param rd_count number of records in @a rd
1694  * @param rd record data
1695  */
1696 static void
1697 handle_zone_to_name_it (void *cls,
1698                         uint64_t seq,
1699                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1700                         const char *name,
1701                         unsigned int rd_count,
1702                         const struct GNUNET_GNSRECORD_Data *rd)
1703 {
1704   struct ZoneToNameCtx *ztn_ctx = cls;
1705   struct GNUNET_MQ_Envelope *env;
1706   struct ZoneToNameResponseMessage *ztnr_msg;
1707   int16_t res;
1708   size_t name_len;
1709   ssize_t rd_ser_len;
1710   size_t msg_size;
1711   char *name_tmp;
1712   char *rd_tmp;
1713
1714   GNUNET_assert (0 != seq);
1715   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716               "Found result for zone-to-name lookup: `%s'\n",
1717               name);
1718   res = GNUNET_YES;
1719   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1720   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1721                                                   rd);
1722   if (rd_ser_len < 0)
1723   {
1724     GNUNET_break (0);
1725     ztn_ctx->success = GNUNET_SYSERR;
1726     return;
1727   }
1728   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1729   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1730   {
1731     GNUNET_break (0);
1732     ztn_ctx->success = GNUNET_SYSERR;
1733     return;
1734   }
1735   env = GNUNET_MQ_msg_extra (ztnr_msg,
1736                              name_len + rd_ser_len,
1737                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1738   ztnr_msg->gns_header.header.size = htons (msg_size);
1739   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1740   ztnr_msg->res = htons (res);
1741   ztnr_msg->rd_len = htons (rd_ser_len);
1742   ztnr_msg->rd_count = htons (rd_count);
1743   ztnr_msg->name_len = htons (name_len);
1744   ztnr_msg->zone = *zone_key;
1745   name_tmp = (char *) &ztnr_msg[1];
1746   GNUNET_memcpy (name_tmp,
1747                  name,
1748                  name_len);
1749   rd_tmp = &name_tmp[name_len];
1750   GNUNET_assert (rd_ser_len ==
1751                  GNUNET_GNSRECORD_records_serialize (rd_count,
1752                                                      rd,
1753                                                      rd_ser_len,
1754                                                      rd_tmp));
1755   ztn_ctx->success = GNUNET_OK;
1756   GNUNET_MQ_send (ztn_ctx->nc->mq,
1757                   env);
1758 }
1759
1760
1761 /**
1762  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1763  *
1764  * @param cls client client sending the message
1765  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1766  */
1767 static void
1768 handle_zone_to_name (void *cls,
1769                      const struct ZoneToNameMessage *ztn_msg)
1770 {
1771   struct NamestoreClient *nc = cls;
1772   struct ZoneToNameCtx ztn_ctx;
1773   struct GNUNET_MQ_Envelope *env;
1774   struct ZoneToNameResponseMessage *ztnr_msg;
1775
1776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1777               "Received ZONE_TO_NAME message\n");
1778   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1779   ztn_ctx.nc = nc;
1780   ztn_ctx.success = GNUNET_NO;
1781   if (GNUNET_SYSERR ==
1782       GSN_database->zone_to_name (GSN_database->cls,
1783                                   &ztn_msg->zone,
1784                                   &ztn_msg->value_zone,
1785                                   &handle_zone_to_name_it, &ztn_ctx))
1786   {
1787     /* internal error, hang up instead of signalling something
1788        that might be wrong */
1789     GNUNET_break (0);
1790     GNUNET_SERVICE_client_drop (nc->client);
1791     return;
1792   }
1793   if (GNUNET_NO == ztn_ctx.success)
1794   {
1795     /* no result found, send empty response */
1796     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797                 "Found no result for zone-to-name lookup.\n");
1798     env = GNUNET_MQ_msg (ztnr_msg,
1799                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1800     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1801     ztnr_msg->res = htons (GNUNET_NO);
1802     GNUNET_MQ_send (nc->mq,
1803                     env);
1804   }
1805   GNUNET_SERVICE_client_continue (nc->client);
1806 }
1807
1808
1809 /**
1810  * Context for record remove operations passed from
1811  * #run_zone_iteration_round to #zone_iterate_proc as closure
1812  */
1813 struct ZoneIterationProcResult
1814 {
1815   /**
1816    * The zone iteration handle
1817    */
1818   struct ZoneIteration *zi;
1819
1820   /**
1821    * Number of results left to be returned in this iteration.
1822    */
1823   uint64_t limit;
1824
1825 };
1826
1827
1828 /**
1829  * Process results for zone iteration from database
1830  *
1831  * @param cls struct ZoneIterationProcResult
1832  * @param seq sequence number of the record, MUST NOT BE ZERO
1833  * @param zone_key the zone key
1834  * @param name name
1835  * @param rd_count number of records for this name
1836  * @param rd record data
1837  */
1838 static void
1839 zone_iterate_proc (void *cls,
1840                    uint64_t seq,
1841                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1842                    const char *name,
1843                    unsigned int rd_count,
1844                    const struct GNUNET_GNSRECORD_Data *rd)
1845 {
1846   struct ZoneIterationProcResult *proc = cls;
1847   int do_refresh_block;
1848
1849   GNUNET_assert (0 != seq);
1850   if ( (NULL == zone_key) &&
1851        (NULL == name) )
1852   {
1853     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1854                 "Iteration done\n");
1855     return;
1856   }
1857   if ( (NULL == zone_key) ||
1858        (NULL == name) )
1859   {
1860     /* what is this!? should never happen */
1861     GNUNET_break (0);
1862     return;
1863   }
1864   if (0 == proc->limit)
1865   {
1866     /* what is this!? should never happen */
1867     GNUNET_break (0);
1868     return;
1869   }
1870   proc->limit--;
1871   proc->zi->seq = seq;
1872   send_lookup_response (proc->zi->nc,
1873                         proc->zi->request_id,
1874                         zone_key,
1875                         name,
1876                         rd_count,
1877                         rd);
1878
1879
1880   do_refresh_block = GNUNET_NO;
1881   for (unsigned int i=0;i<rd_count;i++)
1882     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1883     {
1884       do_refresh_block = GNUNET_YES;
1885       break;
1886     }
1887   if (GNUNET_YES == do_refresh_block)
1888     refresh_block (NULL,
1889                    proc->zi,
1890                    0,
1891                    zone_key,
1892                    name,
1893                    rd_count,
1894                    rd);
1895 }
1896
1897
1898 /**
1899  * Perform the next round of the zone iteration.
1900  *
1901  * @param zi zone iterator to process
1902  * @param limit number of results to return in one pass
1903  */
1904 static void
1905 run_zone_iteration_round (struct ZoneIteration *zi,
1906                           uint64_t limit)
1907 {
1908   struct ZoneIterationProcResult proc;
1909   struct GNUNET_TIME_Absolute start;
1910   struct GNUNET_TIME_Relative duration;
1911
1912   memset (&proc,
1913           0,
1914           sizeof (proc));
1915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1916               "Asked to return up to %llu records at position %llu\n",
1917               (unsigned long long) limit,
1918               (unsigned long long) zi->seq);
1919   proc.zi = zi;
1920   proc.limit = limit;
1921   start = GNUNET_TIME_absolute_get ();
1922   GNUNET_break (GNUNET_SYSERR !=
1923                 GSN_database->iterate_records (GSN_database->cls,
1924                                                (0 == memcmp (&zi->zone,
1925                                                              &zero,
1926                                                              sizeof (zero)))
1927                                                ? NULL
1928                                                : &zi->zone,
1929                                                zi->seq,
1930                                                limit,
1931                                                &zone_iterate_proc,
1932                                                &proc));
1933   duration = GNUNET_TIME_absolute_get_duration (start);
1934   duration = GNUNET_TIME_relative_divide (duration,
1935                                           limit - proc.limit);
1936   GNUNET_STATISTICS_set (statistics,
1937                          "NAMESTORE iteration delay (μs/record)",
1938                          duration.rel_value_us,
1939                          GNUNET_NO);
1940   if (0 == proc.limit)
1941     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1942                 "Returned %llu results, more results available\n",
1943                 (unsigned long long) limit);
1944   zi->send_end = (0 != proc.limit);
1945   if (0 == zi->cache_ops)
1946     zone_iteration_done_client_continue (zi);
1947 }
1948
1949
1950 /**
1951  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1952  *
1953  * @param cls the client sending the message
1954  * @param zis_msg message from the client
1955  */
1956 static void
1957 handle_iteration_start (void *cls,
1958                         const struct ZoneIterationStartMessage *zis_msg)
1959 {
1960   struct NamestoreClient *nc = cls;
1961   struct ZoneIteration *zi;
1962
1963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1964               "Received ZONE_ITERATION_START message\n");
1965   zi = GNUNET_new (struct ZoneIteration);
1966   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1967   zi->offset = 0;
1968   zi->nc = nc;
1969   zi->zone = zis_msg->zone;
1970
1971   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1972                                nc->op_tail,
1973                                zi);
1974   run_zone_iteration_round (zi,
1975                             1);
1976 }
1977
1978
1979 /**
1980  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1981  *
1982  * @param cls the client sending the message
1983  * @param zis_msg message from the client
1984  */
1985 static void
1986 handle_iteration_stop (void *cls,
1987                        const struct ZoneIterationStopMessage *zis_msg)
1988 {
1989   struct NamestoreClient *nc = cls;
1990   struct ZoneIteration *zi;
1991   uint32_t rid;
1992
1993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994               "Received ZONE_ITERATION_STOP message\n");
1995   rid = ntohl (zis_msg->gns_header.r_id);
1996   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1997     if (zi->request_id == rid)
1998       break;
1999   if (NULL == zi)
2000   {
2001     GNUNET_break (0);
2002     GNUNET_SERVICE_client_drop (nc->client);
2003     return;
2004   }
2005   GNUNET_CONTAINER_DLL_remove (nc->op_head,
2006                                nc->op_tail,
2007                                zi);
2008   GNUNET_free (zi);
2009   GNUNET_SERVICE_client_continue (nc->client);
2010 }
2011
2012
2013 /**
2014  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
2015  *
2016  * @param cls the client sending the message
2017  * @param message message from the client
2018  */
2019 static void
2020 handle_iteration_next (void *cls,
2021                        const struct ZoneIterationNextMessage *zis_msg)
2022 {
2023   struct NamestoreClient *nc = cls;
2024   struct ZoneIteration *zi;
2025   uint32_t rid;
2026   uint64_t limit;
2027
2028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2029               "Received ZONE_ITERATION_NEXT message\n");
2030   GNUNET_STATISTICS_update (statistics,
2031                             "Iteration NEXT messages received",
2032                             1,
2033                             GNUNET_NO);
2034   rid = ntohl (zis_msg->gns_header.r_id);
2035   limit = GNUNET_ntohll (zis_msg->limit);
2036   for (zi = nc->op_head; NULL != zi; zi = zi->next)
2037     if (zi->request_id == rid)
2038       break;
2039   if (NULL == zi)
2040   {
2041     GNUNET_break (0);
2042     GNUNET_SERVICE_client_drop (nc->client);
2043     return;
2044   }
2045   run_zone_iteration_round (zi,
2046                             limit);
2047 }
2048
2049
2050 /**
2051  * Function called when the monitor is ready for more data, and we
2052  * should thus unblock PUT operations that were blocked on the
2053  * monitor not being ready.
2054  */
2055 static void
2056 monitor_unblock (struct ZoneMonitor *zm)
2057 {
2058   struct StoreActivity *sa = sa_head;
2059
2060   while ( (NULL != sa) &&
2061           (zm->limit > zm->iteration_cnt) )
2062   {
2063     struct StoreActivity *sn = sa->next;
2064
2065     if (sa->zm_pos == zm)
2066       continue_store_activity (sa);
2067     sa = sn;
2068   }
2069   if (zm->limit > zm->iteration_cnt)
2070   {
2071     zm->sa_waiting = GNUNET_NO;
2072     if (NULL != zm->sa_wait_warning)
2073     {
2074       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2075       zm->sa_wait_warning = NULL;
2076     }
2077   }
2078   else if (GNUNET_YES == zm->sa_waiting)
2079   {
2080     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2081     if (NULL != zm->sa_wait_warning)
2082       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2083     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2084                                                         &warn_monitor_slow,
2085                                                         zm);
2086   }
2087 }
2088
2089
2090 /**
2091  * Send 'sync' message to zone monitor, we're now in sync.
2092  *
2093  * @param zm monitor that is now in sync
2094  */
2095 static void
2096 monitor_sync (struct ZoneMonitor *zm)
2097 {
2098   struct GNUNET_MQ_Envelope *env;
2099   struct GNUNET_MessageHeader *sync;
2100
2101   env = GNUNET_MQ_msg (sync,
2102                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
2103   GNUNET_MQ_send (zm->nc->mq,
2104                   env);
2105   /* mark iteration done */
2106   zm->in_first_iteration = GNUNET_NO;
2107   zm->iteration_cnt = 0;
2108   if ( (zm->limit > 0) &&
2109        (zm->sa_waiting) )
2110     monitor_unblock (zm);
2111 }
2112
2113
2114 /**
2115  * Obtain the next datum during the zone monitor's zone initial iteration.
2116  *
2117  * @param cls zone monitor that does its initial iteration
2118  */
2119 static void
2120 monitor_iteration_next (void *cls);
2121
2122
2123 /**
2124  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
2125  *
2126  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2127  * @param seq sequence number of the record, MUST NOT BE ZERO
2128  * @param zone_key zone key of the zone
2129  * @param name name
2130  * @param rd_count number of records in @a rd
2131  * @param rd array of records
2132  */
2133 static void
2134 monitor_iterate_cb (void *cls,
2135                     uint64_t seq,
2136                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2137                     const char *name,
2138                     unsigned int rd_count,
2139                     const struct GNUNET_GNSRECORD_Data *rd)
2140 {
2141   struct ZoneMonitor *zm = cls;
2142
2143   GNUNET_assert (0 != seq);
2144   zm->seq = seq;
2145   GNUNET_assert (NULL != name);
2146   GNUNET_STATISTICS_update (statistics,
2147                             "Monitor notifications sent",
2148                             1,
2149                             GNUNET_NO);
2150   zm->limit--;
2151   zm->iteration_cnt--;
2152   send_lookup_response (zm->nc,
2153                         0,
2154                         zone_key,
2155                         name,
2156                         rd_count,
2157                         rd);
2158   if ( (0 == zm->iteration_cnt) &&
2159        (0 != zm->limit) )
2160   {
2161     /* We are done with the current iteration batch, AND the
2162        client would right now accept more, so go again! */
2163     GNUNET_assert (NULL == zm->task);
2164     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2165                                          zm);
2166   }
2167 }
2168
2169
2170 /**
2171  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2172  *
2173  * @param cls the client sending the message
2174  * @param zis_msg message from the client
2175  */
2176 static void
2177 handle_monitor_start (void *cls,
2178                       const struct ZoneMonitorStartMessage *zis_msg)
2179 {
2180   struct NamestoreClient *nc = cls;
2181   struct ZoneMonitor *zm;
2182
2183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2184               "Received ZONE_MONITOR_START message\n");
2185   zm = GNUNET_new (struct ZoneMonitor);
2186   zm->nc = nc;
2187   zm->zone = zis_msg->zone;
2188   zm->limit = 1;
2189   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2190   GNUNET_CONTAINER_DLL_insert (monitor_head,
2191                                monitor_tail,
2192                                zm);
2193   GNUNET_SERVICE_client_mark_monitor (nc->client);
2194   GNUNET_SERVICE_client_continue (nc->client);
2195   GNUNET_notification_context_add (monitor_nc,
2196                                    nc->mq);
2197   if (zm->in_first_iteration)
2198     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2199                                          zm);
2200   else
2201     monitor_sync (zm);
2202 }
2203
2204
2205 /**
2206  * Obtain the next datum during the zone monitor's zone initial iteration.
2207  *
2208  * @param cls zone monitor that does its initial iteration
2209  */
2210 static void
2211 monitor_iteration_next (void *cls)
2212 {
2213   struct ZoneMonitor *zm = cls;
2214   int ret;
2215
2216   zm->task = NULL;
2217   GNUNET_assert (0 == zm->iteration_cnt);
2218   if (zm->limit > 16)
2219     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2220   else
2221     zm->iteration_cnt = zm->limit; /* use it all */
2222   ret = GSN_database->iterate_records (GSN_database->cls,
2223                                        (0 == memcmp (&zm->zone,
2224                                                      &zero,
2225                                                      sizeof (zero)))
2226                                        ? NULL
2227                                        : &zm->zone,
2228                                        zm->seq,
2229                                        zm->iteration_cnt,
2230                                        &monitor_iterate_cb,
2231                                        zm);
2232   if (GNUNET_SYSERR == ret)
2233   {
2234     GNUNET_SERVICE_client_drop (zm->nc->client);
2235     return;
2236   }
2237   if (GNUNET_NO == ret)
2238   {
2239     /* empty zone */
2240     monitor_sync (zm);
2241     return;
2242   }
2243 }
2244
2245
2246 /**
2247  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2248  *
2249  * @param cls the client sending the message
2250  * @param nm message from the client
2251  */
2252 static void
2253 handle_monitor_next (void *cls,
2254                      const struct ZoneMonitorNextMessage *nm)
2255 {
2256   struct NamestoreClient *nc = cls;
2257   struct ZoneMonitor *zm;
2258   uint64_t inc;
2259
2260   inc = GNUNET_ntohll (nm->limit);
2261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2262               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2263               (unsigned long long) inc);
2264   for (zm = monitor_head; NULL != zm; zm = zm->next)
2265     if (zm->nc == nc)
2266       break;
2267   if (NULL == zm)
2268   {
2269     GNUNET_break (0);
2270     GNUNET_SERVICE_client_drop (nc->client);
2271     return;
2272   }
2273   GNUNET_SERVICE_client_continue (nc->client);
2274   if (zm->limit + inc < zm->limit)
2275   {
2276     GNUNET_break (0);
2277     GNUNET_SERVICE_client_drop (nc->client);
2278     return;
2279   }
2280   zm->limit += inc;
2281   if ( (zm->in_first_iteration) &&
2282        (zm->limit == inc) )
2283   {
2284     /* We are still iterating, and the previous iteration must
2285        have stopped due to the client's limit, so continue it! */
2286     GNUNET_assert (NULL == zm->task);
2287     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2288                                          zm);
2289   }
2290   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2291   if ( (zm->limit > zm->iteration_cnt) &&
2292        (zm->sa_waiting) )
2293   {
2294     monitor_unblock (zm);
2295   }
2296   else if (GNUNET_YES == zm->sa_waiting)
2297   {
2298     if (NULL != zm->sa_wait_warning)
2299       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2300     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2301     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2302                                                         &warn_monitor_slow,
2303                                                         zm);
2304   }
2305 }
2306
2307
2308 /**
2309  * Process namestore requests.
2310  *
2311  * @param cls closure
2312  * @param cfg configuration to use
2313  * @param service the initialized service
2314  */
2315 static void
2316 run (void *cls,
2317      const struct GNUNET_CONFIGURATION_Handle *cfg,
2318      struct GNUNET_SERVICE_Handle *service)
2319 {
2320   char *database;
2321
2322   (void) cls;
2323   (void) service;
2324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325               "Starting namestore service\n");
2326   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2327                                                      "namestore",
2328                                                      "CACHE_KEYS");
2329   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2330                                                             "namecache",
2331                                                             "DISABLE");
2332   GSN_cfg = cfg;
2333   monitor_nc = GNUNET_notification_context_create (1);
2334   if (GNUNET_YES != disable_namecache)
2335   {
2336     namecache = GNUNET_NAMECACHE_connect (cfg);
2337     GNUNET_assert (NULL != namecache);
2338   }
2339   /* Loading database plugin */
2340   if (GNUNET_OK !=
2341       GNUNET_CONFIGURATION_get_value_string (cfg,
2342                                              "namestore",
2343                                              "database",
2344                                              &database))
2345     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2346                 "No database backend configured\n");
2347
2348   GNUNET_asprintf (&db_lib_name,
2349                    "libgnunet_plugin_namestore_%s",
2350                    database);
2351   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2352                                      (void *) GSN_cfg);
2353   GNUNET_free (database);
2354   statistics = GNUNET_STATISTICS_create ("namestore",
2355                                          cfg);
2356   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2357                                  NULL);
2358   if (NULL == GSN_database)
2359   {
2360     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2361                 "Could not load database backend `%s'\n",
2362                 db_lib_name);
2363     GNUNET_SCHEDULER_shutdown ();
2364     return;
2365   }
2366 }
2367
2368
2369 /**
2370  * Define "main" method using service macro.
2371  */
2372 GNUNET_SERVICE_MAIN
2373 ("namestore",
2374  GNUNET_SERVICE_OPTION_NONE,
2375  &run,
2376  &client_connect_cb,
2377  &client_disconnect_cb,
2378  NULL,
2379  GNUNET_MQ_hd_var_size (record_store,
2380                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2381                         struct RecordStoreMessage,
2382                         NULL),
2383  GNUNET_MQ_hd_var_size (record_lookup,
2384                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2385                         struct LabelLookupMessage,
2386                         NULL),
2387  GNUNET_MQ_hd_fixed_size (zone_to_name,
2388                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2389                           struct ZoneToNameMessage,
2390                           NULL),
2391  GNUNET_MQ_hd_fixed_size (iteration_start,
2392                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2393                           struct ZoneIterationStartMessage,
2394                           NULL),
2395  GNUNET_MQ_hd_fixed_size (iteration_next,
2396                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2397                           struct ZoneIterationNextMessage,
2398                           NULL),
2399  GNUNET_MQ_hd_fixed_size (iteration_stop,
2400                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2401                           struct ZoneIterationStopMessage,
2402                           NULL),
2403  GNUNET_MQ_hd_fixed_size (monitor_start,
2404                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2405                           struct ZoneMonitorStartMessage,
2406                           NULL),
2407  GNUNET_MQ_hd_fixed_size (monitor_next,
2408                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2409                           struct ZoneMonitorNextMessage,
2410                           NULL),
2411  GNUNET_MQ_handler_end ());
2412
2413
2414 /* end of gnunet-service-namestore.c */