implement filtering by record type in gnunet-namestore
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file namestore/gnunet-service-namestore.c
23  * @brief namestore for the GNUnet naming system
24  * @author Matthias Wachs
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - "get_nick_record" is a bottleneck, introduce a cache to
29  *   avoid looking it up again and again (for the same few
30  *   zones that the user will typically manage!)
31  * - run testcases, make sure everything works!
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "gnunet_dnsparser_lib.h"
36 #include "gnunet_gns_service.h"
37 #include "gnunet_namecache_service.h"
38 #include "gnunet_namestore_service.h"
39 #include "gnunet_namestore_plugin.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_signatures.h"
42 #include "namestore.h"
43
44 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
45
46 /**
47  * If a monitor takes more than 1 minute to process an event, print a warning.
48  */
49 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
50
51 /**
52  * Size of the cache used by #get_nick_record()
53  */
54 #define NC_SIZE 16
55
56 /**
57  * A namestore client
58  */
59 struct NamestoreClient;
60
61
62 /**
63  * A namestore iteration operation.
64  */
65 struct ZoneIteration
66 {
67   /**
68    * Next element in the DLL
69    */
70   struct ZoneIteration *next;
71
72   /**
73    * Previous element in the DLL
74    */
75   struct ZoneIteration *prev;
76
77   /**
78    * Namestore client which intiated this zone iteration
79    */
80   struct NamestoreClient *nc;
81
82   /**
83    * The nick to add to the records
84    */
85   struct GNUNET_GNSRECORD_Data *nick;
86
87   /**
88    * Key of the zone we are iterating over.
89    */
90   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
91
92   /**
93    * Last sequence number in the zone iteration used to address next
94    * result of the zone iteration in the store
95    *
96    * Initialy set to 0.
97    * Updated in #zone_iterate_proc()
98    */
99   uint64_t seq;
100
101   /**
102    * The operation id fot the zone iteration in the response for the client
103    */
104   uint32_t request_id;
105
106   /**
107    * Offset of the zone iteration used to address next result of the zone
108    * iteration in the store
109    *
110    * Initialy set to 0 in #handle_iteration_start
111    * Incremented with by every call to #handle_iteration_next
112    */
113   uint32_t offset;
114
115   /**
116    * Number of pending cache operations triggered by this zone iteration which we
117    * need to wait for before allowing the client to continue.
118    */
119   unsigned int cache_ops;
120
121   /**
122    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
123    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
124    * message and free the data structure once @e cache_ops is zero.
125    */
126   int send_end;
127
128 };
129
130
131 /**
132  * A namestore client
133  */
134 struct NamestoreClient
135 {
136
137   /**
138    * The client
139    */
140   struct GNUNET_SERVICE_Client *client;
141
142   /**
143    * Message queue for transmission to @e client
144    */
145   struct GNUNET_MQ_Handle *mq;
146
147   /**
148    * Head of the DLL of
149    * Zone iteration operations in progress initiated by this client
150    */
151   struct ZoneIteration *op_head;
152
153   /**
154    * Tail of the DLL of
155    * Zone iteration operations in progress initiated by this client
156    */
157   struct ZoneIteration *op_tail;
158 };
159
160
161 /**
162  * A namestore monitor.
163  */
164 struct ZoneMonitor
165 {
166   /**
167    * Next element in the DLL
168    */
169   struct ZoneMonitor *next;
170
171   /**
172    * Previous element in the DLL
173    */
174   struct ZoneMonitor *prev;
175
176   /**
177    * Namestore client which intiated this zone monitor
178    */
179   struct NamestoreClient *nc;
180
181   /**
182    * Private key of the zone.
183    */
184   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
185
186   /**
187    * Task active during initial iteration.
188    */
189   struct GNUNET_SCHEDULER_Task *task;
190
191   /**
192    * Task to warn about slow monitors.
193    */
194   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
195
196   /**
197    * Since when are we blocked on this monitor?
198    */
199   struct GNUNET_TIME_Absolute sa_waiting_start;
200
201   /**
202    * Last sequence number in the zone iteration used to address next
203    * result of the zone iteration in the store
204    *
205    * Initialy set to 0.
206    * Updated in #monitor_iterate_cb()
207    */
208   uint64_t seq;
209
210   /**
211    * Current limit of how many more messages we are allowed
212    * to queue to this monitor.
213    */
214   uint64_t limit;
215
216   /**
217    * How many more requests may we receive from the iterator
218    * before it is at the limit we gave it?  Will be below or
219    * equal to @e limit.  The effective limit for monitor
220    * events is thus @e iteration_cnt - @e limit!
221    */
222   uint64_t iteration_cnt;
223
224   /**
225    * Are we (still) in the initial iteration pass?
226    */
227   int in_first_iteration;
228
229   /**
230    * Is there a store activity waiting for this monitor?  We only raise the
231    * flag when it happens and search the DLL for the store activity when we
232    * had a limit increase.  If we cannot find any waiting store activity at
233    * that time, we clear the flag again.
234    */
235   int sa_waiting;
236
237 };
238
239
240 /**
241  * Pending operation on the namecache.
242  */
243 struct CacheOperation
244 {
245
246   /**
247    * Kept in a DLL.
248    */
249   struct CacheOperation *prev;
250
251   /**
252    * Kept in a DLL.
253    */
254   struct CacheOperation *next;
255
256   /**
257    * Handle to namecache queue.
258    */
259   struct GNUNET_NAMECACHE_QueueEntry *qe;
260
261   /**
262    * Client to notify about the result, can be NULL.
263    */
264   struct NamestoreClient *nc;
265
266   /**
267    * Zone iteration to call #zone_iteration_done_client_continue()
268    * for if applicable, can be NULL.
269    */
270   struct ZoneIteration *zi;
271
272   /**
273    * Client's request ID.
274    */
275   uint32_t rid;
276 };
277
278
279 /**
280  * Information for an ongoing #handle_record_store() operation.
281  * Needed as we may wait for monitors to be ready for the notification.
282  */
283 struct StoreActivity
284 {
285   /**
286    * Kept in a DLL.
287    */
288   struct StoreActivity *next;
289
290   /**
291    * Kept in a DLL.
292    */
293   struct StoreActivity *prev;
294
295   /**
296    * Which client triggered the store activity?
297    */
298   struct NamestoreClient *nc;
299
300   /**
301    * Copy of the original store message (as data fields in @e rd will
302    * point into it!).
303    */
304   const struct RecordStoreMessage *rsm;
305
306   /**
307    * Next zone monitor that still needs to be notified about this PUT.
308    */
309   struct ZoneMonitor *zm_pos;
310
311   /**
312    * Label nicely canonicalized (lower case).
313    */
314   char *conv_name;
315
316 };
317
318
319 /**
320  * Entry in list of cached nick resolutions.
321  */
322 struct NickCache
323 {
324   /**
325    * Zone the cache entry is for.
326    */
327   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
328
329   /**
330    * Cached record data.
331    */
332   struct GNUNET_GNSRECORD_Data *rd;
333
334   /**
335    * Timestamp when this cache entry was used last.
336    */
337   struct GNUNET_TIME_Absolute last_used;
338 };
339
340
341 /**
342  * We cache nick records to reduce DB load.
343  */
344 static struct NickCache nick_cache[NC_SIZE];
345
346 /**
347  * Public key of all zeros.
348  */
349 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
350
351 /**
352  * Configuration handle.
353  */
354 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
355
356 /**
357  * Handle to the statistics service
358  */
359 static struct GNUNET_STATISTICS_Handle *statistics;
360
361 /**
362  * Namecache handle.
363  */
364 static struct GNUNET_NAMECACHE_Handle *namecache;
365
366 /**
367  * Database handle
368  */
369 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
370
371 /**
372  * Name of the database plugin
373  */
374 static char *db_lib_name;
375
376 /**
377  * Head of cop DLL.
378  */
379 static struct CacheOperation *cop_head;
380
381 /**
382  * Tail of cop DLL.
383  */
384 static struct CacheOperation *cop_tail;
385
386 /**
387  * First active zone monitor.
388  */
389 static struct ZoneMonitor *monitor_head;
390
391 /**
392  * Last active zone monitor.
393  */
394 static struct ZoneMonitor *monitor_tail;
395
396 /**
397  * Head of DLL of monitor-blocked store activities.
398  */
399 static struct StoreActivity *sa_head;
400
401 /**
402  * Tail of DLL of monitor-blocked store activities.
403  */
404 static struct StoreActivity *sa_tail;
405
406 /**
407  * Notification context shared by all monitors.
408  */
409 static struct GNUNET_NotificationContext *monitor_nc;
410
411 /**
412  * Optimize block insertion by caching map of private keys to
413  * public keys in memory?
414  */
415 static int cache_keys;
416
417 /**
418  * Use the namecache? Doing so creates additional cryptographic
419  * operations whenever we touch a record.
420  */
421 static int disable_namecache;
422
423
424 /**
425  * Task run during shutdown.
426  *
427  * @param cls unused
428  */
429 static void
430 cleanup_task (void *cls)
431 {
432   struct CacheOperation *cop;
433
434   (void) cls;
435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436               "Stopping namestore service\n");
437   while (NULL != (cop = cop_head))
438   {
439     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
440                 "Aborting incomplete namecache operation\n");
441     GNUNET_NAMECACHE_cancel (cop->qe);
442     GNUNET_CONTAINER_DLL_remove (cop_head,
443                                  cop_tail,
444                                  cop);
445     GNUNET_free (cop);
446   }
447   if (NULL != namecache)
448   {
449     GNUNET_NAMECACHE_disconnect (namecache);
450     namecache = NULL;
451   }
452   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
453                                               GSN_database));
454   GNUNET_free (db_lib_name);
455   db_lib_name = NULL;
456   if (NULL != monitor_nc)
457   {
458     GNUNET_notification_context_destroy (monitor_nc);
459     monitor_nc = NULL;
460   }
461   if (NULL != statistics)
462   {
463     GNUNET_STATISTICS_destroy (statistics,
464                                GNUNET_NO);
465     statistics = NULL;
466   }
467 }
468
469
470 /**
471  * Release memory used by @a sa.
472  *
473  * @param sa activity to free
474  */
475 static void
476 free_store_activity (struct StoreActivity *sa)
477 {
478   GNUNET_CONTAINER_DLL_remove (sa_head,
479                                sa_tail,
480                                sa);
481   GNUNET_free (sa->conv_name);
482   GNUNET_free (sa);
483 }
484
485
486 /**
487  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
488  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
489  * record, which (if found) is then copied to @a cls for future use.
490  *
491  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
492  * @param seq sequence number of the record, MUST NOT BE ZERO
493  * @param private_key the private key of the zone (unused)
494  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
495  * @param rd_count number of records in @a rd
496  * @param rd records stored under @a label in the zone
497  */
498 static void
499 lookup_nick_it (void *cls,
500                 uint64_t seq,
501                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
502                 const char *label,
503                 unsigned int rd_count,
504                 const struct GNUNET_GNSRECORD_Data *rd)
505 {
506   struct GNUNET_GNSRECORD_Data **res = cls;
507
508   (void) private_key;
509   GNUNET_assert (0 != seq);
510   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
511   {
512     GNUNET_break (0);
513     return;
514   }
515   for (unsigned int c = 0; c < rd_count; c++)
516   {
517     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
518     {
519       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
520       (*res)->data = &(*res)[1];
521       GNUNET_memcpy ((void *) (*res)->data,
522                      rd[c].data,
523                      rd[c].data_size);
524       (*res)->data_size = rd[c].data_size;
525       (*res)->expiration_time = rd[c].expiration_time;
526       (*res)->flags = rd[c].flags;
527       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
528       return;
529     }
530   }
531   (*res) = NULL;
532 }
533
534
535 /**
536  * Add entry to the cache for @a zone and @a nick
537  *
538  * @param zone zone key to cache under
539  * @param nick nick entry to cache
540  */
541 static void
542 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
543             const struct GNUNET_GNSRECORD_Data *nick)
544 {
545   struct NickCache *oldest;
546
547   oldest = NULL;
548   for (unsigned int i=0;i<NC_SIZE;i++)
549   {
550     struct NickCache *pos = &nick_cache[i];
551
552     if ( (NULL == oldest) ||
553          (oldest->last_used.abs_value_us >
554           pos->last_used.abs_value_us) )
555       oldest = pos;
556     if (0 == GNUNET_memcmp (zone,
557                      &pos->zone))
558     {
559       oldest = pos;
560       break;
561     }
562   }
563   GNUNET_free_non_null (oldest->rd);
564   oldest->zone = *zone;
565   oldest->rd = GNUNET_malloc (sizeof (*nick) +
566                               nick->data_size);
567   *oldest->rd = *nick;
568   oldest->rd->data = &oldest->rd[1];
569   memcpy (&oldest->rd[1],
570           nick->data,
571           nick->data_size);
572   oldest->last_used = GNUNET_TIME_absolute_get ();
573 }
574
575
576 /**
577  * Return the NICK record for the zone (if it exists).
578  *
579  * @param zone private key for the zone to look for nick
580  * @return NULL if no NICK record was found
581  */
582 static struct GNUNET_GNSRECORD_Data *
583 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
584 {
585   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
586   struct GNUNET_GNSRECORD_Data *nick;
587   int res;
588
589   /* check cache first */
590   for (unsigned int i=0;i<NC_SIZE;i++)
591   {
592     struct NickCache *pos = &nick_cache[i];
593     if ( (NULL != pos->rd) &&
594          (0 == GNUNET_memcmp (zone,
595                        &pos->zone)) )
596     {
597       nick = GNUNET_malloc (sizeof (*nick) +
598                             pos->rd->data_size);
599       *nick = *pos->rd;
600       nick->data = &nick[1];
601       memcpy (&nick[1],
602               pos->rd->data,
603               pos->rd->data_size);
604       pos->last_used = GNUNET_TIME_absolute_get ();
605       return nick;
606     }
607   }
608
609   nick = NULL;
610   res = GSN_database->lookup_records (GSN_database->cls,
611                                       zone,
612                                       GNUNET_GNS_EMPTY_LABEL_AT,
613                                       &lookup_nick_it,
614                                       &nick);
615   if ( (GNUNET_OK != res) ||
616        (NULL == nick) )
617   {
618     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
619     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
620                 "No nick name set for zone `%s'\n",
621                 GNUNET_GNSRECORD_z2s (&pub));
622     return NULL;
623   }
624
625   /* update cache */
626   cache_nick (zone,
627               nick);
628   return nick;
629 }
630
631
632 /**
633  * Merge the nick record @a nick_rd with the rest of the
634  * record set given in @a rd2.  Store the result in @a rdc_res
635  * and @a rd_res.  The @a nick_rd's expiration time is set to
636  * the maximum expiration time of all of the records in @a rd2.
637  *
638  * @param nick_rd the nick record to integrate
639  * @param rd2_length length of the @a rd2 array
640  * @param rd2 array of records
641  * @param rdc_res[out] length of the resulting @a rd_res array
642  * @param rd_res[out] set to an array of records,
643  *                    including @a nick_rd and @a rd2;
644  *           all of the variable-size 'data' fields in @a rd2 are
645  *           allocated in the same chunk of memory!
646  */
647 static void
648 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
649                          unsigned int rd2_length,
650                          const struct GNUNET_GNSRECORD_Data *rd2,
651                          unsigned int *rdc_res,
652                          struct GNUNET_GNSRECORD_Data **rd_res)
653 {
654   uint64_t latest_expiration;
655   size_t req;
656   char *data;
657   size_t data_offset;
658   struct GNUNET_GNSRECORD_Data *target;
659
660   (*rdc_res) = 1 + rd2_length;
661   if (0 == 1 + rd2_length)
662   {
663     GNUNET_break (0);
664     (*rd_res) = NULL;
665     return;
666   }
667   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
668   for (unsigned int i=0; i<rd2_length; i++)
669   {
670     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
671
672     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
673     {
674       GNUNET_break (0);
675       (*rd_res) = NULL;
676       return;
677     }
678     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
679   }
680   target = GNUNET_malloc (req);
681   (*rd_res) = target;
682   data = (char *) &target[1 + rd2_length];
683   data_offset = 0;
684   latest_expiration = 0;
685   for (unsigned int i=0;i<rd2_length;i++)
686   {
687     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
688
689     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
690     {
691       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
692           latest_expiration)
693         latest_expiration = orig->expiration_time;
694     }
695     else if (orig->expiration_time > latest_expiration)
696       latest_expiration = orig->expiration_time;
697     target[i] = *orig;
698     target[i].data = (void *) &data[data_offset];
699     GNUNET_memcpy (&data[data_offset],
700                    orig->data,
701                    orig->data_size);
702     data_offset += orig->data_size;
703   }
704   /* append nick */
705   target[rd2_length] = *nick_rd;
706   target[rd2_length].expiration_time = latest_expiration;
707   target[rd2_length].data = (void *) &data[data_offset];
708   GNUNET_memcpy (&data[data_offset],
709                  nick_rd->data,
710                  nick_rd->data_size);
711   data_offset += nick_rd->data_size;
712   GNUNET_assert (req ==
713                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
714 }
715
716
717 /**
718  * Generate a `struct LookupNameResponseMessage` and send it to the
719  * given client using the given notification context.
720  *
721  * @param nc client to unicast to
722  * @param request_id request ID to use
723  * @param zone_key zone key of the zone
724  * @param name name
725  * @param rd_count number of records in @a rd
726  * @param rd array of records
727  */
728 static void
729 send_lookup_response (struct NamestoreClient *nc,
730                       uint32_t request_id,
731                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
732                       const char *name,
733                       unsigned int rd_count,
734                       const struct GNUNET_GNSRECORD_Data *rd)
735 {
736   struct GNUNET_MQ_Envelope *env;
737   struct RecordResultMessage *zir_msg;
738   struct GNUNET_GNSRECORD_Data *nick;
739   struct GNUNET_GNSRECORD_Data *res;
740   unsigned int res_count;
741   size_t name_len;
742   ssize_t rd_ser_len;
743   char *name_tmp;
744   char *rd_ser;
745
746   nick = get_nick_record (zone_key);
747   GNUNET_assert (-1 !=
748                  GNUNET_GNSRECORD_records_get_size (rd_count,
749                                                     rd));
750
751   if ( (NULL != nick) &&
752        (0 != strcmp (name,
753                      GNUNET_GNS_EMPTY_LABEL_AT)))
754   {
755     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
756     merge_with_nick_records (nick,
757                              rd_count,
758                              rd,
759                              &res_count,
760                              &res);
761     GNUNET_free (nick);
762   }
763   else
764   {
765     res_count = rd_count;
766     res = (struct GNUNET_GNSRECORD_Data *) rd;
767   }
768
769   GNUNET_assert (-1 !=
770                  GNUNET_GNSRECORD_records_get_size (res_count,
771                                                     res));
772
773
774   name_len = strlen (name) + 1;
775   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
776                                                   res);
777   if (rd_ser_len < 0)
778   {
779     GNUNET_break (0);
780     GNUNET_SERVICE_client_drop (nc->client);
781     return;
782   }
783   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof (*zir_msg))
784   {
785     GNUNET_break (0);
786     GNUNET_SERVICE_client_drop (nc->client);
787     return;
788   }
789   env = GNUNET_MQ_msg_extra (zir_msg,
790                              name_len + rd_ser_len,
791                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
792   zir_msg->gns_header.r_id = htonl (request_id);
793   zir_msg->name_len = htons (name_len);
794   zir_msg->rd_count = htons (res_count);
795   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
796   zir_msg->private_key = *zone_key;
797   name_tmp = (char *) &zir_msg[1];
798   GNUNET_memcpy (name_tmp,
799                  name,
800                  name_len);
801   rd_ser = &name_tmp[name_len];
802   GNUNET_assert (rd_ser_len ==
803                  GNUNET_GNSRECORD_records_serialize (res_count,
804                                                      res,
805                                                      rd_ser_len,
806                                                      rd_ser));
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Sending RECORD_RESULT message with %u records\n",
809               res_count);
810   GNUNET_STATISTICS_update (statistics,
811                             "Record sets sent to clients",
812                             1,
813                             GNUNET_NO);
814   GNUNET_MQ_send (nc->mq,
815                   env);
816   if (rd != res)
817     GNUNET_free (res);
818 }
819
820
821 /**
822  * Send response to the store request to the client.
823  *
824  * @param client client to talk to
825  * @param res status of the operation
826  * @param rid client's request ID
827  */
828 static void
829 send_store_response (struct NamestoreClient *nc,
830                      int res,
831                      uint32_t rid)
832 {
833   struct GNUNET_MQ_Envelope *env;
834   struct RecordStoreResponseMessage *rcr_msg;
835
836   GNUNET_assert (NULL != nc);
837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838               "Sending RECORD_STORE_RESPONSE message\n");
839   GNUNET_STATISTICS_update (statistics,
840                             "Store requests completed",
841                             1,
842                             GNUNET_NO);
843   env = GNUNET_MQ_msg (rcr_msg,
844                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
845   rcr_msg->gns_header.r_id = htonl (rid);
846   rcr_msg->op_result = htonl (res);
847   GNUNET_MQ_send (nc->mq,
848                   env);
849 }
850
851
852 /**
853  * Function called once we are done with the zone iteration and
854  * allow the zone iteration client to send us more messages.
855  *
856  * @param zi zone iteration we are processing
857  */
858 static void
859 zone_iteration_done_client_continue (struct ZoneIteration *zi)
860 {
861   struct GNUNET_MQ_Envelope *env;
862   struct GNUNET_NAMESTORE_Header *em;
863
864   GNUNET_SERVICE_client_continue (zi->nc->client);
865   if (! zi->send_end)
866     return;
867   /* send empty response to indicate end of list */
868   env = GNUNET_MQ_msg (em,
869                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
870   em->r_id = htonl (zi->request_id);
871   GNUNET_MQ_send (zi->nc->mq,
872                   env);
873
874   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
875                                zi->nc->op_tail,
876                                zi);
877   GNUNET_free (zi);
878 }
879
880
881 /**
882  * Cache operation complete, clean up.
883  *
884  * @param cls the `struct CacheOperation`
885  * @param success success
886  * @param emsg error messages
887  */
888 static void
889 finish_cache_operation (void *cls,
890                         int32_t success,
891                         const char *emsg)
892 {
893   struct CacheOperation *cop = cls;
894   struct ZoneIteration *zi;
895
896   if (NULL != emsg)
897     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
898                 _("Failed to replicate block in namecache: %s\n"),
899                 emsg);
900   else
901     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902                 "CACHE operation completed\n");
903   GNUNET_CONTAINER_DLL_remove (cop_head,
904                                cop_tail,
905                                cop);
906   if (NULL != cop->nc)
907     send_store_response (cop->nc,
908                          success,
909                          cop->rid);
910   if (NULL != (zi = cop->zi))
911     {
912       zi->cache_ops--;
913       if (0 == zi->cache_ops)
914       {
915         /* unchoke zone iteration, cache has caught up */
916         zone_iteration_done_client_continue (zi);
917       }
918     }
919   GNUNET_free (cop);
920 }
921
922
923 /**
924  * We just touched the plaintext information about a name in our zone;
925  * refresh the corresponding (encrypted) block in the namecache.
926  *
927  * @param nc client responsible for the request, can be NULL
928  * @param zi zone iteration response for the request, can be NULL
929  * @param rid request ID of the client
930  * @param zone_key private key of the zone
931  * @param name label for the records
932  * @param rd_count number of records
933  * @param rd records stored under the given @a name
934  */
935 static void
936 refresh_block (struct NamestoreClient *nc,
937                struct ZoneIteration *zi,
938                uint32_t rid,
939                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
940                const char *name,
941                unsigned int rd_count,
942                const struct GNUNET_GNSRECORD_Data *rd)
943 {
944   struct GNUNET_GNSRECORD_Block *block;
945   struct CacheOperation *cop;
946   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
947   struct GNUNET_GNSRECORD_Data *nick;
948   struct GNUNET_GNSRECORD_Data *res;
949   unsigned int res_count;
950   struct GNUNET_TIME_Absolute exp_time;
951
952   nick = get_nick_record (zone_key);
953   res_count = rd_count;
954   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
955   if (NULL != nick)
956   {
957     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
958     merge_with_nick_records (nick,
959                              rd_count,rd,
960                              &res_count,
961                              &res);
962     GNUNET_free (nick);
963   }
964   if (0 == res_count)
965   {
966     if (NULL != nc)
967       send_store_response (nc,
968                            GNUNET_OK,
969                            rid);
970     return; /* no data, no need to update cache */
971   }
972   if (GNUNET_YES == disable_namecache)
973   {
974     GNUNET_STATISTICS_update (statistics,
975                               "Namecache updates skipped (NC disabled)",
976                               1,
977                               GNUNET_NO);
978     if (NULL != nc)
979       send_store_response (nc,
980                            GNUNET_OK,
981                            rid);
982     return;
983   }
984   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
985                                                           res);
986   if (cache_keys)
987     block = GNUNET_GNSRECORD_block_create2 (zone_key,
988                                             exp_time,
989                                             name,
990                                             res,
991                                             res_count);
992   else
993     block = GNUNET_GNSRECORD_block_create (zone_key,
994                                            exp_time,
995                                            name,
996                                            res,
997                                            res_count);
998   GNUNET_assert (NULL != block);
999   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
1000                                       &pkey);
1001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
1003               name,
1004               res_count,
1005               GNUNET_STRINGS_absolute_time_to_string (exp_time),
1006               GNUNET_GNSRECORD_z2s (&pkey));
1007   GNUNET_STATISTICS_update (statistics,
1008                             "Namecache updates pushed",
1009                             1,
1010                             GNUNET_NO);
1011   cop = GNUNET_new (struct CacheOperation);
1012   cop->nc = nc;
1013   cop->zi = zi;
1014   if (NULL != zi)
1015     zi->cache_ops++;
1016   cop->rid = rid;
1017   GNUNET_CONTAINER_DLL_insert (cop_head,
1018                                cop_tail,
1019                                cop);
1020   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
1021                                           block,
1022                                           &finish_cache_operation,
1023                                           cop);
1024   GNUNET_free (block);
1025 }
1026
1027
1028 /**
1029  * Print a warning that one of our monitors is no longer reacting.
1030  *
1031  * @param cls a `struct ZoneMonitor` to warn about
1032  */
1033 static void
1034 warn_monitor_slow (void *cls)
1035 {
1036   struct ZoneMonitor *zm = cls;
1037
1038   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1039               "No response from monitor since %s\n",
1040               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
1041   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1042                                                       &warn_monitor_slow,
1043                                                       zm);
1044 }
1045
1046
1047 /**
1048  * Continue processing the @a sa.
1049  *
1050  * @param sa store activity to process
1051  */
1052 static void
1053 continue_store_activity (struct StoreActivity *sa)
1054 {
1055   const struct RecordStoreMessage *rp_msg = sa->rsm;
1056   unsigned int rd_count;
1057   size_t name_len;
1058   size_t rd_ser_len;
1059   uint32_t rid;
1060   const char *name_tmp;
1061   const char *rd_ser;
1062
1063   rid = ntohl (rp_msg->gns_header.r_id);
1064   name_len = ntohs (rp_msg->name_len);
1065   rd_count = ntohs (rp_msg->rd_count);
1066   rd_ser_len = ntohs (rp_msg->rd_len);
1067   name_tmp = (const char *) &rp_msg[1];
1068   rd_ser = &name_tmp[name_len];
1069   {
1070     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1071
1072     /* We did this before, must succeed again */
1073     GNUNET_assert (GNUNET_OK ==
1074                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1075                                                          rd_ser,
1076                                                          rd_count,
1077                                                          rd));
1078
1079     for (struct ZoneMonitor *zm = sa->zm_pos;
1080          NULL != zm;
1081          zm = sa->zm_pos)
1082     {
1083       if ( (0 != GNUNET_memcmp (&rp_msg->private_key,
1084                          &zm->zone)) &&
1085            (0 != GNUNET_memcmp (&zm->zone,
1086                          &zero)) )
1087         {
1088           sa->zm_pos = zm->next; /* not interesting to this monitor */
1089           continue;
1090         }
1091       if (zm->limit == zm->iteration_cnt)
1092       {
1093         zm->sa_waiting = GNUNET_YES;
1094         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1095         if (NULL != zm->sa_wait_warning)
1096           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1097         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1098                                                             &warn_monitor_slow,
1099                                                             zm);
1100         return; /* blocked on zone monitor */
1101       }
1102       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                   "Notifying monitor about changes under label `%s'\n",
1104                   sa->conv_name);
1105       zm->limit--;
1106       send_lookup_response (zm->nc,
1107                             0,
1108                             &rp_msg->private_key,
1109                             sa->conv_name,
1110                             rd_count,
1111                             rd);
1112       sa->zm_pos = zm->next;
1113     }
1114     /* great, done with the monitors, unpack (again) for refresh_block operation */
1115     refresh_block (sa->nc,
1116                    NULL,
1117                    rid,
1118                    &rp_msg->private_key,
1119                    sa->conv_name,
1120                    rd_count,
1121                    rd);
1122   }
1123   GNUNET_SERVICE_client_continue (sa->nc->client);
1124   free_store_activity (sa);
1125 }
1126
1127
1128 /**
1129  * Called whenever a client is disconnected.
1130  * Frees our resources associated with that client.
1131  *
1132  * @param cls closure
1133  * @param client identification of the client
1134  * @param app_ctx the `struct NamestoreClient` of @a client
1135  */
1136 static void
1137 client_disconnect_cb (void *cls,
1138                       struct GNUNET_SERVICE_Client *client,
1139                       void *app_ctx)
1140 {
1141   struct NamestoreClient *nc = app_ctx;
1142   struct ZoneIteration *no;
1143   struct CacheOperation *cop;
1144
1145   (void) cls;
1146   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147               "Client %p disconnected\n",
1148               client);
1149   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1150   {
1151     struct StoreActivity *san;
1152
1153     if (nc != zm->nc)
1154       continue;
1155     GNUNET_CONTAINER_DLL_remove (monitor_head,
1156                                  monitor_tail,
1157                                  zm);
1158     if (NULL != zm->task)
1159     {
1160       GNUNET_SCHEDULER_cancel (zm->task);
1161       zm->task = NULL;
1162     }
1163     if (NULL != zm->sa_wait_warning)
1164     {
1165       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1166       zm->sa_wait_warning = NULL;
1167     }
1168     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1169     {
1170       san = sa->next;
1171       if (zm == sa->zm_pos)
1172       {
1173         sa->zm_pos = zm->next;
1174         /* this may free sa */
1175         continue_store_activity (sa);
1176       }
1177     }
1178     GNUNET_free (zm);
1179     break;
1180   }
1181   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1182   {
1183     if (sa->nc == nc)
1184     {
1185       /* this may free sa */
1186       free_store_activity (sa);
1187       break; /* there can only be one per nc */
1188     }
1189   }
1190   while (NULL != (no = nc->op_head))
1191   {
1192     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1193                                  nc->op_tail,
1194                                  no);
1195     GNUNET_free (no);
1196   }
1197   for (cop = cop_head; NULL != cop; cop = cop->next)
1198     if (nc == cop->nc)
1199       cop->nc = NULL;
1200   GNUNET_free (nc);
1201 }
1202
1203
1204 /**
1205  * Add a client to our list of active clients.
1206  *
1207  * @param cls NULL
1208  * @param client client to add
1209  * @param mq message queue for @a client
1210  * @return internal namestore client structure for this client
1211  */
1212 static void *
1213 client_connect_cb (void *cls,
1214                    struct GNUNET_SERVICE_Client *client,
1215                    struct GNUNET_MQ_Handle *mq)
1216 {
1217   struct NamestoreClient *nc;
1218
1219   (void) cls;
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221               "Client %p connected\n",
1222               client);
1223   nc = GNUNET_new (struct NamestoreClient);
1224   nc->client = client;
1225   nc->mq = mq;
1226   return nc;
1227 }
1228
1229
1230 /**
1231  * Closure for #lookup_it().
1232  */
1233 struct RecordLookupContext
1234 {
1235
1236   /**
1237    * FIXME.
1238    */
1239   const char *label;
1240
1241   /**
1242    * FIXME.
1243    */
1244   char *res_rd;
1245
1246   /**
1247    * FIXME.
1248    */
1249   struct GNUNET_GNSRECORD_Data *nick;
1250
1251   /**
1252    * FIXME.
1253    */
1254   int found;
1255
1256   /**
1257    * FIXME.
1258    */
1259   unsigned int res_rd_count;
1260
1261   /**
1262    * FIXME.
1263    */
1264   ssize_t rd_ser_len;
1265 };
1266
1267
1268 /**
1269  * Function called by the namestore plugin when we are trying to lookup
1270  * a record as part of #handle_record_lookup().  Merges all results into
1271  * the context.
1272  *
1273  * @param cls closure with a `struct RecordLookupContext`
1274  * @param seq unique serial number of the record, MUST NOT BE ZERO
1275  * @param zone_key private key of the zone
1276  * @param label name that is being mapped (at most 255 characters long)
1277  * @param rd_count number of entries in @a rd array
1278  * @param rd array of records with data to store
1279  */
1280 static void
1281 lookup_it (void *cls,
1282            uint64_t seq,
1283            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1284            const char *label,
1285            unsigned int rd_count,
1286            const struct GNUNET_GNSRECORD_Data *rd)
1287 {
1288   struct RecordLookupContext *rlc = cls;
1289
1290   (void) private_key;
1291   GNUNET_assert (0 != seq);
1292   if (0 != strcmp (label,
1293                    rlc->label))
1294     return;
1295   rlc->found = GNUNET_YES;
1296   if (0 == rd_count)
1297   {
1298     rlc->rd_ser_len = 0;
1299     rlc->res_rd_count = 0;
1300     rlc->res_rd = NULL;
1301     return;
1302   }
1303   if ( (NULL != rlc->nick) &&
1304        (0 != strcmp (label,
1305                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1306   {
1307     /* Merge */
1308     struct GNUNET_GNSRECORD_Data *rd_res;
1309     unsigned int rdc_res;
1310
1311     rd_res = NULL;
1312     rdc_res = 0;
1313     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1314     merge_with_nick_records (rlc->nick,
1315                              rd_count,
1316                              rd,
1317                              &rdc_res,
1318                              &rd_res);
1319     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1320                                                          rd_res);
1321     if (rlc->rd_ser_len < 0)
1322     {
1323       GNUNET_break (0);
1324       GNUNET_free  (rd_res);
1325       rlc->found = GNUNET_NO;
1326       rlc->rd_ser_len = 0;
1327       return;
1328     }
1329     rlc->res_rd_count = rdc_res;
1330     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1331     if (rlc->rd_ser_len !=
1332         GNUNET_GNSRECORD_records_serialize (rdc_res,
1333                                             rd_res,
1334                                             rlc->rd_ser_len,
1335                                             rlc->res_rd))
1336     {
1337       GNUNET_break (0);
1338       GNUNET_free  (rlc->res_rd);
1339       rlc->res_rd = NULL;
1340       rlc->res_rd_count = 0;
1341       rlc->rd_ser_len = 0;
1342       GNUNET_free  (rd_res);
1343       rlc->found = GNUNET_NO;
1344       return;
1345     }
1346     GNUNET_free (rd_res);
1347     GNUNET_free (rlc->nick);
1348     rlc->nick = NULL;
1349   }
1350   else
1351   {
1352     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1353                                                          rd);
1354     if (rlc->rd_ser_len < 0)
1355     {
1356       GNUNET_break (0);
1357       rlc->found = GNUNET_NO;
1358       rlc->rd_ser_len = 0;
1359       return;
1360     }
1361     rlc->res_rd_count = rd_count;
1362     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1363     if (rlc->rd_ser_len !=
1364         GNUNET_GNSRECORD_records_serialize (rd_count,
1365                                             rd,
1366                                             rlc->rd_ser_len,
1367                                             rlc->res_rd))
1368     {
1369       GNUNET_break (0);
1370       GNUNET_free  (rlc->res_rd);
1371       rlc->res_rd = NULL;
1372       rlc->res_rd_count = 0;
1373       rlc->rd_ser_len = 0;
1374       rlc->found = GNUNET_NO;
1375       return;
1376     }
1377   }
1378 }
1379
1380
1381 /**
1382  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1383  *
1384  * @param cls client sending the message
1385  * @param ll_msg message of type `struct LabelLookupMessage`
1386  * @return #GNUNET_OK if @a ll_msg is well-formed
1387  */
1388 static int
1389 check_record_lookup (void *cls,
1390                      const struct LabelLookupMessage *ll_msg)
1391 {
1392   uint32_t name_len;
1393   size_t src_size;
1394
1395   (void) cls;
1396   name_len = ntohl (ll_msg->label_len);
1397   src_size = ntohs (ll_msg->gns_header.header.size);
1398   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1399   {
1400     GNUNET_break (0);
1401     return GNUNET_SYSERR;
1402   }
1403   GNUNET_MQ_check_zero_termination (ll_msg);
1404   return GNUNET_OK;
1405 }
1406
1407
1408 /**
1409  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1410  *
1411  * @param cls client sending the message
1412  * @param ll_msg message of type `struct LabelLookupMessage`
1413  */
1414 static void
1415 handle_record_lookup (void *cls,
1416                       const struct LabelLookupMessage *ll_msg)
1417 {
1418   struct NamestoreClient *nc = cls;
1419   struct GNUNET_MQ_Envelope *env;
1420   struct LabelLookupResponseMessage *llr_msg;
1421   struct RecordLookupContext rlc;
1422   const char *name_tmp;
1423   char *res_name;
1424   char *conv_name;
1425   uint32_t name_len;
1426   int res;
1427
1428   name_len = ntohl (ll_msg->label_len);
1429   name_tmp = (const char *) &ll_msg[1];
1430   GNUNET_SERVICE_client_continue (nc->client);
1431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1432               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1433               name_tmp);
1434
1435   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1436   if (NULL == conv_name)
1437   {
1438     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1439                 "Error converting name `%s'\n",
1440                 name_tmp);
1441     GNUNET_SERVICE_client_drop (nc->client);
1442     return;
1443   }
1444   rlc.label = conv_name;
1445   rlc.found = GNUNET_NO;
1446   rlc.res_rd_count = 0;
1447   rlc.res_rd = NULL;
1448   rlc.rd_ser_len = 0;
1449   rlc.nick = get_nick_record (&ll_msg->zone);
1450   res = GSN_database->lookup_records (GSN_database->cls,
1451                                       &ll_msg->zone,
1452                                       conv_name,
1453                                       &lookup_it,
1454                                       &rlc);
1455   GNUNET_free (conv_name);
1456   env = GNUNET_MQ_msg_extra (llr_msg,
1457                              name_len + rlc.rd_ser_len,
1458                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1459   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1460   llr_msg->private_key = ll_msg->zone;
1461   llr_msg->name_len = htons (name_len);
1462   llr_msg->rd_count = htons (rlc.res_rd_count);
1463   llr_msg->rd_len = htons (rlc.rd_ser_len);
1464   res_name = (char *) &llr_msg[1];
1465   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1466     llr_msg->found = ntohs (GNUNET_YES);
1467   else
1468     llr_msg->found = ntohs (GNUNET_NO);
1469   GNUNET_memcpy (&llr_msg[1],
1470                  name_tmp,
1471                  name_len);
1472   GNUNET_memcpy (&res_name[name_len],
1473                  rlc.res_rd,
1474                  rlc.rd_ser_len);
1475   GNUNET_MQ_send (nc->mq,
1476                   env);
1477   GNUNET_free_non_null (rlc.res_rd);
1478 }
1479
1480
1481 /**
1482  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1483  *
1484  * @param cls client sending the message
1485  * @param rp_msg message of type `struct RecordStoreMessage`
1486  * @return #GNUNET_OK if @a rp_msg is well-formed
1487  */
1488 static int
1489 check_record_store (void *cls,
1490                     const struct RecordStoreMessage *rp_msg)
1491 {
1492   size_t name_len;
1493   size_t msg_size;
1494   size_t msg_size_exp;
1495   size_t rd_ser_len;
1496   const char *name_tmp;
1497
1498   (void) cls;
1499   name_len = ntohs (rp_msg->name_len);
1500   msg_size = ntohs (rp_msg->gns_header.header.size);
1501   rd_ser_len = ntohs (rp_msg->rd_len);
1502   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1503   if (msg_size != msg_size_exp)
1504   {
1505     GNUNET_break (0);
1506     return GNUNET_SYSERR;
1507   }
1508   if ( (0 == name_len) ||
1509        (name_len > MAX_NAME_LEN) )
1510   {
1511     GNUNET_break (0);
1512     return GNUNET_SYSERR;
1513   }
1514   name_tmp = (const char *) &rp_msg[1];
1515   if ('\0' != name_tmp[name_len -1])
1516   {
1517     GNUNET_break (0);
1518     return GNUNET_SYSERR;
1519   }
1520   return GNUNET_OK;
1521 }
1522
1523
1524 /**
1525  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1526  *
1527  * @param cls client sending the message
1528  * @param rp_msg message of type `struct RecordStoreMessage`
1529  */
1530 static void
1531 handle_record_store (void *cls,
1532                      const struct RecordStoreMessage *rp_msg)
1533 {
1534   struct NamestoreClient *nc = cls;
1535   size_t name_len;
1536   size_t rd_ser_len;
1537   uint32_t rid;
1538   const char *name_tmp;
1539   char *conv_name;
1540   const char *rd_ser;
1541   unsigned int rd_count;
1542   int res;
1543   struct StoreActivity *sa;
1544
1545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546               "Received NAMESTORE_RECORD_STORE message\n");
1547   rid = ntohl (rp_msg->gns_header.r_id);
1548   name_len = ntohs (rp_msg->name_len);
1549   rd_count = ntohs (rp_msg->rd_count);
1550   rd_ser_len = ntohs (rp_msg->rd_len);
1551   GNUNET_break (0 == ntohs (rp_msg->reserved));
1552   name_tmp = (const char *) &rp_msg[1];
1553   rd_ser = &name_tmp[name_len];
1554   {
1555     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1556
1557     if (GNUNET_OK !=
1558         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1559                                               rd_ser,
1560                                               rd_count,
1561                                               rd))
1562     {
1563       GNUNET_break (0);
1564       GNUNET_SERVICE_client_drop (nc->client);
1565       return;
1566     }
1567
1568     /* Extracting and converting private key */
1569     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1570     if (NULL == conv_name)
1571     {
1572       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1573                   "Error converting name `%s'\n",
1574                   name_tmp);
1575       GNUNET_SERVICE_client_drop (nc->client);
1576       return;
1577     }
1578     GNUNET_STATISTICS_update (statistics,
1579                               "Well-formed store requests received",
1580                               1,
1581                               GNUNET_NO);
1582     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1583                 "Creating %u records for name `%s'\n",
1584                 (unsigned int) rd_count,
1585                 conv_name);
1586     if ( (0 == rd_count) &&
1587          (GNUNET_NO ==
1588           GSN_database->lookup_records (GSN_database->cls,
1589                                         &rp_msg->private_key,
1590                                         conv_name,
1591                                         NULL,
1592                                         0)) )
1593     {
1594       /* This name does not exist, so cannot be removed */
1595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1596                   "Name `%s' does not exist, no deletion required\n",
1597                   conv_name);
1598       res = GNUNET_NO;
1599     }
1600     else
1601     {
1602       /* remove "NICK" records, unless this is for the
1603          #GNUNET_GNS_EMPTY_LABEL_AT label */
1604       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1605       unsigned int rd_clean_off;
1606
1607       rd_clean_off = 0;
1608       for (unsigned int i=0;i<rd_count;i++)
1609       {
1610         rd_clean[rd_clean_off] = rd[i];
1611         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1612                            conv_name)) ||
1613              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1614           rd_clean_off++;
1615
1616         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1617                            conv_name)) &&
1618              (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type) )
1619           cache_nick (&rp_msg->private_key,
1620                       &rd[i]);
1621       }
1622       res = GSN_database->store_records (GSN_database->cls,
1623                                          &rp_msg->private_key,
1624                                          conv_name,
1625                                          rd_clean_off,
1626                                          rd_clean);
1627     }
1628
1629     if (GNUNET_OK != res)
1630     {
1631       /* store not successful, not need to tell monitors */
1632       send_store_response (nc,
1633                            res,
1634                            rid);
1635       GNUNET_SERVICE_client_continue (nc->client);
1636       GNUNET_free (conv_name);
1637       return;
1638     }
1639
1640     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1641                         ntohs (rp_msg->gns_header.header.size));
1642     GNUNET_CONTAINER_DLL_insert (sa_head,
1643                                  sa_tail,
1644                                  sa);
1645     sa->nc = nc;
1646     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1647     GNUNET_memcpy (&sa[1],
1648                    rp_msg,
1649                    ntohs (rp_msg->gns_header.header.size));
1650     sa->zm_pos = monitor_head;
1651     sa->conv_name = conv_name;
1652     continue_store_activity (sa);
1653   }
1654 }
1655
1656
1657 /**
1658  * Context for record remove operations passed from #handle_zone_to_name to
1659  * #handle_zone_to_name_it as closure
1660  */
1661 struct ZoneToNameCtx
1662 {
1663   /**
1664    * Namestore client
1665    */
1666   struct NamestoreClient *nc;
1667
1668   /**
1669    * Request id (to be used in the response to the client).
1670    */
1671   uint32_t rid;
1672
1673   /**
1674    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1675    * not finding a name for the zone still counts as a 'success' here,
1676    * as this field is about the success of executing the IPC protocol.
1677    */
1678   int success;
1679 };
1680
1681
1682 /**
1683  * Zone to name iterator
1684  *
1685  * @param cls struct ZoneToNameCtx *
1686  * @param seq sequence number of the record, MUST NOT BE ZERO
1687  * @param zone_key the zone key
1688  * @param name name
1689  * @param rd_count number of records in @a rd
1690  * @param rd record data
1691  */
1692 static void
1693 handle_zone_to_name_it (void *cls,
1694                         uint64_t seq,
1695                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1696                         const char *name,
1697                         unsigned int rd_count,
1698                         const struct GNUNET_GNSRECORD_Data *rd)
1699 {
1700   struct ZoneToNameCtx *ztn_ctx = cls;
1701   struct GNUNET_MQ_Envelope *env;
1702   struct ZoneToNameResponseMessage *ztnr_msg;
1703   int16_t res;
1704   size_t name_len;
1705   ssize_t rd_ser_len;
1706   size_t msg_size;
1707   char *name_tmp;
1708   char *rd_tmp;
1709
1710   GNUNET_assert (0 != seq);
1711   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1712               "Found result for zone-to-name lookup: `%s'\n",
1713               name);
1714   res = GNUNET_YES;
1715   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1716   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1717                                                   rd);
1718   if (rd_ser_len < 0)
1719   {
1720     GNUNET_break (0);
1721     ztn_ctx->success = GNUNET_SYSERR;
1722     return;
1723   }
1724   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1725   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1726   {
1727     GNUNET_break (0);
1728     ztn_ctx->success = GNUNET_SYSERR;
1729     return;
1730   }
1731   env = GNUNET_MQ_msg_extra (ztnr_msg,
1732                              name_len + rd_ser_len,
1733                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1734   ztnr_msg->gns_header.header.size = htons (msg_size);
1735   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1736   ztnr_msg->res = htons (res);
1737   ztnr_msg->rd_len = htons (rd_ser_len);
1738   ztnr_msg->rd_count = htons (rd_count);
1739   ztnr_msg->name_len = htons (name_len);
1740   ztnr_msg->zone = *zone_key;
1741   name_tmp = (char *) &ztnr_msg[1];
1742   GNUNET_memcpy (name_tmp,
1743                  name,
1744                  name_len);
1745   rd_tmp = &name_tmp[name_len];
1746   GNUNET_assert (rd_ser_len ==
1747                  GNUNET_GNSRECORD_records_serialize (rd_count,
1748                                                      rd,
1749                                                      rd_ser_len,
1750                                                      rd_tmp));
1751   ztn_ctx->success = GNUNET_OK;
1752   GNUNET_MQ_send (ztn_ctx->nc->mq,
1753                   env);
1754 }
1755
1756
1757 /**
1758  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1759  *
1760  * @param cls client client sending the message
1761  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1762  */
1763 static void
1764 handle_zone_to_name (void *cls,
1765                      const struct ZoneToNameMessage *ztn_msg)
1766 {
1767   struct NamestoreClient *nc = cls;
1768   struct ZoneToNameCtx ztn_ctx;
1769   struct GNUNET_MQ_Envelope *env;
1770   struct ZoneToNameResponseMessage *ztnr_msg;
1771
1772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1773               "Received ZONE_TO_NAME message\n");
1774   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1775   ztn_ctx.nc = nc;
1776   ztn_ctx.success = GNUNET_NO;
1777   if (GNUNET_SYSERR ==
1778       GSN_database->zone_to_name (GSN_database->cls,
1779                                   &ztn_msg->zone,
1780                                   &ztn_msg->value_zone,
1781                                   &handle_zone_to_name_it, &ztn_ctx))
1782   {
1783     /* internal error, hang up instead of signalling something
1784        that might be wrong */
1785     GNUNET_break (0);
1786     GNUNET_SERVICE_client_drop (nc->client);
1787     return;
1788   }
1789   if (GNUNET_NO == ztn_ctx.success)
1790   {
1791     /* no result found, send empty response */
1792     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1793                 "Found no result for zone-to-name lookup.\n");
1794     env = GNUNET_MQ_msg (ztnr_msg,
1795                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1796     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1797     ztnr_msg->res = htons (GNUNET_NO);
1798     GNUNET_MQ_send (nc->mq,
1799                     env);
1800   }
1801   GNUNET_SERVICE_client_continue (nc->client);
1802 }
1803
1804
1805 /**
1806  * Context for record remove operations passed from
1807  * #run_zone_iteration_round to #zone_iterate_proc as closure
1808  */
1809 struct ZoneIterationProcResult
1810 {
1811   /**
1812    * The zone iteration handle
1813    */
1814   struct ZoneIteration *zi;
1815
1816   /**
1817    * Number of results left to be returned in this iteration.
1818    */
1819   uint64_t limit;
1820
1821 };
1822
1823
1824 /**
1825  * Process results for zone iteration from database
1826  *
1827  * @param cls struct ZoneIterationProcResult
1828  * @param seq sequence number of the record, MUST NOT BE ZERO
1829  * @param zone_key the zone key
1830  * @param name name
1831  * @param rd_count number of records for this name
1832  * @param rd record data
1833  */
1834 static void
1835 zone_iterate_proc (void *cls,
1836                    uint64_t seq,
1837                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1838                    const char *name,
1839                    unsigned int rd_count,
1840                    const struct GNUNET_GNSRECORD_Data *rd)
1841 {
1842   struct ZoneIterationProcResult *proc = cls;
1843   int do_refresh_block;
1844
1845   GNUNET_assert (0 != seq);
1846   if ( (NULL == zone_key) &&
1847        (NULL == name) )
1848   {
1849     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1850                 "Iteration done\n");
1851     return;
1852   }
1853   if ( (NULL == zone_key) ||
1854        (NULL == name) )
1855   {
1856     /* what is this!? should never happen */
1857     GNUNET_break (0);
1858     return;
1859   }
1860   if (0 == proc->limit)
1861   {
1862     /* what is this!? should never happen */
1863     GNUNET_break (0);
1864     return;
1865   }
1866   proc->limit--;
1867   proc->zi->seq = seq;
1868   send_lookup_response (proc->zi->nc,
1869                         proc->zi->request_id,
1870                         zone_key,
1871                         name,
1872                         rd_count,
1873                         rd);
1874
1875
1876   do_refresh_block = GNUNET_NO;
1877   for (unsigned int i=0;i<rd_count;i++)
1878     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1879     {
1880       do_refresh_block = GNUNET_YES;
1881       break;
1882     }
1883   if (GNUNET_YES == do_refresh_block)
1884     refresh_block (NULL,
1885                    proc->zi,
1886                    0,
1887                    zone_key,
1888                    name,
1889                    rd_count,
1890                    rd);
1891 }
1892
1893
1894 /**
1895  * Perform the next round of the zone iteration.
1896  *
1897  * @param zi zone iterator to process
1898  * @param limit number of results to return in one pass
1899  */
1900 static void
1901 run_zone_iteration_round (struct ZoneIteration *zi,
1902                           uint64_t limit)
1903 {
1904   struct ZoneIterationProcResult proc;
1905   struct GNUNET_TIME_Absolute start;
1906   struct GNUNET_TIME_Relative duration;
1907
1908   memset (&proc,
1909           0,
1910           sizeof (proc));
1911   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1912               "Asked to return up to %llu records at position %llu\n",
1913               (unsigned long long) limit,
1914               (unsigned long long) zi->seq);
1915   proc.zi = zi;
1916   proc.limit = limit;
1917   start = GNUNET_TIME_absolute_get ();
1918   GNUNET_break (GNUNET_SYSERR !=
1919                 GSN_database->iterate_records (GSN_database->cls,
1920                                                (0 == GNUNET_is_zero (&zi->zone))
1921                                                ? NULL
1922                                                : &zi->zone,
1923                                                zi->seq,
1924                                                limit,
1925                                                &zone_iterate_proc,
1926                                                &proc));
1927   duration = GNUNET_TIME_absolute_get_duration (start);
1928   duration = GNUNET_TIME_relative_divide (duration,
1929                                           limit - proc.limit);
1930   GNUNET_STATISTICS_set (statistics,
1931                          "NAMESTORE iteration delay (μs/record)",
1932                          duration.rel_value_us,
1933                          GNUNET_NO);
1934   if (0 == proc.limit)
1935     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1936                 "Returned %llu results, more results available\n",
1937                 (unsigned long long) limit);
1938   zi->send_end = (0 != proc.limit);
1939   if (0 == zi->cache_ops)
1940     zone_iteration_done_client_continue (zi);
1941 }
1942
1943
1944 /**
1945  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1946  *
1947  * @param cls the client sending the message
1948  * @param zis_msg message from the client
1949  */
1950 static void
1951 handle_iteration_start (void *cls,
1952                         const struct ZoneIterationStartMessage *zis_msg)
1953 {
1954   struct NamestoreClient *nc = cls;
1955   struct ZoneIteration *zi;
1956
1957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1958               "Received ZONE_ITERATION_START message\n");
1959   zi = GNUNET_new (struct ZoneIteration);
1960   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1961   zi->offset = 0;
1962   zi->nc = nc;
1963   zi->zone = zis_msg->zone;
1964
1965   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1966                                nc->op_tail,
1967                                zi);
1968   run_zone_iteration_round (zi,
1969                             1);
1970 }
1971
1972
1973 /**
1974  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1975  *
1976  * @param cls the client sending the message
1977  * @param zis_msg message from the client
1978  */
1979 static void
1980 handle_iteration_stop (void *cls,
1981                        const struct ZoneIterationStopMessage *zis_msg)
1982 {
1983   struct NamestoreClient *nc = cls;
1984   struct ZoneIteration *zi;
1985   uint32_t rid;
1986
1987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1988               "Received ZONE_ITERATION_STOP message\n");
1989   rid = ntohl (zis_msg->gns_header.r_id);
1990   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1991     if (zi->request_id == rid)
1992       break;
1993   if (NULL == zi)
1994   {
1995     GNUNET_break (0);
1996     GNUNET_SERVICE_client_drop (nc->client);
1997     return;
1998   }
1999   GNUNET_CONTAINER_DLL_remove (nc->op_head,
2000                                nc->op_tail,
2001                                zi);
2002   GNUNET_free (zi);
2003   GNUNET_SERVICE_client_continue (nc->client);
2004 }
2005
2006
2007 /**
2008  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
2009  *
2010  * @param cls the client sending the message
2011  * @param message message from the client
2012  */
2013 static void
2014 handle_iteration_next (void *cls,
2015                        const struct ZoneIterationNextMessage *zis_msg)
2016 {
2017   struct NamestoreClient *nc = cls;
2018   struct ZoneIteration *zi;
2019   uint32_t rid;
2020   uint64_t limit;
2021
2022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023               "Received ZONE_ITERATION_NEXT message\n");
2024   GNUNET_STATISTICS_update (statistics,
2025                             "Iteration NEXT messages received",
2026                             1,
2027                             GNUNET_NO);
2028   rid = ntohl (zis_msg->gns_header.r_id);
2029   limit = GNUNET_ntohll (zis_msg->limit);
2030   for (zi = nc->op_head; NULL != zi; zi = zi->next)
2031     if (zi->request_id == rid)
2032       break;
2033   if (NULL == zi)
2034   {
2035     GNUNET_break (0);
2036     GNUNET_SERVICE_client_drop (nc->client);
2037     return;
2038   }
2039   run_zone_iteration_round (zi,
2040                             limit);
2041 }
2042
2043
2044 /**
2045  * Function called when the monitor is ready for more data, and we
2046  * should thus unblock PUT operations that were blocked on the
2047  * monitor not being ready.
2048  */
2049 static void
2050 monitor_unblock (struct ZoneMonitor *zm)
2051 {
2052   struct StoreActivity *sa = sa_head;
2053
2054   while ( (NULL != sa) &&
2055           (zm->limit > zm->iteration_cnt) )
2056   {
2057     struct StoreActivity *sn = sa->next;
2058
2059     if (sa->zm_pos == zm)
2060       continue_store_activity (sa);
2061     sa = sn;
2062   }
2063   if (zm->limit > zm->iteration_cnt)
2064   {
2065     zm->sa_waiting = GNUNET_NO;
2066     if (NULL != zm->sa_wait_warning)
2067     {
2068       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2069       zm->sa_wait_warning = NULL;
2070     }
2071   }
2072   else if (GNUNET_YES == zm->sa_waiting)
2073   {
2074     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2075     if (NULL != zm->sa_wait_warning)
2076       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2077     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2078                                                         &warn_monitor_slow,
2079                                                         zm);
2080   }
2081 }
2082
2083
2084 /**
2085  * Send 'sync' message to zone monitor, we're now in sync.
2086  *
2087  * @param zm monitor that is now in sync
2088  */
2089 static void
2090 monitor_sync (struct ZoneMonitor *zm)
2091 {
2092   struct GNUNET_MQ_Envelope *env;
2093   struct GNUNET_MessageHeader *sync;
2094
2095   env = GNUNET_MQ_msg (sync,
2096                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
2097   GNUNET_MQ_send (zm->nc->mq,
2098                   env);
2099   /* mark iteration done */
2100   zm->in_first_iteration = GNUNET_NO;
2101   zm->iteration_cnt = 0;
2102   if ( (zm->limit > 0) &&
2103        (zm->sa_waiting) )
2104     monitor_unblock (zm);
2105 }
2106
2107
2108 /**
2109  * Obtain the next datum during the zone monitor's zone initial iteration.
2110  *
2111  * @param cls zone monitor that does its initial iteration
2112  */
2113 static void
2114 monitor_iteration_next (void *cls);
2115
2116
2117 /**
2118  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
2119  *
2120  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2121  * @param seq sequence number of the record, MUST NOT BE ZERO
2122  * @param zone_key zone key of the zone
2123  * @param name name
2124  * @param rd_count number of records in @a rd
2125  * @param rd array of records
2126  */
2127 static void
2128 monitor_iterate_cb (void *cls,
2129                     uint64_t seq,
2130                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2131                     const char *name,
2132                     unsigned int rd_count,
2133                     const struct GNUNET_GNSRECORD_Data *rd)
2134 {
2135   struct ZoneMonitor *zm = cls;
2136
2137   GNUNET_assert (0 != seq);
2138   zm->seq = seq;
2139   GNUNET_assert (NULL != name);
2140   GNUNET_STATISTICS_update (statistics,
2141                             "Monitor notifications sent",
2142                             1,
2143                             GNUNET_NO);
2144   zm->limit--;
2145   zm->iteration_cnt--;
2146   send_lookup_response (zm->nc,
2147                         0,
2148                         zone_key,
2149                         name,
2150                         rd_count,
2151                         rd);
2152   if ( (0 == zm->iteration_cnt) &&
2153        (0 != zm->limit) )
2154   {
2155     /* We are done with the current iteration batch, AND the
2156        client would right now accept more, so go again! */
2157     GNUNET_assert (NULL == zm->task);
2158     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2159                                          zm);
2160   }
2161 }
2162
2163
2164 /**
2165  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2166  *
2167  * @param cls the client sending the message
2168  * @param zis_msg message from the client
2169  */
2170 static void
2171 handle_monitor_start (void *cls,
2172                       const struct ZoneMonitorStartMessage *zis_msg)
2173 {
2174   struct NamestoreClient *nc = cls;
2175   struct ZoneMonitor *zm;
2176
2177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2178               "Received ZONE_MONITOR_START message\n");
2179   zm = GNUNET_new (struct ZoneMonitor);
2180   zm->nc = nc;
2181   zm->zone = zis_msg->zone;
2182   zm->limit = 1;
2183   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2184   GNUNET_CONTAINER_DLL_insert (monitor_head,
2185                                monitor_tail,
2186                                zm);
2187   GNUNET_SERVICE_client_mark_monitor (nc->client);
2188   GNUNET_SERVICE_client_continue (nc->client);
2189   GNUNET_notification_context_add (monitor_nc,
2190                                    nc->mq);
2191   if (zm->in_first_iteration)
2192     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2193                                          zm);
2194   else
2195     monitor_sync (zm);
2196 }
2197
2198
2199 /**
2200  * Obtain the next datum during the zone monitor's zone initial iteration.
2201  *
2202  * @param cls zone monitor that does its initial iteration
2203  */
2204 static void
2205 monitor_iteration_next (void *cls)
2206 {
2207   struct ZoneMonitor *zm = cls;
2208   int ret;
2209
2210   zm->task = NULL;
2211   GNUNET_assert (0 == zm->iteration_cnt);
2212   if (zm->limit > 16)
2213     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2214   else
2215     zm->iteration_cnt = zm->limit; /* use it all */
2216   ret = GSN_database->iterate_records (GSN_database->cls,
2217                                        (0 == GNUNET_is_zero (&zm->zone))
2218                                        ? NULL
2219                                        : &zm->zone,
2220                                        zm->seq,
2221                                        zm->iteration_cnt,
2222                                        &monitor_iterate_cb,
2223                                        zm);
2224   if (GNUNET_SYSERR == ret)
2225   {
2226     GNUNET_SERVICE_client_drop (zm->nc->client);
2227     return;
2228   }
2229   if (GNUNET_NO == ret)
2230   {
2231     /* empty zone */
2232     monitor_sync (zm);
2233     return;
2234   }
2235 }
2236
2237
2238 /**
2239  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2240  *
2241  * @param cls the client sending the message
2242  * @param nm message from the client
2243  */
2244 static void
2245 handle_monitor_next (void *cls,
2246                      const struct ZoneMonitorNextMessage *nm)
2247 {
2248   struct NamestoreClient *nc = cls;
2249   struct ZoneMonitor *zm;
2250   uint64_t inc;
2251
2252   inc = GNUNET_ntohll (nm->limit);
2253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2255               (unsigned long long) inc);
2256   for (zm = monitor_head; NULL != zm; zm = zm->next)
2257     if (zm->nc == nc)
2258       break;
2259   if (NULL == zm)
2260   {
2261     GNUNET_break (0);
2262     GNUNET_SERVICE_client_drop (nc->client);
2263     return;
2264   }
2265   GNUNET_SERVICE_client_continue (nc->client);
2266   if (zm->limit + inc < zm->limit)
2267   {
2268     GNUNET_break (0);
2269     GNUNET_SERVICE_client_drop (nc->client);
2270     return;
2271   }
2272   zm->limit += inc;
2273   if ( (zm->in_first_iteration) &&
2274        (zm->limit == inc) )
2275   {
2276     /* We are still iterating, and the previous iteration must
2277        have stopped due to the client's limit, so continue it! */
2278     GNUNET_assert (NULL == zm->task);
2279     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2280                                          zm);
2281   }
2282   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2283   if ( (zm->limit > zm->iteration_cnt) &&
2284        (zm->sa_waiting) )
2285   {
2286     monitor_unblock (zm);
2287   }
2288   else if (GNUNET_YES == zm->sa_waiting)
2289   {
2290     if (NULL != zm->sa_wait_warning)
2291       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2292     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2293     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2294                                                         &warn_monitor_slow,
2295                                                         zm);
2296   }
2297 }
2298
2299
2300 /**
2301  * Process namestore requests.
2302  *
2303  * @param cls closure
2304  * @param cfg configuration to use
2305  * @param service the initialized service
2306  */
2307 static void
2308 run (void *cls,
2309      const struct GNUNET_CONFIGURATION_Handle *cfg,
2310      struct GNUNET_SERVICE_Handle *service)
2311 {
2312   char *database;
2313
2314   (void) cls;
2315   (void) service;
2316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2317               "Starting namestore service\n");
2318   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2319                                                      "namestore",
2320                                                      "CACHE_KEYS");
2321   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2322                                                             "namecache",
2323                                                             "DISABLE");
2324   GSN_cfg = cfg;
2325   monitor_nc = GNUNET_notification_context_create (1);
2326   if (GNUNET_YES != disable_namecache)
2327   {
2328     namecache = GNUNET_NAMECACHE_connect (cfg);
2329     GNUNET_assert (NULL != namecache);
2330   }
2331   /* Loading database plugin */
2332   if (GNUNET_OK !=
2333       GNUNET_CONFIGURATION_get_value_string (cfg,
2334                                              "namestore",
2335                                              "database",
2336                                              &database))
2337     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2338                 "No database backend configured\n");
2339
2340   GNUNET_asprintf (&db_lib_name,
2341                    "libgnunet_plugin_namestore_%s",
2342                    database);
2343   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2344                                      (void *) GSN_cfg);
2345   GNUNET_free (database);
2346   statistics = GNUNET_STATISTICS_create ("namestore",
2347                                          cfg);
2348   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2349                                  NULL);
2350   if (NULL == GSN_database)
2351   {
2352     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2353                 "Could not load database backend `%s'\n",
2354                 db_lib_name);
2355     GNUNET_SCHEDULER_shutdown ();
2356     return;
2357   }
2358 }
2359
2360
2361 /**
2362  * Define "main" method using service macro.
2363  */
2364 GNUNET_SERVICE_MAIN
2365 ("namestore",
2366  GNUNET_SERVICE_OPTION_NONE,
2367  &run,
2368  &client_connect_cb,
2369  &client_disconnect_cb,
2370  NULL,
2371  GNUNET_MQ_hd_var_size (record_store,
2372                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2373                         struct RecordStoreMessage,
2374                         NULL),
2375  GNUNET_MQ_hd_var_size (record_lookup,
2376                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2377                         struct LabelLookupMessage,
2378                         NULL),
2379  GNUNET_MQ_hd_fixed_size (zone_to_name,
2380                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2381                           struct ZoneToNameMessage,
2382                           NULL),
2383  GNUNET_MQ_hd_fixed_size (iteration_start,
2384                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2385                           struct ZoneIterationStartMessage,
2386                           NULL),
2387  GNUNET_MQ_hd_fixed_size (iteration_next,
2388                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2389                           struct ZoneIterationNextMessage,
2390                           NULL),
2391  GNUNET_MQ_hd_fixed_size (iteration_stop,
2392                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2393                           struct ZoneIterationStopMessage,
2394                           NULL),
2395  GNUNET_MQ_hd_fixed_size (monitor_start,
2396                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2397                           struct ZoneMonitorStartMessage,
2398                           NULL),
2399  GNUNET_MQ_hd_fixed_size (monitor_next,
2400                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2401                           struct ZoneMonitorNextMessage,
2402                           NULL),
2403  GNUNET_MQ_handler_end ());
2404
2405
2406 /* end of gnunet-service-namestore.c */