error handling
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file namestore/gnunet-service-namestore.c
23  * @brief namestore for the GNUnet naming system
24  * @author Matthias Wachs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_dnsparser_lib.h"
30 #include "gnunet_gns_service.h"
31 #include "gnunet_namecache_service.h"
32 #include "gnunet_namestore_service.h"
33 #include "gnunet_namestore_plugin.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_signatures.h"
36 #include "namestore.h"
37
38 #define LOG_STRERROR_FILE(kind, syscall, filename) \
39   GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
40
41 /**
42  * If a monitor takes more than 1 minute to process an event, print a warning.
43  */
44 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * Size of the cache used by #get_nick_record()
48  */
49 #define NC_SIZE 16
50
51 /**
52  * A namestore client
53  */
54 struct NamestoreClient;
55
56
57 /**
58  * A namestore iteration operation.
59  */
60 struct ZoneIteration
61 {
62   /**
63    * Next element in the DLL
64    */
65   struct ZoneIteration *next;
66
67   /**
68    * Previous element in the DLL
69    */
70   struct ZoneIteration *prev;
71
72   /**
73    * Namestore client which intiated this zone iteration
74    */
75   struct NamestoreClient *nc;
76
77   /**
78    * The nick to add to the records
79    */
80   struct GNUNET_GNSRECORD_Data *nick;
81
82   /**
83    * Key of the zone we are iterating over.
84    */
85   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
86
87   /**
88    * Last sequence number in the zone iteration used to address next
89    * result of the zone iteration in the store
90    *
91    * Initialy set to 0.
92    * Updated in #zone_iterate_proc()
93    */
94   uint64_t seq;
95
96   /**
97    * The operation id fot the zone iteration in the response for the client
98    */
99   uint32_t request_id;
100
101   /**
102    * Offset of the zone iteration used to address next result of the zone
103    * iteration in the store
104    *
105    * Initialy set to 0 in #handle_iteration_start
106    * Incremented with by every call to #handle_iteration_next
107    */
108   uint32_t offset;
109
110   /**
111    * Number of pending cache operations triggered by this zone iteration which we
112    * need to wait for before allowing the client to continue.
113    */
114   unsigned int cache_ops;
115
116   /**
117    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
118    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
119    * message and free the data structure once @e cache_ops is zero.
120    */
121   int send_end;
122 };
123
124
125 /**
126  * A namestore client
127  */
128 struct NamestoreClient
129 {
130   /**
131    * The client
132    */
133   struct GNUNET_SERVICE_Client *client;
134
135   /**
136    * Message queue for transmission to @e client
137    */
138   struct GNUNET_MQ_Handle *mq;
139
140   /**
141    * Head of the DLL of
142    * Zone iteration operations in progress initiated by this client
143    */
144   struct ZoneIteration *op_head;
145
146   /**
147    * Tail of the DLL of
148    * Zone iteration operations in progress initiated by this client
149    */
150   struct ZoneIteration *op_tail;
151 };
152
153
154 /**
155  * A namestore monitor.
156  */
157 struct ZoneMonitor
158 {
159   /**
160    * Next element in the DLL
161    */
162   struct ZoneMonitor *next;
163
164   /**
165    * Previous element in the DLL
166    */
167   struct ZoneMonitor *prev;
168
169   /**
170    * Namestore client which intiated this zone monitor
171    */
172   struct NamestoreClient *nc;
173
174   /**
175    * Private key of the zone.
176    */
177   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
178
179   /**
180    * Task active during initial iteration.
181    */
182   struct GNUNET_SCHEDULER_Task *task;
183
184   /**
185    * Task to warn about slow monitors.
186    */
187   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
188
189   /**
190    * Since when are we blocked on this monitor?
191    */
192   struct GNUNET_TIME_Absolute sa_waiting_start;
193
194   /**
195    * Last sequence number in the zone iteration used to address next
196    * result of the zone iteration in the store
197    *
198    * Initialy set to 0.
199    * Updated in #monitor_iterate_cb()
200    */
201   uint64_t seq;
202
203   /**
204    * Current limit of how many more messages we are allowed
205    * to queue to this monitor.
206    */
207   uint64_t limit;
208
209   /**
210    * How many more requests may we receive from the iterator
211    * before it is at the limit we gave it?  Will be below or
212    * equal to @e limit.  The effective limit for monitor
213    * events is thus @e iteration_cnt - @e limit!
214    */
215   uint64_t iteration_cnt;
216
217   /**
218    * Are we (still) in the initial iteration pass?
219    */
220   int in_first_iteration;
221
222   /**
223    * Is there a store activity waiting for this monitor?  We only raise the
224    * flag when it happens and search the DLL for the store activity when we
225    * had a limit increase.  If we cannot find any waiting store activity at
226    * that time, we clear the flag again.
227    */
228   int sa_waiting;
229 };
230
231
232 /**
233  * Pending operation on the namecache.
234  */
235 struct CacheOperation
236 {
237   /**
238    * Kept in a DLL.
239    */
240   struct CacheOperation *prev;
241
242   /**
243    * Kept in a DLL.
244    */
245   struct CacheOperation *next;
246
247   /**
248    * Handle to namecache queue.
249    */
250   struct GNUNET_NAMECACHE_QueueEntry *qe;
251
252   /**
253    * Client to notify about the result, can be NULL.
254    */
255   struct NamestoreClient *nc;
256
257   /**
258    * Zone iteration to call #zone_iteration_done_client_continue()
259    * for if applicable, can be NULL.
260    */
261   struct ZoneIteration *zi;
262
263   /**
264    * Client's request ID.
265    */
266   uint32_t rid;
267 };
268
269
270 /**
271  * Information for an ongoing #handle_record_store() operation.
272  * Needed as we may wait for monitors to be ready for the notification.
273  */
274 struct StoreActivity
275 {
276   /**
277    * Kept in a DLL.
278    */
279   struct StoreActivity *next;
280
281   /**
282    * Kept in a DLL.
283    */
284   struct StoreActivity *prev;
285
286   /**
287    * Which client triggered the store activity?
288    */
289   struct NamestoreClient *nc;
290
291   /**
292    * Copy of the original store message (as data fields in @e rd will
293    * point into it!).
294    */
295   const struct RecordStoreMessage *rsm;
296
297   /**
298    * Next zone monitor that still needs to be notified about this PUT.
299    */
300   struct ZoneMonitor *zm_pos;
301
302   /**
303    * Label nicely canonicalized (lower case).
304    */
305   char *conv_name;
306 };
307
308
309 /**
310  * Entry in list of cached nick resolutions.
311  */
312 struct NickCache
313 {
314   /**
315    * Zone the cache entry is for.
316    */
317   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
318
319   /**
320    * Cached record data.
321    */
322   struct GNUNET_GNSRECORD_Data *rd;
323
324   /**
325    * Timestamp when this cache entry was used last.
326    */
327   struct GNUNET_TIME_Absolute last_used;
328 };
329
330
331 /**
332  * We cache nick records to reduce DB load.
333  */
334 static struct NickCache nick_cache[NC_SIZE];
335
336 /**
337  * Public key of all zeros.
338  */
339 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
340
341 /**
342  * Configuration handle.
343  */
344 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
345
346 /**
347  * Handle to the statistics service
348  */
349 static struct GNUNET_STATISTICS_Handle *statistics;
350
351 /**
352  * Namecache handle.
353  */
354 static struct GNUNET_NAMECACHE_Handle *namecache;
355
356 /**
357  * Database handle
358  */
359 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
360
361 /**
362  * Name of the database plugin
363  */
364 static char *db_lib_name;
365
366 /**
367  * Head of cop DLL.
368  */
369 static struct CacheOperation *cop_head;
370
371 /**
372  * Tail of cop DLL.
373  */
374 static struct CacheOperation *cop_tail;
375
376 /**
377  * First active zone monitor.
378  */
379 static struct ZoneMonitor *monitor_head;
380
381 /**
382  * Last active zone monitor.
383  */
384 static struct ZoneMonitor *monitor_tail;
385
386 /**
387  * Head of DLL of monitor-blocked store activities.
388  */
389 static struct StoreActivity *sa_head;
390
391 /**
392  * Tail of DLL of monitor-blocked store activities.
393  */
394 static struct StoreActivity *sa_tail;
395
396 /**
397  * Notification context shared by all monitors.
398  */
399 static struct GNUNET_NotificationContext *monitor_nc;
400
401 /**
402  * Optimize block insertion by caching map of private keys to
403  * public keys in memory?
404  */
405 static int cache_keys;
406
407 /**
408  * Use the namecache? Doing so creates additional cryptographic
409  * operations whenever we touch a record.
410  */
411 static int disable_namecache;
412
413
414 /**
415  * Task run during shutdown.
416  *
417  * @param cls unused
418  */
419 static void
420 cleanup_task (void *cls)
421 {
422   struct CacheOperation *cop;
423
424   (void) cls;
425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stopping namestore service\n");
426   while (NULL != (cop = cop_head))
427   {
428     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
429                 "Aborting incomplete namecache operation\n");
430     GNUNET_NAMECACHE_cancel (cop->qe);
431     GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
432     GNUNET_free (cop);
433   }
434   if (NULL != namecache)
435   {
436     GNUNET_NAMECACHE_disconnect (namecache);
437     namecache = NULL;
438   }
439   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, GSN_database));
440   GNUNET_free (db_lib_name);
441   db_lib_name = NULL;
442   if (NULL != monitor_nc)
443   {
444     GNUNET_notification_context_destroy (monitor_nc);
445     monitor_nc = NULL;
446   }
447   if (NULL != statistics)
448   {
449     GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
450     statistics = NULL;
451   }
452 }
453
454
455 /**
456  * Release memory used by @a sa.
457  *
458  * @param sa activity to free
459  */
460 static void
461 free_store_activity (struct StoreActivity *sa)
462 {
463   GNUNET_CONTAINER_DLL_remove (sa_head, sa_tail, sa);
464   GNUNET_free (sa->conv_name);
465   GNUNET_free (sa);
466 }
467
468
469 /**
470  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
471  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
472  * record, which (if found) is then copied to @a cls for future use.
473  *
474  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
475  * @param seq sequence number of the record, MUST NOT BE ZERO
476  * @param private_key the private key of the zone (unused)
477  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
478  * @param rd_count number of records in @a rd
479  * @param rd records stored under @a label in the zone
480  */
481 static void
482 lookup_nick_it (void *cls,
483                 uint64_t seq,
484                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
485                 const char *label,
486                 unsigned int rd_count,
487                 const struct GNUNET_GNSRECORD_Data *rd)
488 {
489   struct GNUNET_GNSRECORD_Data **res = cls;
490
491   (void) private_key;
492   GNUNET_assert (0 != seq);
493   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
494   {
495     GNUNET_break (0);
496     return;
497   }
498   for (unsigned int c = 0; c < rd_count; c++)
499   {
500     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
501     {
502       (*res) =
503         GNUNET_malloc (rd[c].data_size + sizeof(struct GNUNET_GNSRECORD_Data));
504       (*res)->data = &(*res)[1];
505       GNUNET_memcpy ((void *) (*res)->data, rd[c].data, rd[c].data_size);
506       (*res)->data_size = rd[c].data_size;
507       (*res)->expiration_time = rd[c].expiration_time;
508       (*res)->flags = rd[c].flags;
509       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
510       return;
511     }
512   }
513   (*res) = NULL;
514 }
515
516
517 /**
518  * Add entry to the cache for @a zone and @a nick
519  *
520  * @param zone zone key to cache under
521  * @param nick nick entry to cache
522  */
523 static void
524 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
525             const struct GNUNET_GNSRECORD_Data *nick)
526 {
527   struct NickCache *oldest;
528
529   oldest = NULL;
530   for (unsigned int i = 0; i < NC_SIZE; i++)
531   {
532     struct NickCache *pos = &nick_cache[i];
533
534     if ((NULL == oldest) ||
535         (oldest->last_used.abs_value_us > pos->last_used.abs_value_us))
536       oldest = pos;
537     if (0 == GNUNET_memcmp (zone, &pos->zone))
538     {
539       oldest = pos;
540       break;
541     }
542   }
543   GNUNET_free_non_null (oldest->rd);
544   oldest->zone = *zone;
545   if (NULL != nick)
546   {
547     oldest->rd = GNUNET_malloc (sizeof(*nick) + nick->data_size);
548     *oldest->rd = *nick;
549     oldest->rd->data = &oldest->rd[1];
550     memcpy (&oldest->rd[1], nick->data, nick->data_size);
551   }
552   else
553   {
554     oldest->rd = NULL;
555   }
556   oldest->last_used = GNUNET_TIME_absolute_get ();
557 }
558
559
560 /**
561  * Return the NICK record for the zone (if it exists).
562  *
563  * @param zone private key for the zone to look for nick
564  * @return NULL if no NICK record was found
565  */
566 static struct GNUNET_GNSRECORD_Data *
567 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
568 {
569   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
570   struct GNUNET_GNSRECORD_Data *nick;
571   int res;
572
573   /* check cache first */
574   for (unsigned int i = 0; i < NC_SIZE; i++)
575   {
576     struct NickCache *pos = &nick_cache[i];
577     if ((NULL != pos->rd) && (0 == GNUNET_memcmp (zone, &pos->zone)))
578     {
579       if (NULL == pos->rd)
580         return NULL;
581       nick = GNUNET_malloc (sizeof(*nick) + pos->rd->data_size);
582       *nick = *pos->rd;
583       nick->data = &nick[1];
584       memcpy (&nick[1], pos->rd->data, pos->rd->data_size);
585       pos->last_used = GNUNET_TIME_absolute_get ();
586       return nick;
587     }
588   }
589
590   nick = NULL;
591   res = GSN_database->lookup_records (GSN_database->cls,
592                                       zone,
593                                       GNUNET_GNS_EMPTY_LABEL_AT,
594                                       &lookup_nick_it,
595                                       &nick);
596   if ((GNUNET_OK != res) || (NULL == nick))
597   {
598 #if ! defined(GNUNET_CULL_LOGGING)
599     static int do_log = GNUNET_LOG_CALL_STATUS;
600
601     if (0 == do_log)
602       do_log = GNUNET_get_log_call_status (GNUNET_ERROR_TYPE_DEBUG,
603                                            "namestore",
604                                            __FILE__,
605                                            __FUNCTION__,
606                                            __LINE__);
607     if (1 == do_log)
608     {
609       GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
610       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
611                   "No nick name set for zone `%s'\n",
612                   GNUNET_GNSRECORD_z2s (&pub));
613     }
614 #endif
615     /* update cache */
616     cache_nick (zone, NULL);
617     return NULL;
618   }
619
620   /* update cache */
621   cache_nick (zone, nick);
622   return nick;
623 }
624
625
626 /**
627  * Merge the nick record @a nick_rd with the rest of the
628  * record set given in @a rd2.  Store the result in @a rdc_res
629  * and @a rd_res.  The @a nick_rd's expiration time is set to
630  * the maximum expiration time of all of the records in @a rd2.
631  *
632  * @param nick_rd the nick record to integrate
633  * @param rd2_length length of the @a rd2 array
634  * @param rd2 array of records
635  * @param rdc_res[out] length of the resulting @a rd_res array
636  * @param rd_res[out] set to an array of records,
637  *                    including @a nick_rd and @a rd2;
638  *           all of the variable-size 'data' fields in @a rd2 are
639  *           allocated in the same chunk of memory!
640  */
641 static void
642 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
643                          unsigned int rd2_length,
644                          const struct GNUNET_GNSRECORD_Data *rd2,
645                          unsigned int *rdc_res,
646                          struct GNUNET_GNSRECORD_Data **rd_res)
647 {
648   uint64_t latest_expiration;
649   size_t req;
650   char *data;
651   size_t data_offset;
652   struct GNUNET_GNSRECORD_Data *target;
653
654   (*rdc_res) = 1 + rd2_length;
655   if (0 == 1 + rd2_length)
656   {
657     GNUNET_break (0);
658     (*rd_res) = NULL;
659     return;
660   }
661   req = sizeof(struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
662   for (unsigned int i = 0; i < rd2_length; i++)
663   {
664     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
665
666     if (req + sizeof(struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
667     {
668       GNUNET_break (0);
669       (*rd_res) = NULL;
670       return;
671     }
672     req += sizeof(struct GNUNET_GNSRECORD_Data) + orig->data_size;
673   }
674   target = GNUNET_malloc (req);
675   (*rd_res) = target;
676   data = (char *) &target[1 + rd2_length];
677   data_offset = 0;
678   latest_expiration = 0;
679   for (unsigned int i = 0; i < rd2_length; i++)
680   {
681     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
682
683     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
684     {
685       if ((GNUNET_TIME_absolute_get ().abs_value_us + orig->expiration_time) >
686           latest_expiration)
687         latest_expiration = orig->expiration_time;
688     }
689     else if (orig->expiration_time > latest_expiration)
690       latest_expiration = orig->expiration_time;
691     target[i] = *orig;
692     target[i].data = (void *) &data[data_offset];
693     GNUNET_memcpy (&data[data_offset], orig->data, orig->data_size);
694     data_offset += orig->data_size;
695   }
696   /* append nick */
697   target[rd2_length] = *nick_rd;
698   /* Mark as supplemental */
699   target[rd2_length].flags = nick_rd->flags | GNUNET_GNSRECORD_RF_SUPPLEMENTAL;
700   target[rd2_length].expiration_time = latest_expiration;
701   target[rd2_length].data = (void *) &data[data_offset];
702   GNUNET_memcpy (&data[data_offset], nick_rd->data, nick_rd->data_size);
703   data_offset += nick_rd->data_size;
704   GNUNET_assert (req == (sizeof(struct GNUNET_GNSRECORD_Data)) * (*rdc_res)
705                  + data_offset);
706 }
707
708
709 /**
710  * Generate a `struct LookupNameResponseMessage` and send it to the
711  * given client using the given notification context.
712  *
713  * @param nc client to unicast to
714  * @param request_id request ID to use
715  * @param zone_key zone key of the zone
716  * @param name name
717  * @param rd_count number of records in @a rd
718  * @param rd array of records
719  */
720 static void
721 send_lookup_response (struct NamestoreClient *nc,
722                       uint32_t request_id,
723                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
724                       const char *name,
725                       unsigned int rd_count,
726                       const struct GNUNET_GNSRECORD_Data *rd)
727 {
728   struct GNUNET_MQ_Envelope *env;
729   struct RecordResultMessage *zir_msg;
730   struct GNUNET_GNSRECORD_Data *nick;
731   struct GNUNET_GNSRECORD_Data *res;
732   unsigned int res_count;
733   size_t name_len;
734   ssize_t rd_ser_len;
735   char *name_tmp;
736   char *rd_ser;
737
738   nick = get_nick_record (zone_key);
739   GNUNET_assert (-1 != GNUNET_GNSRECORD_records_get_size (rd_count, rd));
740
741   if ((NULL != nick) && (0 != strcmp (name, GNUNET_GNS_EMPTY_LABEL_AT)))
742   {
743     nick->flags =
744       (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
745     merge_with_nick_records (nick, rd_count, rd, &res_count, &res);
746     GNUNET_free (nick);
747   }
748   else
749   {
750     res_count = rd_count;
751     res = (struct GNUNET_GNSRECORD_Data *) rd;
752   }
753
754   GNUNET_assert (-1 != GNUNET_GNSRECORD_records_get_size (res_count, res));
755
756
757   name_len = strlen (name) + 1;
758   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count, res);
759   if (rd_ser_len < 0)
760   {
761     GNUNET_break (0);
762     GNUNET_SERVICE_client_drop (nc->client);
763     return;
764   }
765   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof(*zir_msg))
766   {
767     GNUNET_break (0);
768     GNUNET_SERVICE_client_drop (nc->client);
769     return;
770   }
771   env = GNUNET_MQ_msg_extra (zir_msg,
772                              name_len + rd_ser_len,
773                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
774   zir_msg->gns_header.r_id = htonl (request_id);
775   zir_msg->name_len = htons (name_len);
776   zir_msg->rd_count = htons (res_count);
777   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
778   zir_msg->private_key = *zone_key;
779   name_tmp = (char *) &zir_msg[1];
780   GNUNET_memcpy (name_tmp, name, name_len);
781   rd_ser = &name_tmp[name_len];
782   GNUNET_assert (
783     rd_ser_len ==
784     GNUNET_GNSRECORD_records_serialize (res_count, res, rd_ser_len, rd_ser));
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786               "Sending RECORD_RESULT message with %u records\n",
787               res_count);
788   GNUNET_STATISTICS_update (statistics,
789                             "Record sets sent to clients",
790                             1,
791                             GNUNET_NO);
792   GNUNET_MQ_send (nc->mq, env);
793   if (rd != res)
794     GNUNET_free (res);
795 }
796
797
798 /**
799  * Send response to the store request to the client.
800  *
801  * @param client client to talk to
802  * @param res status of the operation
803  * @param rid client's request ID
804  */
805 static void
806 send_store_response (struct NamestoreClient *nc, int res, uint32_t rid)
807 {
808   struct GNUNET_MQ_Envelope *env;
809   struct RecordStoreResponseMessage *rcr_msg;
810
811   GNUNET_assert (NULL != nc);
812   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813               "Sending RECORD_STORE_RESPONSE message\n");
814   GNUNET_STATISTICS_update (statistics,
815                             "Store requests completed",
816                             1,
817                             GNUNET_NO);
818   env = GNUNET_MQ_msg (rcr_msg,
819                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
820   rcr_msg->gns_header.r_id = htonl (rid);
821   rcr_msg->op_result = htonl (res);
822   GNUNET_MQ_send (nc->mq, env);
823 }
824
825
826 /**
827  * Function called once we are done with the zone iteration and
828  * allow the zone iteration client to send us more messages.
829  *
830  * @param zi zone iteration we are processing
831  */
832 static void
833 zone_iteration_done_client_continue (struct ZoneIteration *zi)
834 {
835   struct GNUNET_MQ_Envelope *env;
836   struct GNUNET_NAMESTORE_Header *em;
837
838   GNUNET_SERVICE_client_continue (zi->nc->client);
839   if (! zi->send_end)
840     return;
841   /* send empty response to indicate end of list */
842   env = GNUNET_MQ_msg (em, GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
843   em->r_id = htonl (zi->request_id);
844   GNUNET_MQ_send (zi->nc->mq, env);
845
846   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head, zi->nc->op_tail, zi);
847   GNUNET_free (zi);
848 }
849
850
851 /**
852  * Cache operation complete, clean up.
853  *
854  * @param cls the `struct CacheOperation`
855  * @param success success
856  * @param emsg error messages
857  */
858 static void
859 finish_cache_operation (void *cls, int32_t success, const char *emsg)
860 {
861   struct CacheOperation *cop = cls;
862   struct ZoneIteration *zi;
863
864   if (NULL != emsg)
865     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
866                 _ ("Failed to replicate block in namecache: %s\n"),
867                 emsg);
868   else
869     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CACHE operation completed\n");
870   GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
871   if (NULL != cop->nc)
872     send_store_response (cop->nc, success, cop->rid);
873   if (NULL != (zi = cop->zi))
874   {
875     zi->cache_ops--;
876     if (0 == zi->cache_ops)
877     {
878       /* unchoke zone iteration, cache has caught up */
879       zone_iteration_done_client_continue (zi);
880     }
881   }
882   GNUNET_free (cop);
883 }
884
885
886 /**
887  * We just touched the plaintext information about a name in our zone;
888  * refresh the corresponding (encrypted) block in the namecache.
889  *
890  * @param nc client responsible for the request, can be NULL
891  * @param zi zone iteration response for the request, can be NULL
892  * @param rid request ID of the client
893  * @param zone_key private key of the zone
894  * @param name label for the records
895  * @param rd_count number of records
896  * @param rd records stored under the given @a name
897  */
898 static void
899 refresh_block (struct NamestoreClient *nc,
900                struct ZoneIteration *zi,
901                uint32_t rid,
902                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
903                const char *name,
904                unsigned int rd_count,
905                const struct GNUNET_GNSRECORD_Data *rd)
906 {
907   struct GNUNET_GNSRECORD_Block *block;
908   struct CacheOperation *cop;
909   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
910   struct GNUNET_GNSRECORD_Data *nick;
911   struct GNUNET_GNSRECORD_Data *res;
912   unsigned int res_count;
913   struct GNUNET_TIME_Absolute exp_time;
914
915   nick = get_nick_record (zone_key);
916   res_count = rd_count;
917   res = (struct GNUNET_GNSRECORD_Data *) rd;  /* fixme: a bit unclean... */
918   if (NULL != nick  && (0 != strcmp (name, GNUNET_GNS_EMPTY_LABEL_AT)))
919   {
920     nick->flags =
921       (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
922     merge_with_nick_records (nick, rd_count, rd, &res_count, &res);
923     GNUNET_free (nick);
924   }
925   if (0 == res_count)
926   {
927     if (NULL != nc)
928       send_store_response (nc, GNUNET_OK, rid);
929     return;   /* no data, no need to update cache */
930   }
931   if (GNUNET_YES == disable_namecache)
932   {
933     GNUNET_STATISTICS_update (statistics,
934                               "Namecache updates skipped (NC disabled)",
935                               1,
936                               GNUNET_NO);
937     if (NULL != nc)
938       send_store_response (nc, GNUNET_OK, rid);
939     return;
940   }
941   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count, res);
942   if (cache_keys)
943     block =
944       GNUNET_GNSRECORD_block_create2 (zone_key, exp_time, name, res, res_count);
945   else
946     block =
947       GNUNET_GNSRECORD_block_create (zone_key, exp_time, name, res, res_count);
948   GNUNET_assert (NULL != block);
949   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key, &pkey);
950   GNUNET_log (
951     GNUNET_ERROR_TYPE_DEBUG,
952     "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
953     name,
954     res_count,
955     GNUNET_STRINGS_absolute_time_to_string (exp_time),
956     GNUNET_GNSRECORD_z2s (&pkey));
957   GNUNET_STATISTICS_update (statistics,
958                             "Namecache updates pushed",
959                             1,
960                             GNUNET_NO);
961   cop = GNUNET_new (struct CacheOperation);
962   cop->nc = nc;
963   cop->zi = zi;
964   if (NULL != zi)
965     zi->cache_ops++;
966   cop->rid = rid;
967   GNUNET_CONTAINER_DLL_insert (cop_head, cop_tail, cop);
968   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
969                                           block,
970                                           &finish_cache_operation,
971                                           cop);
972   GNUNET_free (block);
973 }
974
975
976 /**
977  * Print a warning that one of our monitors is no longer reacting.
978  *
979  * @param cls a `struct ZoneMonitor` to warn about
980  */
981 static void
982 warn_monitor_slow (void *cls)
983 {
984   struct ZoneMonitor *zm = cls;
985
986   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
987               "No response from monitor since %s\n",
988               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
989   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
990                                                       &warn_monitor_slow,
991                                                       zm);
992 }
993
994
995 /**
996  * Continue processing the @a sa.
997  *
998  * @param sa store activity to process
999  */
1000 static void
1001 continue_store_activity (struct StoreActivity *sa)
1002 {
1003   const struct RecordStoreMessage *rp_msg = sa->rsm;
1004   unsigned int rd_count;
1005   size_t name_len;
1006   size_t rd_ser_len;
1007   uint32_t rid;
1008   const char *name_tmp;
1009   const char *rd_ser;
1010
1011   rid = ntohl (rp_msg->gns_header.r_id);
1012   name_len = ntohs (rp_msg->name_len);
1013   rd_count = ntohs (rp_msg->rd_count);
1014   rd_ser_len = ntohs (rp_msg->rd_len);
1015   name_tmp = (const char *) &rp_msg[1];
1016   rd_ser = &name_tmp[name_len];
1017   {
1018     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL (rd_count)];
1019
1020     /* We did this before, must succeed again */
1021     GNUNET_assert (
1022       GNUNET_OK ==
1023       GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count, rd));
1024
1025     for (struct ZoneMonitor *zm = sa->zm_pos; NULL != zm; zm = sa->zm_pos)
1026     {
1027       if ((0 != GNUNET_memcmp (&rp_msg->private_key, &zm->zone)) &&
1028           (0 != GNUNET_memcmp (&zm->zone, &zero)))
1029       {
1030         sa->zm_pos = zm->next;     /* not interesting to this monitor */
1031         continue;
1032       }
1033       if (zm->limit == zm->iteration_cnt)
1034       {
1035         zm->sa_waiting = GNUNET_YES;
1036         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1037         if (NULL != zm->sa_wait_warning)
1038           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1039         zm->sa_wait_warning =
1040           GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1041                                         &warn_monitor_slow,
1042                                         zm);
1043         return;     /* blocked on zone monitor */
1044       }
1045       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046                   "Notifying monitor about changes under label `%s'\n",
1047                   sa->conv_name);
1048       zm->limit--;
1049       send_lookup_response (zm->nc,
1050                             0,
1051                             &rp_msg->private_key,
1052                             sa->conv_name,
1053                             rd_count,
1054                             rd);
1055       sa->zm_pos = zm->next;
1056     }
1057     /* great, done with the monitors, unpack (again) for refresh_block operation */
1058     refresh_block (sa->nc,
1059                    NULL,
1060                    rid,
1061                    &rp_msg->private_key,
1062                    sa->conv_name,
1063                    rd_count,
1064                    rd);
1065   }
1066   GNUNET_SERVICE_client_continue (sa->nc->client);
1067   free_store_activity (sa);
1068 }
1069
1070
1071 /**
1072  * Called whenever a client is disconnected.
1073  * Frees our resources associated with that client.
1074  *
1075  * @param cls closure
1076  * @param client identification of the client
1077  * @param app_ctx the `struct NamestoreClient` of @a client
1078  */
1079 static void
1080 client_disconnect_cb (void *cls,
1081                       struct GNUNET_SERVICE_Client *client,
1082                       void *app_ctx)
1083 {
1084   struct NamestoreClient *nc = app_ctx;
1085   struct ZoneIteration *no;
1086   struct CacheOperation *cop;
1087
1088   (void) cls;
1089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
1090   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1091   {
1092     struct StoreActivity *san;
1093
1094     if (nc != zm->nc)
1095       continue;
1096     GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, zm);
1097     if (NULL != zm->task)
1098     {
1099       GNUNET_SCHEDULER_cancel (zm->task);
1100       zm->task = NULL;
1101     }
1102     if (NULL != zm->sa_wait_warning)
1103     {
1104       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1105       zm->sa_wait_warning = NULL;
1106     }
1107     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1108     {
1109       san = sa->next;
1110       if (zm == sa->zm_pos)
1111       {
1112         sa->zm_pos = zm->next;
1113         /* this may free sa */
1114         continue_store_activity (sa);
1115       }
1116     }
1117     GNUNET_free (zm);
1118     break;
1119   }
1120   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1121   {
1122     if (sa->nc == nc)
1123     {
1124       /* this may free sa */
1125       free_store_activity (sa);
1126       break;     /* there can only be one per nc */
1127     }
1128   }
1129   while (NULL != (no = nc->op_head))
1130   {
1131     GNUNET_CONTAINER_DLL_remove (nc->op_head, nc->op_tail, no);
1132     GNUNET_free (no);
1133   }
1134   for (cop = cop_head; NULL != cop; cop = cop->next)
1135     if (nc == cop->nc)
1136       cop->nc = NULL;
1137   GNUNET_free (nc);
1138 }
1139
1140
1141 /**
1142  * Add a client to our list of active clients.
1143  *
1144  * @param cls NULL
1145  * @param client client to add
1146  * @param mq message queue for @a client
1147  * @return internal namestore client structure for this client
1148  */
1149 static void *
1150 client_connect_cb (void *cls,
1151                    struct GNUNET_SERVICE_Client *client,
1152                    struct GNUNET_MQ_Handle *mq)
1153 {
1154   struct NamestoreClient *nc;
1155
1156   (void) cls;
1157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected\n", client);
1158   nc = GNUNET_new (struct NamestoreClient);
1159   nc->client = client;
1160   nc->mq = mq;
1161   return nc;
1162 }
1163
1164
1165 /**
1166  * Closure for #lookup_it().
1167  */
1168 struct RecordLookupContext
1169 {
1170   /**
1171    * FIXME.
1172    */
1173   const char *label;
1174
1175   /**
1176    * FIXME.
1177    */
1178   char *res_rd;
1179
1180   /**
1181    * FIXME.
1182    */
1183   struct GNUNET_GNSRECORD_Data *nick;
1184
1185   /**
1186    * FIXME.
1187    */
1188   int found;
1189
1190   /**
1191    * FIXME.
1192    */
1193   unsigned int res_rd_count;
1194
1195   /**
1196    * FIXME.
1197    */
1198   ssize_t rd_ser_len;
1199 };
1200
1201
1202 /**
1203  * Function called by the namestore plugin when we are trying to lookup
1204  * a record as part of #handle_record_lookup().  Merges all results into
1205  * the context.
1206  *
1207  * @param cls closure with a `struct RecordLookupContext`
1208  * @param seq unique serial number of the record, MUST NOT BE ZERO
1209  * @param zone_key private key of the zone
1210  * @param label name that is being mapped (at most 255 characters long)
1211  * @param rd_count number of entries in @a rd array
1212  * @param rd array of records with data to store
1213  */
1214 static void
1215 lookup_it (void *cls,
1216            uint64_t seq,
1217            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1218            const char *label,
1219            unsigned int rd_count,
1220            const struct GNUNET_GNSRECORD_Data *rd)
1221 {
1222   struct RecordLookupContext *rlc = cls;
1223
1224   (void) private_key;
1225   GNUNET_assert (0 != seq);
1226   if (0 != strcmp (label, rlc->label))
1227     return;
1228   rlc->found = GNUNET_YES;
1229   if (0 == rd_count)
1230   {
1231     rlc->rd_ser_len = 0;
1232     rlc->res_rd_count = 0;
1233     rlc->res_rd = NULL;
1234     return;
1235   }
1236   if ((NULL != rlc->nick) && (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT)))
1237   {
1238     /* Merge */
1239     struct GNUNET_GNSRECORD_Data *rd_res;
1240     unsigned int rdc_res;
1241
1242     rd_res = NULL;
1243     rdc_res = 0;
1244     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE)
1245                        ^ GNUNET_GNSRECORD_RF_PRIVATE;
1246     merge_with_nick_records (rlc->nick, rd_count, rd, &rdc_res, &rd_res);
1247     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res, rd_res);
1248     if (rlc->rd_ser_len < 0)
1249     {
1250       GNUNET_break (0);
1251       GNUNET_free (rd_res);
1252       rlc->found = GNUNET_NO;
1253       rlc->rd_ser_len = 0;
1254       return;
1255     }
1256     rlc->res_rd_count = rdc_res;
1257     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1258     if (rlc->rd_ser_len != GNUNET_GNSRECORD_records_serialize (rdc_res,
1259                                                                rd_res,
1260                                                                rlc->rd_ser_len,
1261                                                                rlc->res_rd))
1262     {
1263       GNUNET_break (0);
1264       GNUNET_free (rlc->res_rd);
1265       rlc->res_rd = NULL;
1266       rlc->res_rd_count = 0;
1267       rlc->rd_ser_len = 0;
1268       GNUNET_free (rd_res);
1269       rlc->found = GNUNET_NO;
1270       return;
1271     }
1272     GNUNET_free (rd_res);
1273     GNUNET_free (rlc->nick);
1274     rlc->nick = NULL;
1275   }
1276   else
1277   {
1278     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
1279     if (rlc->rd_ser_len < 0)
1280     {
1281       GNUNET_break (0);
1282       rlc->found = GNUNET_NO;
1283       rlc->rd_ser_len = 0;
1284       return;
1285     }
1286     rlc->res_rd_count = rd_count;
1287     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1288     if (rlc->rd_ser_len != GNUNET_GNSRECORD_records_serialize (rd_count,
1289                                                                rd,
1290                                                                rlc->rd_ser_len,
1291                                                                rlc->res_rd))
1292     {
1293       GNUNET_break (0);
1294       GNUNET_free (rlc->res_rd);
1295       rlc->res_rd = NULL;
1296       rlc->res_rd_count = 0;
1297       rlc->rd_ser_len = 0;
1298       rlc->found = GNUNET_NO;
1299       return;
1300     }
1301   }
1302 }
1303
1304
1305 /**
1306  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1307  *
1308  * @param cls client sending the message
1309  * @param ll_msg message of type `struct LabelLookupMessage`
1310  * @return #GNUNET_OK if @a ll_msg is well-formed
1311  */
1312 static int
1313 check_record_lookup (void *cls, const struct LabelLookupMessage *ll_msg)
1314 {
1315   uint32_t name_len;
1316   size_t src_size;
1317
1318   (void) cls;
1319   name_len = ntohl (ll_msg->label_len);
1320   src_size = ntohs (ll_msg->gns_header.header.size);
1321   if (name_len != src_size - sizeof(struct LabelLookupMessage))
1322   {
1323     GNUNET_break (0);
1324     return GNUNET_SYSERR;
1325   }
1326   GNUNET_MQ_check_zero_termination (ll_msg);
1327   return GNUNET_OK;
1328 }
1329
1330
1331 /**
1332  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1333  *
1334  * @param cls client sending the message
1335  * @param ll_msg message of type `struct LabelLookupMessage`
1336  */
1337 static void
1338 handle_record_lookup (void *cls, const struct LabelLookupMessage *ll_msg)
1339 {
1340   struct NamestoreClient *nc = cls;
1341   struct GNUNET_MQ_Envelope *env;
1342   struct LabelLookupResponseMessage *llr_msg;
1343   struct RecordLookupContext rlc;
1344   const char *name_tmp;
1345   char *res_name;
1346   char *conv_name;
1347   uint32_t name_len;
1348   int res;
1349
1350   name_len = ntohl (ll_msg->label_len);
1351   name_tmp = (const char *) &ll_msg[1];
1352   GNUNET_SERVICE_client_continue (nc->client);
1353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1355               name_tmp);
1356
1357   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1358   if (NULL == conv_name)
1359   {
1360     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1361                 "Error converting name `%s'\n",
1362                 name_tmp);
1363     GNUNET_SERVICE_client_drop (nc->client);
1364     return;
1365   }
1366   rlc.label = conv_name;
1367   rlc.found = GNUNET_NO;
1368   rlc.res_rd_count = 0;
1369   rlc.res_rd = NULL;
1370   rlc.rd_ser_len = 0;
1371   rlc.nick = get_nick_record (&ll_msg->zone);
1372   res = GSN_database->lookup_records (GSN_database->cls,
1373                                       &ll_msg->zone,
1374                                       conv_name,
1375                                       &lookup_it,
1376                                       &rlc);
1377   GNUNET_free (conv_name);
1378   env =
1379     GNUNET_MQ_msg_extra (llr_msg,
1380                          name_len + rlc.rd_ser_len,
1381                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1382   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1383   llr_msg->private_key = ll_msg->zone;
1384   llr_msg->name_len = htons (name_len);
1385   llr_msg->rd_count = htons (rlc.res_rd_count);
1386   llr_msg->rd_len = htons (rlc.rd_ser_len);
1387   res_name = (char *) &llr_msg[1];
1388   if ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1389     llr_msg->found = ntohs (GNUNET_YES);
1390   else
1391     llr_msg->found = ntohs (GNUNET_NO);
1392   GNUNET_memcpy (&llr_msg[1], name_tmp, name_len);
1393   GNUNET_memcpy (&res_name[name_len], rlc.res_rd, rlc.rd_ser_len);
1394   GNUNET_MQ_send (nc->mq, env);
1395   GNUNET_free_non_null (rlc.res_rd);
1396 }
1397
1398
1399 /**
1400  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1401  *
1402  * @param cls client sending the message
1403  * @param rp_msg message of type `struct RecordStoreMessage`
1404  * @return #GNUNET_OK if @a rp_msg is well-formed
1405  */
1406 static int
1407 check_record_store (void *cls, const struct RecordStoreMessage *rp_msg)
1408 {
1409   size_t name_len;
1410   size_t msg_size;
1411   size_t msg_size_exp;
1412   size_t rd_ser_len;
1413   const char *name_tmp;
1414
1415   (void) cls;
1416   name_len = ntohs (rp_msg->name_len);
1417   msg_size = ntohs (rp_msg->gns_header.header.size);
1418   rd_ser_len = ntohs (rp_msg->rd_len);
1419   msg_size_exp = sizeof(struct RecordStoreMessage) + name_len + rd_ser_len;
1420   if (msg_size != msg_size_exp)
1421   {
1422     GNUNET_break (0);
1423     return GNUNET_SYSERR;
1424   }
1425   if ((0 == name_len) || (name_len > MAX_NAME_LEN))
1426   {
1427     GNUNET_break (0);
1428     return GNUNET_SYSERR;
1429   }
1430   name_tmp = (const char *) &rp_msg[1];
1431   if ('\0' != name_tmp[name_len - 1])
1432   {
1433     GNUNET_break (0);
1434     return GNUNET_SYSERR;
1435   }
1436   return GNUNET_OK;
1437 }
1438
1439
1440 /**
1441  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1442  *
1443  * @param cls client sending the message
1444  * @param rp_msg message of type `struct RecordStoreMessage`
1445  */
1446 static void
1447 handle_record_store (void *cls, const struct RecordStoreMessage *rp_msg)
1448 {
1449   struct NamestoreClient *nc = cls;
1450   size_t name_len;
1451   size_t rd_ser_len;
1452   uint32_t rid;
1453   const char *name_tmp;
1454   char *conv_name;
1455   const char *rd_ser;
1456   unsigned int rd_count;
1457   int res;
1458   struct StoreActivity *sa;
1459
1460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461               "Received NAMESTORE_RECORD_STORE message\n");
1462   rid = ntohl (rp_msg->gns_header.r_id);
1463   name_len = ntohs (rp_msg->name_len);
1464   rd_count = ntohs (rp_msg->rd_count);
1465   rd_ser_len = ntohs (rp_msg->rd_len);
1466   GNUNET_break (0 == ntohs (rp_msg->reserved));
1467   name_tmp = (const char *) &rp_msg[1];
1468   rd_ser = &name_tmp[name_len];
1469   {
1470     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL (rd_count)];
1471
1472     if (GNUNET_OK !=
1473         GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count, rd))
1474     {
1475       GNUNET_break (0);
1476       GNUNET_SERVICE_client_drop (nc->client);
1477       return;
1478     }
1479
1480     /* Extracting and converting private key */
1481     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1482     if (NULL == conv_name)
1483     {
1484       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1485                   "Error converting name `%s'\n",
1486                   name_tmp);
1487       GNUNET_SERVICE_client_drop (nc->client);
1488       return;
1489     }
1490     GNUNET_STATISTICS_update (statistics,
1491                               "Well-formed store requests received",
1492                               1,
1493                               GNUNET_NO);
1494     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1495                 "Creating %u records for name `%s'\n",
1496                 (unsigned int) rd_count,
1497                 conv_name);
1498     if ((0 == rd_count) &&
1499         (GNUNET_NO == GSN_database->lookup_records (GSN_database->cls,
1500                                                     &rp_msg->private_key,
1501                                                     conv_name,
1502                                                     NULL,
1503                                                     0)))
1504     {
1505       /* This name does not exist, so cannot be removed */
1506       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1507                   "Name `%s' does not exist, no deletion required\n",
1508                   conv_name);
1509       res = GNUNET_NO;
1510     }
1511     else
1512     {
1513       /* remove "NICK" records, unless this is for the
1514        #GNUNET_GNS_EMPTY_LABEL_AT label */
1515       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL (rd_count)];
1516       unsigned int rd_clean_off;
1517       int have_nick;
1518
1519       rd_clean_off = 0;
1520       have_nick = GNUNET_NO;
1521       for (unsigned int i = 0; i < rd_count; i++)
1522       {
1523         rd_clean[rd_clean_off] = rd[i];
1524         if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) ||
1525             (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type))
1526           rd_clean_off++;
1527
1528         if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
1529             (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type))
1530         {
1531           cache_nick (&rp_msg->private_key, &rd[i]);
1532           have_nick = GNUNET_YES;
1533         }
1534       }
1535       if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
1536           (GNUNET_NO == have_nick))
1537       {
1538         /* remove nick record from cache, in case we have one there */
1539         cache_nick (&rp_msg->private_key, NULL);
1540       }
1541       res = GSN_database->store_records (GSN_database->cls,
1542                                          &rp_msg->private_key,
1543                                          conv_name,
1544                                          rd_clean_off,
1545                                          rd_clean);
1546     }
1547
1548     if (GNUNET_OK != res)
1549     {
1550       /* store not successful, not need to tell monitors */
1551       send_store_response (nc, res, rid);
1552       GNUNET_SERVICE_client_continue (nc->client);
1553       GNUNET_free (conv_name);
1554       return;
1555     }
1556
1557     sa = GNUNET_malloc (sizeof(struct StoreActivity)
1558                         + ntohs (rp_msg->gns_header.header.size));
1559     GNUNET_CONTAINER_DLL_insert (sa_head, sa_tail, sa);
1560     sa->nc = nc;
1561     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1562     GNUNET_memcpy (&sa[1], rp_msg, ntohs (rp_msg->gns_header.header.size));
1563     sa->zm_pos = monitor_head;
1564     sa->conv_name = conv_name;
1565     continue_store_activity (sa);
1566   }
1567 }
1568
1569
1570 /**
1571  * Context for record remove operations passed from #handle_zone_to_name to
1572  * #handle_zone_to_name_it as closure
1573  */
1574 struct ZoneToNameCtx
1575 {
1576   /**
1577    * Namestore client
1578    */
1579   struct NamestoreClient *nc;
1580
1581   /**
1582    * Request id (to be used in the response to the client).
1583    */
1584   uint32_t rid;
1585
1586   /**
1587    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1588    * not finding a name for the zone still counts as a 'success' here,
1589    * as this field is about the success of executing the IPC protocol.
1590    */
1591   int success;
1592 };
1593
1594
1595 /**
1596  * Zone to name iterator
1597  *
1598  * @param cls struct ZoneToNameCtx *
1599  * @param seq sequence number of the record, MUST NOT BE ZERO
1600  * @param zone_key the zone key
1601  * @param name name
1602  * @param rd_count number of records in @a rd
1603  * @param rd record data
1604  */
1605 static void
1606 handle_zone_to_name_it (void *cls,
1607                         uint64_t seq,
1608                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1609                         const char *name,
1610                         unsigned int rd_count,
1611                         const struct GNUNET_GNSRECORD_Data *rd)
1612 {
1613   struct ZoneToNameCtx *ztn_ctx = cls;
1614   struct GNUNET_MQ_Envelope *env;
1615   struct ZoneToNameResponseMessage *ztnr_msg;
1616   int16_t res;
1617   size_t name_len;
1618   ssize_t rd_ser_len;
1619   size_t msg_size;
1620   char *name_tmp;
1621   char *rd_tmp;
1622
1623   GNUNET_assert (0 != seq);
1624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625               "Found result for zone-to-name lookup: `%s'\n",
1626               name);
1627   res = GNUNET_YES;
1628   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1629   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
1630   if (rd_ser_len < 0)
1631   {
1632     GNUNET_break (0);
1633     ztn_ctx->success = GNUNET_SYSERR;
1634     return;
1635   }
1636   msg_size = sizeof(struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1637   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1638   {
1639     GNUNET_break (0);
1640     ztn_ctx->success = GNUNET_SYSERR;
1641     return;
1642   }
1643   env =
1644     GNUNET_MQ_msg_extra (ztnr_msg,
1645                          name_len + rd_ser_len,
1646                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1647   ztnr_msg->gns_header.header.size = htons (msg_size);
1648   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1649   ztnr_msg->res = htons (res);
1650   ztnr_msg->rd_len = htons (rd_ser_len);
1651   ztnr_msg->rd_count = htons (rd_count);
1652   ztnr_msg->name_len = htons (name_len);
1653   ztnr_msg->zone = *zone_key;
1654   name_tmp = (char *) &ztnr_msg[1];
1655   GNUNET_memcpy (name_tmp, name, name_len);
1656   rd_tmp = &name_tmp[name_len];
1657   GNUNET_assert (
1658     rd_ser_len ==
1659     GNUNET_GNSRECORD_records_serialize (rd_count, rd, rd_ser_len, rd_tmp));
1660   ztn_ctx->success = GNUNET_OK;
1661   GNUNET_MQ_send (ztn_ctx->nc->mq, env);
1662 }
1663
1664
1665 /**
1666  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1667  *
1668  * @param cls client client sending the message
1669  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1670  */
1671 static void
1672 handle_zone_to_name (void *cls, const struct ZoneToNameMessage *ztn_msg)
1673 {
1674   struct NamestoreClient *nc = cls;
1675   struct ZoneToNameCtx ztn_ctx;
1676   struct GNUNET_MQ_Envelope *env;
1677   struct ZoneToNameResponseMessage *ztnr_msg;
1678
1679   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ZONE_TO_NAME message\n");
1680   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1681   ztn_ctx.nc = nc;
1682   ztn_ctx.success = GNUNET_NO;
1683   if (GNUNET_SYSERR == GSN_database->zone_to_name (GSN_database->cls,
1684                                                    &ztn_msg->zone,
1685                                                    &ztn_msg->value_zone,
1686                                                    &handle_zone_to_name_it,
1687                                                    &ztn_ctx))
1688   {
1689     /* internal error, hang up instead of signalling something
1690        that might be wrong */
1691     GNUNET_break (0);
1692     GNUNET_SERVICE_client_drop (nc->client);
1693     return;
1694   }
1695   if (GNUNET_NO == ztn_ctx.success)
1696   {
1697     /* no result found, send empty response */
1698     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1699                 "Found no result for zone-to-name lookup.\n");
1700     env = GNUNET_MQ_msg (ztnr_msg,
1701                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1702     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1703     ztnr_msg->res = htons (GNUNET_NO);
1704     GNUNET_MQ_send (nc->mq, env);
1705   }
1706   GNUNET_SERVICE_client_continue (nc->client);
1707 }
1708
1709
1710 /**
1711  * Context for record remove operations passed from
1712  * #run_zone_iteration_round to #zone_iterate_proc as closure
1713  */
1714 struct ZoneIterationProcResult
1715 {
1716   /**
1717    * The zone iteration handle
1718    */
1719   struct ZoneIteration *zi;
1720
1721   /**
1722    * Number of results left to be returned in this iteration.
1723    */
1724   uint64_t limit;
1725 };
1726
1727
1728 /**
1729  * Process results for zone iteration from database
1730  *
1731  * @param cls struct ZoneIterationProcResult
1732  * @param seq sequence number of the record, MUST NOT BE ZERO
1733  * @param zone_key the zone key
1734  * @param name name
1735  * @param rd_count number of records for this name
1736  * @param rd record data
1737  */
1738 static void
1739 zone_iterate_proc (void *cls,
1740                    uint64_t seq,
1741                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1742                    const char *name,
1743                    unsigned int rd_count,
1744                    const struct GNUNET_GNSRECORD_Data *rd)
1745 {
1746   struct ZoneIterationProcResult *proc = cls;
1747   int do_refresh_block;
1748
1749   GNUNET_assert (0 != seq);
1750   if ((NULL == zone_key) && (NULL == name))
1751   {
1752     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
1753     return;
1754   }
1755   if ((NULL == zone_key) || (NULL == name))
1756   {
1757     /* what is this!? should never happen */
1758     GNUNET_break (0);
1759     return;
1760   }
1761   if (0 == proc->limit)
1762   {
1763     /* what is this!? should never happen */
1764     GNUNET_break (0);
1765     return;
1766   }
1767   proc->limit--;
1768   proc->zi->seq = seq;
1769   send_lookup_response (proc->zi->nc,
1770                         proc->zi->request_id,
1771                         zone_key,
1772                         name,
1773                         rd_count,
1774                         rd);
1775
1776
1777   do_refresh_block = GNUNET_NO;
1778   for (unsigned int i = 0; i < rd_count; i++)
1779     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1780     {
1781       do_refresh_block = GNUNET_YES;
1782       break;
1783     }
1784   if (GNUNET_YES == do_refresh_block)
1785     refresh_block (NULL, proc->zi, 0, zone_key, name, rd_count, rd);
1786 }
1787
1788
1789 /**
1790  * Perform the next round of the zone iteration.
1791  *
1792  * @param zi zone iterator to process
1793  * @param limit number of results to return in one pass
1794  */
1795 static void
1796 run_zone_iteration_round (struct ZoneIteration *zi, uint64_t limit)
1797 {
1798   struct ZoneIterationProcResult proc;
1799   struct GNUNET_TIME_Absolute start;
1800   struct GNUNET_TIME_Relative duration;
1801
1802   memset (&proc, 0, sizeof(proc));
1803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804               "Asked to return up to %llu records at position %llu\n",
1805               (unsigned long long) limit,
1806               (unsigned long long) zi->seq);
1807   proc.zi = zi;
1808   proc.limit = limit;
1809   start = GNUNET_TIME_absolute_get ();
1810   GNUNET_break (GNUNET_SYSERR !=
1811                 GSN_database->iterate_records (GSN_database->cls,
1812                                                (0 == GNUNET_is_zero (&zi->zone))
1813                                                ? NULL
1814                                                : &zi->zone,
1815                                                zi->seq,
1816                                                limit,
1817                                                &zone_iterate_proc,
1818                                                &proc));
1819   duration = GNUNET_TIME_absolute_get_duration (start);
1820   duration = GNUNET_TIME_relative_divide (duration, limit - proc.limit);
1821   GNUNET_STATISTICS_set (statistics,
1822                          "NAMESTORE iteration delay (μs/record)",
1823                          duration.rel_value_us,
1824                          GNUNET_NO);
1825   if (0 == proc.limit)
1826     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1827                 "Returned %llu results, more results available\n",
1828                 (unsigned long long) limit);
1829   zi->send_end = (0 != proc.limit);
1830   if (0 == zi->cache_ops)
1831     zone_iteration_done_client_continue (zi);
1832 }
1833
1834
1835 /**
1836  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1837  *
1838  * @param cls the client sending the message
1839  * @param zis_msg message from the client
1840  */
1841 static void
1842 handle_iteration_start (void *cls,
1843                         const struct ZoneIterationStartMessage *zis_msg)
1844 {
1845   struct NamestoreClient *nc = cls;
1846   struct ZoneIteration *zi;
1847
1848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1849               "Received ZONE_ITERATION_START message\n");
1850   zi = GNUNET_new (struct ZoneIteration);
1851   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1852   zi->offset = 0;
1853   zi->nc = nc;
1854   zi->zone = zis_msg->zone;
1855
1856   GNUNET_CONTAINER_DLL_insert (nc->op_head, nc->op_tail, zi);
1857   run_zone_iteration_round (zi, 1);
1858 }
1859
1860
1861 /**
1862  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1863  *
1864  * @param cls the client sending the message
1865  * @param zis_msg message from the client
1866  */
1867 static void
1868 handle_iteration_stop (void *cls,
1869                        const struct ZoneIterationStopMessage *zis_msg)
1870 {
1871   struct NamestoreClient *nc = cls;
1872   struct ZoneIteration *zi;
1873   uint32_t rid;
1874
1875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1876               "Received ZONE_ITERATION_STOP message\n");
1877   rid = ntohl (zis_msg->gns_header.r_id);
1878   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1879     if (zi->request_id == rid)
1880       break;
1881   if (NULL == zi)
1882   {
1883     GNUNET_break (0);
1884     GNUNET_SERVICE_client_drop (nc->client);
1885     return;
1886   }
1887   GNUNET_CONTAINER_DLL_remove (nc->op_head, nc->op_tail, zi);
1888   GNUNET_free (zi);
1889   GNUNET_SERVICE_client_continue (nc->client);
1890 }
1891
1892
1893 /**
1894  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
1895  *
1896  * @param cls the client sending the message
1897  * @param message message from the client
1898  */
1899 static void
1900 handle_iteration_next (void *cls,
1901                        const struct ZoneIterationNextMessage *zis_msg)
1902 {
1903   struct NamestoreClient *nc = cls;
1904   struct ZoneIteration *zi;
1905   uint32_t rid;
1906   uint64_t limit;
1907
1908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1909               "Received ZONE_ITERATION_NEXT message\n");
1910   GNUNET_STATISTICS_update (statistics,
1911                             "Iteration NEXT messages received",
1912                             1,
1913                             GNUNET_NO);
1914   rid = ntohl (zis_msg->gns_header.r_id);
1915   limit = GNUNET_ntohll (zis_msg->limit);
1916   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1917     if (zi->request_id == rid)
1918       break;
1919   if (NULL == zi)
1920   {
1921     GNUNET_break (0);
1922     GNUNET_SERVICE_client_drop (nc->client);
1923     return;
1924   }
1925   run_zone_iteration_round (zi, limit);
1926 }
1927
1928
1929 /**
1930  * Function called when the monitor is ready for more data, and we
1931  * should thus unblock PUT operations that were blocked on the
1932  * monitor not being ready.
1933  */
1934 static void
1935 monitor_unblock (struct ZoneMonitor *zm)
1936 {
1937   struct StoreActivity *sa = sa_head;
1938
1939   while ((NULL != sa) && (zm->limit > zm->iteration_cnt))
1940   {
1941     struct StoreActivity *sn = sa->next;
1942
1943     if (sa->zm_pos == zm)
1944       continue_store_activity (sa);
1945     sa = sn;
1946   }
1947   if (zm->limit > zm->iteration_cnt)
1948   {
1949     zm->sa_waiting = GNUNET_NO;
1950     if (NULL != zm->sa_wait_warning)
1951     {
1952       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1953       zm->sa_wait_warning = NULL;
1954     }
1955   }
1956   else if (GNUNET_YES == zm->sa_waiting)
1957   {
1958     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1959     if (NULL != zm->sa_wait_warning)
1960       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1961     zm->sa_wait_warning =
1962       GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1963                                     &warn_monitor_slow,
1964                                     zm);
1965   }
1966 }
1967
1968
1969 /**
1970  * Send 'sync' message to zone monitor, we're now in sync.
1971  *
1972  * @param zm monitor that is now in sync
1973  */
1974 static void
1975 monitor_sync (struct ZoneMonitor *zm)
1976 {
1977   struct GNUNET_MQ_Envelope *env;
1978   struct GNUNET_MessageHeader *sync;
1979
1980   env = GNUNET_MQ_msg (sync, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
1981   GNUNET_MQ_send (zm->nc->mq, env);
1982   /* mark iteration done */
1983   zm->in_first_iteration = GNUNET_NO;
1984   zm->iteration_cnt = 0;
1985   if ((zm->limit > 0) && (zm->sa_waiting))
1986     monitor_unblock (zm);
1987 }
1988
1989
1990 /**
1991  * Obtain the next datum during the zone monitor's zone initial iteration.
1992  *
1993  * @param cls zone monitor that does its initial iteration
1994  */
1995 static void
1996 monitor_iteration_next (void *cls);
1997
1998
1999 /**
2000  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
2001  *
2002  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2003  * @param seq sequence number of the record, MUST NOT BE ZERO
2004  * @param zone_key zone key of the zone
2005  * @param name name
2006  * @param rd_count number of records in @a rd
2007  * @param rd array of records
2008  */
2009 static void
2010 monitor_iterate_cb (void *cls,
2011                     uint64_t seq,
2012                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2013                     const char *name,
2014                     unsigned int rd_count,
2015                     const struct GNUNET_GNSRECORD_Data *rd)
2016 {
2017   struct ZoneMonitor *zm = cls;
2018
2019   GNUNET_assert (0 != seq);
2020   zm->seq = seq;
2021   GNUNET_assert (NULL != name);
2022   GNUNET_STATISTICS_update (statistics,
2023                             "Monitor notifications sent",
2024                             1,
2025                             GNUNET_NO);
2026   zm->limit--;
2027   zm->iteration_cnt--;
2028   send_lookup_response (zm->nc, 0, zone_key, name, rd_count, rd);
2029   if ((0 == zm->iteration_cnt) && (0 != zm->limit))
2030   {
2031     /* We are done with the current iteration batch, AND the
2032        client would right now accept more, so go again! */
2033     GNUNET_assert (NULL == zm->task);
2034     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
2035   }
2036 }
2037
2038
2039 /**
2040  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2041  *
2042  * @param cls the client sending the message
2043  * @param zis_msg message from the client
2044  */
2045 static void
2046 handle_monitor_start (void *cls, const struct ZoneMonitorStartMessage *zis_msg)
2047 {
2048   struct NamestoreClient *nc = cls;
2049   struct ZoneMonitor *zm;
2050
2051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ZONE_MONITOR_START message\n");
2052   zm = GNUNET_new (struct ZoneMonitor);
2053   zm->nc = nc;
2054   zm->zone = zis_msg->zone;
2055   zm->limit = 1;
2056   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2057   GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, zm);
2058   GNUNET_SERVICE_client_mark_monitor (nc->client);
2059   GNUNET_SERVICE_client_continue (nc->client);
2060   GNUNET_notification_context_add (monitor_nc, nc->mq);
2061   if (zm->in_first_iteration)
2062     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
2063   else
2064     monitor_sync (zm);
2065 }
2066
2067
2068 /**
2069  * Obtain the next datum during the zone monitor's zone initial iteration.
2070  *
2071  * @param cls zone monitor that does its initial iteration
2072  */
2073 static void
2074 monitor_iteration_next (void *cls)
2075 {
2076   struct ZoneMonitor *zm = cls;
2077   int ret;
2078
2079   zm->task = NULL;
2080   GNUNET_assert (0 == zm->iteration_cnt);
2081   if (zm->limit > 16)
2082     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2083   else
2084     zm->iteration_cnt = zm->limit; /* use it all */
2085   ret = GSN_database->iterate_records (GSN_database->cls,
2086                                        (0 == GNUNET_is_zero (&zm->zone))
2087                                        ? NULL
2088                                        : &zm->zone,
2089                                        zm->seq,
2090                                        zm->iteration_cnt,
2091                                        &monitor_iterate_cb,
2092                                        zm);
2093   if (GNUNET_SYSERR == ret)
2094   {
2095     GNUNET_SERVICE_client_drop (zm->nc->client);
2096     return;
2097   }
2098   if (GNUNET_NO == ret)
2099   {
2100     /* empty zone */
2101     monitor_sync (zm);
2102     return;
2103   }
2104 }
2105
2106
2107 /**
2108  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2109  *
2110  * @param cls the client sending the message
2111  * @param nm message from the client
2112  */
2113 static void
2114 handle_monitor_next (void *cls, const struct ZoneMonitorNextMessage *nm)
2115 {
2116   struct NamestoreClient *nc = cls;
2117   struct ZoneMonitor *zm;
2118   uint64_t inc;
2119
2120   inc = GNUNET_ntohll (nm->limit);
2121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2122               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2123               (unsigned long long) inc);
2124   for (zm = monitor_head; NULL != zm; zm = zm->next)
2125     if (zm->nc == nc)
2126       break;
2127   if (NULL == zm)
2128   {
2129     GNUNET_break (0);
2130     GNUNET_SERVICE_client_drop (nc->client);
2131     return;
2132   }
2133   GNUNET_SERVICE_client_continue (nc->client);
2134   if (zm->limit + inc < zm->limit)
2135   {
2136     GNUNET_break (0);
2137     GNUNET_SERVICE_client_drop (nc->client);
2138     return;
2139   }
2140   zm->limit += inc;
2141   if ((zm->in_first_iteration) && (zm->limit == inc))
2142   {
2143     /* We are still iterating, and the previous iteration must
2144        have stopped due to the client's limit, so continue it! */
2145     GNUNET_assert (NULL == zm->task);
2146     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
2147   }
2148   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2149   if ((zm->limit > zm->iteration_cnt) && (zm->sa_waiting))
2150   {
2151     monitor_unblock (zm);
2152   }
2153   else if (GNUNET_YES == zm->sa_waiting)
2154   {
2155     if (NULL != zm->sa_wait_warning)
2156       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2157     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2158     zm->sa_wait_warning =
2159       GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2160                                     &warn_monitor_slow,
2161                                     zm);
2162   }
2163 }
2164
2165
2166 /**
2167  * Process namestore requests.
2168  *
2169  * @param cls closure
2170  * @param cfg configuration to use
2171  * @param service the initialized service
2172  */
2173 static void
2174 run (void *cls,
2175      const struct GNUNET_CONFIGURATION_Handle *cfg,
2176      struct GNUNET_SERVICE_Handle *service)
2177 {
2178   char *database;
2179
2180   (void) cls;
2181   (void) service;
2182   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting namestore service\n");
2183   cache_keys =
2184     GNUNET_CONFIGURATION_get_value_yesno (cfg, "namestore", "CACHE_KEYS");
2185   disable_namecache =
2186     GNUNET_CONFIGURATION_get_value_yesno (cfg, "namecache", "DISABLE");
2187   GSN_cfg = cfg;
2188   monitor_nc = GNUNET_notification_context_create (1);
2189   if (GNUNET_YES != disable_namecache)
2190   {
2191     namecache = GNUNET_NAMECACHE_connect (cfg);
2192     GNUNET_assert (NULL != namecache);
2193   }
2194   /* Loading database plugin */
2195   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
2196                                                           "namestore",
2197                                                           "database",
2198                                                           &database))
2199     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No database backend configured\n");
2200
2201   GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_namestore_%s", database);
2202   GSN_database = GNUNET_PLUGIN_load (db_lib_name, (void *) GSN_cfg);
2203   GNUNET_free (database);
2204   statistics = GNUNET_STATISTICS_create ("namestore", cfg);
2205   GNUNET_SCHEDULER_add_shutdown (&cleanup_task, NULL);
2206   if (NULL == GSN_database)
2207   {
2208     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2209                 "Could not load database backend `%s'\n",
2210                 db_lib_name);
2211     GNUNET_SCHEDULER_shutdown ();
2212     return;
2213   }
2214 }
2215
2216
2217 /**
2218  * Define "main" method using service macro.
2219  */
2220 GNUNET_SERVICE_MAIN (
2221   "namestore",
2222   GNUNET_SERVICE_OPTION_NONE,
2223   &run,
2224   &client_connect_cb,
2225   &client_disconnect_cb,
2226   NULL,
2227   GNUNET_MQ_hd_var_size (record_store,
2228                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2229                          struct RecordStoreMessage,
2230                          NULL),
2231   GNUNET_MQ_hd_var_size (record_lookup,
2232                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2233                          struct LabelLookupMessage,
2234                          NULL),
2235   GNUNET_MQ_hd_fixed_size (zone_to_name,
2236                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2237                            struct ZoneToNameMessage,
2238                            NULL),
2239   GNUNET_MQ_hd_fixed_size (iteration_start,
2240                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2241                            struct ZoneIterationStartMessage,
2242                            NULL),
2243   GNUNET_MQ_hd_fixed_size (iteration_next,
2244                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2245                            struct ZoneIterationNextMessage,
2246                            NULL),
2247   GNUNET_MQ_hd_fixed_size (iteration_stop,
2248                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2249                            struct ZoneIterationStopMessage,
2250                            NULL),
2251   GNUNET_MQ_hd_fixed_size (monitor_start,
2252                            GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2253                            struct ZoneMonitorStartMessage,
2254                            NULL),
2255   GNUNET_MQ_hd_fixed_size (monitor_next,
2256                            GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2257                            struct ZoneMonitorNextMessage,
2258                            NULL),
2259   GNUNET_MQ_handler_end ());
2260
2261
2262 /* end of gnunet-service-namestore.c */