global reindent, now with uncrustify hook enabled
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file namestore/gnunet-service-namestore.c
23  * @brief namestore for the GNUnet naming system
24  * @author Matthias Wachs
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_dnsparser_lib.h"
30 #include "gnunet_gns_service.h"
31 #include "gnunet_namecache_service.h"
32 #include "gnunet_namestore_service.h"
33 #include "gnunet_namestore_plugin.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_signatures.h"
36 #include "namestore.h"
37
38 #define LOG_STRERROR_FILE(kind, syscall, filename) \
39   GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
40
41 /**
42  * If a monitor takes more than 1 minute to process an event, print a warning.
43  */
44 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * Size of the cache used by #get_nick_record()
48  */
49 #define NC_SIZE 16
50
51 /**
52  * A namestore client
53  */
54 struct NamestoreClient;
55
56
57 /**
58  * A namestore iteration operation.
59  */
60 struct ZoneIteration
61 {
62   /**
63    * Next element in the DLL
64    */
65   struct ZoneIteration *next;
66
67   /**
68    * Previous element in the DLL
69    */
70   struct ZoneIteration *prev;
71
72   /**
73    * Namestore client which intiated this zone iteration
74    */
75   struct NamestoreClient *nc;
76
77   /**
78    * The nick to add to the records
79    */
80   struct GNUNET_GNSRECORD_Data *nick;
81
82   /**
83    * Key of the zone we are iterating over.
84    */
85   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
86
87   /**
88    * Last sequence number in the zone iteration used to address next
89    * result of the zone iteration in the store
90    *
91    * Initialy set to 0.
92    * Updated in #zone_iterate_proc()
93    */
94   uint64_t seq;
95
96   /**
97    * The operation id fot the zone iteration in the response for the client
98    */
99   uint32_t request_id;
100
101   /**
102    * Offset of the zone iteration used to address next result of the zone
103    * iteration in the store
104    *
105    * Initialy set to 0 in #handle_iteration_start
106    * Incremented with by every call to #handle_iteration_next
107    */
108   uint32_t offset;
109
110   /**
111    * Number of pending cache operations triggered by this zone iteration which we
112    * need to wait for before allowing the client to continue.
113    */
114   unsigned int cache_ops;
115
116   /**
117    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
118    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
119    * message and free the data structure once @e cache_ops is zero.
120    */
121   int send_end;
122 };
123
124
125 /**
126  * A namestore client
127  */
128 struct NamestoreClient
129 {
130   /**
131    * The client
132    */
133   struct GNUNET_SERVICE_Client *client;
134
135   /**
136    * Message queue for transmission to @e client
137    */
138   struct GNUNET_MQ_Handle *mq;
139
140   /**
141    * Head of the DLL of
142    * Zone iteration operations in progress initiated by this client
143    */
144   struct ZoneIteration *op_head;
145
146   /**
147    * Tail of the DLL of
148    * Zone iteration operations in progress initiated by this client
149    */
150   struct ZoneIteration *op_tail;
151 };
152
153
154 /**
155  * A namestore monitor.
156  */
157 struct ZoneMonitor
158 {
159   /**
160    * Next element in the DLL
161    */
162   struct ZoneMonitor *next;
163
164   /**
165    * Previous element in the DLL
166    */
167   struct ZoneMonitor *prev;
168
169   /**
170    * Namestore client which intiated this zone monitor
171    */
172   struct NamestoreClient *nc;
173
174   /**
175    * Private key of the zone.
176    */
177   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
178
179   /**
180    * Task active during initial iteration.
181    */
182   struct GNUNET_SCHEDULER_Task *task;
183
184   /**
185    * Task to warn about slow monitors.
186    */
187   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
188
189   /**
190    * Since when are we blocked on this monitor?
191    */
192   struct GNUNET_TIME_Absolute sa_waiting_start;
193
194   /**
195    * Last sequence number in the zone iteration used to address next
196    * result of the zone iteration in the store
197    *
198    * Initialy set to 0.
199    * Updated in #monitor_iterate_cb()
200    */
201   uint64_t seq;
202
203   /**
204    * Current limit of how many more messages we are allowed
205    * to queue to this monitor.
206    */
207   uint64_t limit;
208
209   /**
210    * How many more requests may we receive from the iterator
211    * before it is at the limit we gave it?  Will be below or
212    * equal to @e limit.  The effective limit for monitor
213    * events is thus @e iteration_cnt - @e limit!
214    */
215   uint64_t iteration_cnt;
216
217   /**
218    * Are we (still) in the initial iteration pass?
219    */
220   int in_first_iteration;
221
222   /**
223    * Is there a store activity waiting for this monitor?  We only raise the
224    * flag when it happens and search the DLL for the store activity when we
225    * had a limit increase.  If we cannot find any waiting store activity at
226    * that time, we clear the flag again.
227    */
228   int sa_waiting;
229 };
230
231
232 /**
233  * Pending operation on the namecache.
234  */
235 struct CacheOperation
236 {
237   /**
238    * Kept in a DLL.
239    */
240   struct CacheOperation *prev;
241
242   /**
243    * Kept in a DLL.
244    */
245   struct CacheOperation *next;
246
247   /**
248    * Handle to namecache queue.
249    */
250   struct GNUNET_NAMECACHE_QueueEntry *qe;
251
252   /**
253    * Client to notify about the result, can be NULL.
254    */
255   struct NamestoreClient *nc;
256
257   /**
258    * Zone iteration to call #zone_iteration_done_client_continue()
259    * for if applicable, can be NULL.
260    */
261   struct ZoneIteration *zi;
262
263   /**
264    * Client's request ID.
265    */
266   uint32_t rid;
267 };
268
269
270 /**
271  * Information for an ongoing #handle_record_store() operation.
272  * Needed as we may wait for monitors to be ready for the notification.
273  */
274 struct StoreActivity
275 {
276   /**
277    * Kept in a DLL.
278    */
279   struct StoreActivity *next;
280
281   /**
282    * Kept in a DLL.
283    */
284   struct StoreActivity *prev;
285
286   /**
287    * Which client triggered the store activity?
288    */
289   struct NamestoreClient *nc;
290
291   /**
292    * Copy of the original store message (as data fields in @e rd will
293    * point into it!).
294    */
295   const struct RecordStoreMessage *rsm;
296
297   /**
298    * Next zone monitor that still needs to be notified about this PUT.
299    */
300   struct ZoneMonitor *zm_pos;
301
302   /**
303    * Label nicely canonicalized (lower case).
304    */
305   char *conv_name;
306 };
307
308
309 /**
310  * Entry in list of cached nick resolutions.
311  */
312 struct NickCache
313 {
314   /**
315    * Zone the cache entry is for.
316    */
317   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
318
319   /**
320    * Cached record data.
321    */
322   struct GNUNET_GNSRECORD_Data *rd;
323
324   /**
325    * Timestamp when this cache entry was used last.
326    */
327   struct GNUNET_TIME_Absolute last_used;
328 };
329
330
331 /**
332  * We cache nick records to reduce DB load.
333  */
334 static struct NickCache nick_cache[NC_SIZE];
335
336 /**
337  * Public key of all zeros.
338  */
339 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
340
341 /**
342  * Configuration handle.
343  */
344 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
345
346 /**
347  * Handle to the statistics service
348  */
349 static struct GNUNET_STATISTICS_Handle *statistics;
350
351 /**
352  * Namecache handle.
353  */
354 static struct GNUNET_NAMECACHE_Handle *namecache;
355
356 /**
357  * Database handle
358  */
359 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
360
361 /**
362  * Name of the database plugin
363  */
364 static char *db_lib_name;
365
366 /**
367  * Head of cop DLL.
368  */
369 static struct CacheOperation *cop_head;
370
371 /**
372  * Tail of cop DLL.
373  */
374 static struct CacheOperation *cop_tail;
375
376 /**
377  * First active zone monitor.
378  */
379 static struct ZoneMonitor *monitor_head;
380
381 /**
382  * Last active zone monitor.
383  */
384 static struct ZoneMonitor *monitor_tail;
385
386 /**
387  * Head of DLL of monitor-blocked store activities.
388  */
389 static struct StoreActivity *sa_head;
390
391 /**
392  * Tail of DLL of monitor-blocked store activities.
393  */
394 static struct StoreActivity *sa_tail;
395
396 /**
397  * Notification context shared by all monitors.
398  */
399 static struct GNUNET_NotificationContext *monitor_nc;
400
401 /**
402  * Optimize block insertion by caching map of private keys to
403  * public keys in memory?
404  */
405 static int cache_keys;
406
407 /**
408  * Use the namecache? Doing so creates additional cryptographic
409  * operations whenever we touch a record.
410  */
411 static int disable_namecache;
412
413
414 /**
415  * Task run during shutdown.
416  *
417  * @param cls unused
418  */
419 static void
420 cleanup_task (void *cls)
421 {
422   struct CacheOperation *cop;
423
424   (void) cls;
425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stopping namestore service\n");
426   while (NULL != (cop = cop_head))
427   {
428     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
429                 "Aborting incomplete namecache operation\n");
430     GNUNET_NAMECACHE_cancel (cop->qe);
431     GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
432     GNUNET_free (cop);
433   }
434   if (NULL != namecache)
435   {
436     GNUNET_NAMECACHE_disconnect (namecache);
437     namecache = NULL;
438   }
439   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, GSN_database));
440   GNUNET_free (db_lib_name);
441   db_lib_name = NULL;
442   if (NULL != monitor_nc)
443   {
444     GNUNET_notification_context_destroy (monitor_nc);
445     monitor_nc = NULL;
446   }
447   if (NULL != statistics)
448   {
449     GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
450     statistics = NULL;
451   }
452 }
453
454
455 /**
456  * Release memory used by @a sa.
457  *
458  * @param sa activity to free
459  */
460 static void
461 free_store_activity (struct StoreActivity *sa)
462 {
463   GNUNET_CONTAINER_DLL_remove (sa_head, sa_tail, sa);
464   GNUNET_free (sa->conv_name);
465   GNUNET_free (sa);
466 }
467
468
469 /**
470  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
471  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
472  * record, which (if found) is then copied to @a cls for future use.
473  *
474  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
475  * @param seq sequence number of the record, MUST NOT BE ZERO
476  * @param private_key the private key of the zone (unused)
477  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
478  * @param rd_count number of records in @a rd
479  * @param rd records stored under @a label in the zone
480  */
481 static void
482 lookup_nick_it (void *cls,
483                 uint64_t seq,
484                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
485                 const char *label,
486                 unsigned int rd_count,
487                 const struct GNUNET_GNSRECORD_Data *rd)
488 {
489   struct GNUNET_GNSRECORD_Data **res = cls;
490
491   (void) private_key;
492   GNUNET_assert (0 != seq);
493   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
494   {
495     GNUNET_break (0);
496     return;
497   }
498   for (unsigned int c = 0; c < rd_count; c++)
499   {
500     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
501     {
502       (*res) =
503         GNUNET_malloc (rd[c].data_size + sizeof(struct GNUNET_GNSRECORD_Data));
504       (*res)->data = &(*res)[1];
505       GNUNET_memcpy ((void *) (*res)->data, rd[c].data, rd[c].data_size);
506       (*res)->data_size = rd[c].data_size;
507       (*res)->expiration_time = rd[c].expiration_time;
508       (*res)->flags = rd[c].flags;
509       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
510       return;
511     }
512   }
513   (*res) = NULL;
514 }
515
516
517 /**
518  * Add entry to the cache for @a zone and @a nick
519  *
520  * @param zone zone key to cache under
521  * @param nick nick entry to cache
522  */
523 static void
524 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
525             const struct GNUNET_GNSRECORD_Data *nick)
526 {
527   struct NickCache *oldest;
528
529   oldest = NULL;
530   for (unsigned int i = 0; i < NC_SIZE; i++)
531   {
532     struct NickCache *pos = &nick_cache[i];
533
534     if ((NULL == oldest) ||
535         (oldest->last_used.abs_value_us > pos->last_used.abs_value_us))
536       oldest = pos;
537     if (0 == GNUNET_memcmp (zone, &pos->zone))
538     {
539       oldest = pos;
540       break;
541     }
542   }
543   GNUNET_free_non_null (oldest->rd);
544   oldest->zone = *zone;
545   if (NULL != nick)
546   {
547     oldest->rd = GNUNET_malloc (sizeof(*nick) + nick->data_size);
548     *oldest->rd = *nick;
549     oldest->rd->data = &oldest->rd[1];
550     memcpy (&oldest->rd[1], nick->data, nick->data_size);
551   }
552   else
553   {
554     oldest->rd = NULL;
555   }
556   oldest->last_used = GNUNET_TIME_absolute_get ();
557 }
558
559
560 /**
561  * Return the NICK record for the zone (if it exists).
562  *
563  * @param zone private key for the zone to look for nick
564  * @return NULL if no NICK record was found
565  */
566 static struct GNUNET_GNSRECORD_Data *
567 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
568 {
569   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
570   struct GNUNET_GNSRECORD_Data *nick;
571   int res;
572
573   /* check cache first */
574   for (unsigned int i = 0; i < NC_SIZE; i++)
575   {
576     struct NickCache *pos = &nick_cache[i];
577     if ((NULL != pos->rd) && (0 == GNUNET_memcmp (zone, &pos->zone)))
578     {
579       if (NULL == pos->rd)
580         return NULL;
581       nick = GNUNET_malloc (sizeof(*nick) + pos->rd->data_size);
582       *nick = *pos->rd;
583       nick->data = &nick[1];
584       memcpy (&nick[1], pos->rd->data, pos->rd->data_size);
585       pos->last_used = GNUNET_TIME_absolute_get ();
586       return nick;
587     }
588   }
589
590   nick = NULL;
591   res = GSN_database->lookup_records (GSN_database->cls,
592                                       zone,
593                                       GNUNET_GNS_EMPTY_LABEL_AT,
594                                       &lookup_nick_it,
595                                       &nick);
596   if ((GNUNET_OK != res) || (NULL == nick))
597   {
598 #if ! defined(GNUNET_CULL_LOGGING)
599     static int do_log = GNUNET_LOG_CALL_STATUS;
600
601     if (0 == do_log)
602       do_log = GNUNET_get_log_call_status (GNUNET_ERROR_TYPE_DEBUG,
603                                            "namestore",
604                                            __FILE__,
605                                            __FUNCTION__,
606                                            __LINE__);
607     if (1 == do_log)
608     {
609       GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
610       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
611                   "No nick name set for zone `%s'\n",
612                   GNUNET_GNSRECORD_z2s (&pub));
613     }
614 #endif
615     /* update cache */
616     cache_nick (zone, NULL);
617     return NULL;
618   }
619
620   /* update cache */
621   cache_nick (zone, nick);
622   return nick;
623 }
624
625
626 /**
627  * Merge the nick record @a nick_rd with the rest of the
628  * record set given in @a rd2.  Store the result in @a rdc_res
629  * and @a rd_res.  The @a nick_rd's expiration time is set to
630  * the maximum expiration time of all of the records in @a rd2.
631  *
632  * @param nick_rd the nick record to integrate
633  * @param rd2_length length of the @a rd2 array
634  * @param rd2 array of records
635  * @param rdc_res[out] length of the resulting @a rd_res array
636  * @param rd_res[out] set to an array of records,
637  *                    including @a nick_rd and @a rd2;
638  *           all of the variable-size 'data' fields in @a rd2 are
639  *           allocated in the same chunk of memory!
640  */
641 static void
642 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
643                          unsigned int rd2_length,
644                          const struct GNUNET_GNSRECORD_Data *rd2,
645                          unsigned int *rdc_res,
646                          struct GNUNET_GNSRECORD_Data **rd_res)
647 {
648   uint64_t latest_expiration;
649   size_t req;
650   char *data;
651   size_t data_offset;
652   struct GNUNET_GNSRECORD_Data *target;
653
654   (*rdc_res) = 1 + rd2_length;
655   if (0 == 1 + rd2_length)
656   {
657     GNUNET_break (0);
658     (*rd_res) = NULL;
659     return;
660   }
661   req = sizeof(struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
662   for (unsigned int i = 0; i < rd2_length; i++)
663   {
664     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
665
666     if (req + sizeof(struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
667     {
668       GNUNET_break (0);
669       (*rd_res) = NULL;
670       return;
671     }
672     req += sizeof(struct GNUNET_GNSRECORD_Data) + orig->data_size;
673   }
674   target = GNUNET_malloc (req);
675   (*rd_res) = target;
676   data = (char *) &target[1 + rd2_length];
677   data_offset = 0;
678   latest_expiration = 0;
679   for (unsigned int i = 0; i < rd2_length; i++)
680   {
681     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
682
683     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
684     {
685       if ((GNUNET_TIME_absolute_get ().abs_value_us + orig->expiration_time) >
686           latest_expiration)
687         latest_expiration = orig->expiration_time;
688     }
689     else if (orig->expiration_time > latest_expiration)
690       latest_expiration = orig->expiration_time;
691     target[i] = *orig;
692     target[i].data = (void *) &data[data_offset];
693     GNUNET_memcpy (&data[data_offset], orig->data, orig->data_size);
694     data_offset += orig->data_size;
695   }
696   /* append nick */
697   target[rd2_length] = *nick_rd;
698   target[rd2_length].expiration_time = latest_expiration;
699   target[rd2_length].data = (void *) &data[data_offset];
700   GNUNET_memcpy (&data[data_offset], nick_rd->data, nick_rd->data_size);
701   data_offset += nick_rd->data_size;
702   GNUNET_assert (req == (sizeof(struct GNUNET_GNSRECORD_Data)) * (*rdc_res)
703                  + data_offset);
704 }
705
706
707 /**
708  * Generate a `struct LookupNameResponseMessage` and send it to the
709  * given client using the given notification context.
710  *
711  * @param nc client to unicast to
712  * @param request_id request ID to use
713  * @param zone_key zone key of the zone
714  * @param name name
715  * @param rd_count number of records in @a rd
716  * @param rd array of records
717  */
718 static void
719 send_lookup_response (struct NamestoreClient *nc,
720                       uint32_t request_id,
721                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
722                       const char *name,
723                       unsigned int rd_count,
724                       const struct GNUNET_GNSRECORD_Data *rd)
725 {
726   struct GNUNET_MQ_Envelope *env;
727   struct RecordResultMessage *zir_msg;
728   struct GNUNET_GNSRECORD_Data *nick;
729   struct GNUNET_GNSRECORD_Data *res;
730   unsigned int res_count;
731   size_t name_len;
732   ssize_t rd_ser_len;
733   char *name_tmp;
734   char *rd_ser;
735
736   nick = get_nick_record (zone_key);
737   GNUNET_assert (-1 != GNUNET_GNSRECORD_records_get_size (rd_count, rd));
738
739   if ((NULL != nick) && (0 != strcmp (name, GNUNET_GNS_EMPTY_LABEL_AT)))
740   {
741     nick->flags =
742       (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
743     merge_with_nick_records (nick, rd_count, rd, &res_count, &res);
744     GNUNET_free (nick);
745   }
746   else
747   {
748     res_count = rd_count;
749     res = (struct GNUNET_GNSRECORD_Data *) rd;
750   }
751
752   GNUNET_assert (-1 != GNUNET_GNSRECORD_records_get_size (res_count, res));
753
754
755   name_len = strlen (name) + 1;
756   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count, res);
757   if (rd_ser_len < 0)
758   {
759     GNUNET_break (0);
760     GNUNET_SERVICE_client_drop (nc->client);
761     return;
762   }
763   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof(*zir_msg))
764   {
765     GNUNET_break (0);
766     GNUNET_SERVICE_client_drop (nc->client);
767     return;
768   }
769   env = GNUNET_MQ_msg_extra (zir_msg,
770                              name_len + rd_ser_len,
771                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
772   zir_msg->gns_header.r_id = htonl (request_id);
773   zir_msg->name_len = htons (name_len);
774   zir_msg->rd_count = htons (res_count);
775   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
776   zir_msg->private_key = *zone_key;
777   name_tmp = (char *) &zir_msg[1];
778   GNUNET_memcpy (name_tmp, name, name_len);
779   rd_ser = &name_tmp[name_len];
780   GNUNET_assert (
781     rd_ser_len ==
782     GNUNET_GNSRECORD_records_serialize (res_count, res, rd_ser_len, rd_ser));
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Sending RECORD_RESULT message with %u records\n",
785               res_count);
786   GNUNET_STATISTICS_update (statistics,
787                             "Record sets sent to clients",
788                             1,
789                             GNUNET_NO);
790   GNUNET_MQ_send (nc->mq, env);
791   if (rd != res)
792     GNUNET_free (res);
793 }
794
795
796 /**
797  * Send response to the store request to the client.
798  *
799  * @param client client to talk to
800  * @param res status of the operation
801  * @param rid client's request ID
802  */
803 static void
804 send_store_response (struct NamestoreClient *nc, int res, uint32_t rid)
805 {
806   struct GNUNET_MQ_Envelope *env;
807   struct RecordStoreResponseMessage *rcr_msg;
808
809   GNUNET_assert (NULL != nc);
810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811               "Sending RECORD_STORE_RESPONSE message\n");
812   GNUNET_STATISTICS_update (statistics,
813                             "Store requests completed",
814                             1,
815                             GNUNET_NO);
816   env = GNUNET_MQ_msg (rcr_msg,
817                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
818   rcr_msg->gns_header.r_id = htonl (rid);
819   rcr_msg->op_result = htonl (res);
820   GNUNET_MQ_send (nc->mq, env);
821 }
822
823
824 /**
825  * Function called once we are done with the zone iteration and
826  * allow the zone iteration client to send us more messages.
827  *
828  * @param zi zone iteration we are processing
829  */
830 static void
831 zone_iteration_done_client_continue (struct ZoneIteration *zi)
832 {
833   struct GNUNET_MQ_Envelope *env;
834   struct GNUNET_NAMESTORE_Header *em;
835
836   GNUNET_SERVICE_client_continue (zi->nc->client);
837   if (! zi->send_end)
838     return;
839   /* send empty response to indicate end of list */
840   env = GNUNET_MQ_msg (em, GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
841   em->r_id = htonl (zi->request_id);
842   GNUNET_MQ_send (zi->nc->mq, env);
843
844   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head, zi->nc->op_tail, zi);
845   GNUNET_free (zi);
846 }
847
848
849 /**
850  * Cache operation complete, clean up.
851  *
852  * @param cls the `struct CacheOperation`
853  * @param success success
854  * @param emsg error messages
855  */
856 static void
857 finish_cache_operation (void *cls, int32_t success, const char *emsg)
858 {
859   struct CacheOperation *cop = cls;
860   struct ZoneIteration *zi;
861
862   if (NULL != emsg)
863     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
864                 _ ("Failed to replicate block in namecache: %s\n"),
865                 emsg);
866   else
867     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CACHE operation completed\n");
868   GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
869   if (NULL != cop->nc)
870     send_store_response (cop->nc, success, cop->rid);
871   if (NULL != (zi = cop->zi))
872   {
873     zi->cache_ops--;
874     if (0 == zi->cache_ops)
875     {
876       /* unchoke zone iteration, cache has caught up */
877       zone_iteration_done_client_continue (zi);
878     }
879   }
880   GNUNET_free (cop);
881 }
882
883
884 /**
885  * We just touched the plaintext information about a name in our zone;
886  * refresh the corresponding (encrypted) block in the namecache.
887  *
888  * @param nc client responsible for the request, can be NULL
889  * @param zi zone iteration response for the request, can be NULL
890  * @param rid request ID of the client
891  * @param zone_key private key of the zone
892  * @param name label for the records
893  * @param rd_count number of records
894  * @param rd records stored under the given @a name
895  */
896 static void
897 refresh_block (struct NamestoreClient *nc,
898                struct ZoneIteration *zi,
899                uint32_t rid,
900                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
901                const char *name,
902                unsigned int rd_count,
903                const struct GNUNET_GNSRECORD_Data *rd)
904 {
905   struct GNUNET_GNSRECORD_Block *block;
906   struct CacheOperation *cop;
907   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
908   struct GNUNET_GNSRECORD_Data *nick;
909   struct GNUNET_GNSRECORD_Data *res;
910   unsigned int res_count;
911   struct GNUNET_TIME_Absolute exp_time;
912
913   nick = get_nick_record (zone_key);
914   res_count = rd_count;
915   res = (struct GNUNET_GNSRECORD_Data *) rd;  /* fixme: a bit unclean... */
916   if (NULL != nick)
917   {
918     nick->flags =
919       (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
920     merge_with_nick_records (nick, rd_count, rd, &res_count, &res);
921     GNUNET_free (nick);
922   }
923   if (0 == res_count)
924   {
925     if (NULL != nc)
926       send_store_response (nc, GNUNET_OK, rid);
927     return;   /* no data, no need to update cache */
928   }
929   if (GNUNET_YES == disable_namecache)
930   {
931     GNUNET_STATISTICS_update (statistics,
932                               "Namecache updates skipped (NC disabled)",
933                               1,
934                               GNUNET_NO);
935     if (NULL != nc)
936       send_store_response (nc, GNUNET_OK, rid);
937     return;
938   }
939   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count, res);
940   if (cache_keys)
941     block =
942       GNUNET_GNSRECORD_block_create2 (zone_key, exp_time, name, res, res_count);
943   else
944     block =
945       GNUNET_GNSRECORD_block_create (zone_key, exp_time, name, res, res_count);
946   GNUNET_assert (NULL != block);
947   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key, &pkey);
948   GNUNET_log (
949     GNUNET_ERROR_TYPE_DEBUG,
950     "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
951     name,
952     res_count,
953     GNUNET_STRINGS_absolute_time_to_string (exp_time),
954     GNUNET_GNSRECORD_z2s (&pkey));
955   GNUNET_STATISTICS_update (statistics,
956                             "Namecache updates pushed",
957                             1,
958                             GNUNET_NO);
959   cop = GNUNET_new (struct CacheOperation);
960   cop->nc = nc;
961   cop->zi = zi;
962   if (NULL != zi)
963     zi->cache_ops++;
964   cop->rid = rid;
965   GNUNET_CONTAINER_DLL_insert (cop_head, cop_tail, cop);
966   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
967                                           block,
968                                           &finish_cache_operation,
969                                           cop);
970   GNUNET_free (block);
971 }
972
973
974 /**
975  * Print a warning that one of our monitors is no longer reacting.
976  *
977  * @param cls a `struct ZoneMonitor` to warn about
978  */
979 static void
980 warn_monitor_slow (void *cls)
981 {
982   struct ZoneMonitor *zm = cls;
983
984   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
985               "No response from monitor since %s\n",
986               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
987   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
988                                                       &warn_monitor_slow,
989                                                       zm);
990 }
991
992
993 /**
994  * Continue processing the @a sa.
995  *
996  * @param sa store activity to process
997  */
998 static void
999 continue_store_activity (struct StoreActivity *sa)
1000 {
1001   const struct RecordStoreMessage *rp_msg = sa->rsm;
1002   unsigned int rd_count;
1003   size_t name_len;
1004   size_t rd_ser_len;
1005   uint32_t rid;
1006   const char *name_tmp;
1007   const char *rd_ser;
1008
1009   rid = ntohl (rp_msg->gns_header.r_id);
1010   name_len = ntohs (rp_msg->name_len);
1011   rd_count = ntohs (rp_msg->rd_count);
1012   rd_ser_len = ntohs (rp_msg->rd_len);
1013   name_tmp = (const char *) &rp_msg[1];
1014   rd_ser = &name_tmp[name_len];
1015   {
1016     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL (rd_count)];
1017
1018     /* We did this before, must succeed again */
1019     GNUNET_assert (
1020       GNUNET_OK ==
1021       GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count, rd));
1022
1023     for (struct ZoneMonitor *zm = sa->zm_pos; NULL != zm; zm = sa->zm_pos)
1024     {
1025       if ((0 != GNUNET_memcmp (&rp_msg->private_key, &zm->zone)) &&
1026           (0 != GNUNET_memcmp (&zm->zone, &zero)))
1027       {
1028         sa->zm_pos = zm->next;     /* not interesting to this monitor */
1029         continue;
1030       }
1031       if (zm->limit == zm->iteration_cnt)
1032       {
1033         zm->sa_waiting = GNUNET_YES;
1034         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1035         if (NULL != zm->sa_wait_warning)
1036           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1037         zm->sa_wait_warning =
1038           GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1039                                         &warn_monitor_slow,
1040                                         zm);
1041         return;     /* blocked on zone monitor */
1042       }
1043       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                   "Notifying monitor about changes under label `%s'\n",
1045                   sa->conv_name);
1046       zm->limit--;
1047       send_lookup_response (zm->nc,
1048                             0,
1049                             &rp_msg->private_key,
1050                             sa->conv_name,
1051                             rd_count,
1052                             rd);
1053       sa->zm_pos = zm->next;
1054     }
1055     /* great, done with the monitors, unpack (again) for refresh_block operation */
1056     refresh_block (sa->nc,
1057                    NULL,
1058                    rid,
1059                    &rp_msg->private_key,
1060                    sa->conv_name,
1061                    rd_count,
1062                    rd);
1063   }
1064   GNUNET_SERVICE_client_continue (sa->nc->client);
1065   free_store_activity (sa);
1066 }
1067
1068
1069 /**
1070  * Called whenever a client is disconnected.
1071  * Frees our resources associated with that client.
1072  *
1073  * @param cls closure
1074  * @param client identification of the client
1075  * @param app_ctx the `struct NamestoreClient` of @a client
1076  */
1077 static void
1078 client_disconnect_cb (void *cls,
1079                       struct GNUNET_SERVICE_Client *client,
1080                       void *app_ctx)
1081 {
1082   struct NamestoreClient *nc = app_ctx;
1083   struct ZoneIteration *no;
1084   struct CacheOperation *cop;
1085
1086   (void) cls;
1087   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
1088   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1089   {
1090     struct StoreActivity *san;
1091
1092     if (nc != zm->nc)
1093       continue;
1094     GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, zm);
1095     if (NULL != zm->task)
1096     {
1097       GNUNET_SCHEDULER_cancel (zm->task);
1098       zm->task = NULL;
1099     }
1100     if (NULL != zm->sa_wait_warning)
1101     {
1102       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1103       zm->sa_wait_warning = NULL;
1104     }
1105     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1106     {
1107       san = sa->next;
1108       if (zm == sa->zm_pos)
1109       {
1110         sa->zm_pos = zm->next;
1111         /* this may free sa */
1112         continue_store_activity (sa);
1113       }
1114     }
1115     GNUNET_free (zm);
1116     break;
1117   }
1118   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1119   {
1120     if (sa->nc == nc)
1121     {
1122       /* this may free sa */
1123       free_store_activity (sa);
1124       break;     /* there can only be one per nc */
1125     }
1126   }
1127   while (NULL != (no = nc->op_head))
1128   {
1129     GNUNET_CONTAINER_DLL_remove (nc->op_head, nc->op_tail, no);
1130     GNUNET_free (no);
1131   }
1132   for (cop = cop_head; NULL != cop; cop = cop->next)
1133     if (nc == cop->nc)
1134       cop->nc = NULL;
1135   GNUNET_free (nc);
1136 }
1137
1138
1139 /**
1140  * Add a client to our list of active clients.
1141  *
1142  * @param cls NULL
1143  * @param client client to add
1144  * @param mq message queue for @a client
1145  * @return internal namestore client structure for this client
1146  */
1147 static void *
1148 client_connect_cb (void *cls,
1149                    struct GNUNET_SERVICE_Client *client,
1150                    struct GNUNET_MQ_Handle *mq)
1151 {
1152   struct NamestoreClient *nc;
1153
1154   (void) cls;
1155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected\n", client);
1156   nc = GNUNET_new (struct NamestoreClient);
1157   nc->client = client;
1158   nc->mq = mq;
1159   return nc;
1160 }
1161
1162
1163 /**
1164  * Closure for #lookup_it().
1165  */
1166 struct RecordLookupContext
1167 {
1168   /**
1169    * FIXME.
1170    */
1171   const char *label;
1172
1173   /**
1174    * FIXME.
1175    */
1176   char *res_rd;
1177
1178   /**
1179    * FIXME.
1180    */
1181   struct GNUNET_GNSRECORD_Data *nick;
1182
1183   /**
1184    * FIXME.
1185    */
1186   int found;
1187
1188   /**
1189    * FIXME.
1190    */
1191   unsigned int res_rd_count;
1192
1193   /**
1194    * FIXME.
1195    */
1196   ssize_t rd_ser_len;
1197 };
1198
1199
1200 /**
1201  * Function called by the namestore plugin when we are trying to lookup
1202  * a record as part of #handle_record_lookup().  Merges all results into
1203  * the context.
1204  *
1205  * @param cls closure with a `struct RecordLookupContext`
1206  * @param seq unique serial number of the record, MUST NOT BE ZERO
1207  * @param zone_key private key of the zone
1208  * @param label name that is being mapped (at most 255 characters long)
1209  * @param rd_count number of entries in @a rd array
1210  * @param rd array of records with data to store
1211  */
1212 static void
1213 lookup_it (void *cls,
1214            uint64_t seq,
1215            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1216            const char *label,
1217            unsigned int rd_count,
1218            const struct GNUNET_GNSRECORD_Data *rd)
1219 {
1220   struct RecordLookupContext *rlc = cls;
1221
1222   (void) private_key;
1223   GNUNET_assert (0 != seq);
1224   if (0 != strcmp (label, rlc->label))
1225     return;
1226   rlc->found = GNUNET_YES;
1227   if (0 == rd_count)
1228   {
1229     rlc->rd_ser_len = 0;
1230     rlc->res_rd_count = 0;
1231     rlc->res_rd = NULL;
1232     return;
1233   }
1234   if ((NULL != rlc->nick) && (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT)))
1235   {
1236     /* Merge */
1237     struct GNUNET_GNSRECORD_Data *rd_res;
1238     unsigned int rdc_res;
1239
1240     rd_res = NULL;
1241     rdc_res = 0;
1242     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE)
1243                        ^ GNUNET_GNSRECORD_RF_PRIVATE;
1244     merge_with_nick_records (rlc->nick, rd_count, rd, &rdc_res, &rd_res);
1245     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res, rd_res);
1246     if (rlc->rd_ser_len < 0)
1247     {
1248       GNUNET_break (0);
1249       GNUNET_free (rd_res);
1250       rlc->found = GNUNET_NO;
1251       rlc->rd_ser_len = 0;
1252       return;
1253     }
1254     rlc->res_rd_count = rdc_res;
1255     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1256     if (rlc->rd_ser_len != GNUNET_GNSRECORD_records_serialize (rdc_res,
1257                                                                rd_res,
1258                                                                rlc->rd_ser_len,
1259                                                                rlc->res_rd))
1260     {
1261       GNUNET_break (0);
1262       GNUNET_free (rlc->res_rd);
1263       rlc->res_rd = NULL;
1264       rlc->res_rd_count = 0;
1265       rlc->rd_ser_len = 0;
1266       GNUNET_free (rd_res);
1267       rlc->found = GNUNET_NO;
1268       return;
1269     }
1270     GNUNET_free (rd_res);
1271     GNUNET_free (rlc->nick);
1272     rlc->nick = NULL;
1273   }
1274   else
1275   {
1276     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
1277     if (rlc->rd_ser_len < 0)
1278     {
1279       GNUNET_break (0);
1280       rlc->found = GNUNET_NO;
1281       rlc->rd_ser_len = 0;
1282       return;
1283     }
1284     rlc->res_rd_count = rd_count;
1285     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1286     if (rlc->rd_ser_len != GNUNET_GNSRECORD_records_serialize (rd_count,
1287                                                                rd,
1288                                                                rlc->rd_ser_len,
1289                                                                rlc->res_rd))
1290     {
1291       GNUNET_break (0);
1292       GNUNET_free (rlc->res_rd);
1293       rlc->res_rd = NULL;
1294       rlc->res_rd_count = 0;
1295       rlc->rd_ser_len = 0;
1296       rlc->found = GNUNET_NO;
1297       return;
1298     }
1299   }
1300 }
1301
1302
1303 /**
1304  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1305  *
1306  * @param cls client sending the message
1307  * @param ll_msg message of type `struct LabelLookupMessage`
1308  * @return #GNUNET_OK if @a ll_msg is well-formed
1309  */
1310 static int
1311 check_record_lookup (void *cls, const struct LabelLookupMessage *ll_msg)
1312 {
1313   uint32_t name_len;
1314   size_t src_size;
1315
1316   (void) cls;
1317   name_len = ntohl (ll_msg->label_len);
1318   src_size = ntohs (ll_msg->gns_header.header.size);
1319   if (name_len != src_size - sizeof(struct LabelLookupMessage))
1320   {
1321     GNUNET_break (0);
1322     return GNUNET_SYSERR;
1323   }
1324   GNUNET_MQ_check_zero_termination (ll_msg);
1325   return GNUNET_OK;
1326 }
1327
1328
1329 /**
1330  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1331  *
1332  * @param cls client sending the message
1333  * @param ll_msg message of type `struct LabelLookupMessage`
1334  */
1335 static void
1336 handle_record_lookup (void *cls, const struct LabelLookupMessage *ll_msg)
1337 {
1338   struct NamestoreClient *nc = cls;
1339   struct GNUNET_MQ_Envelope *env;
1340   struct LabelLookupResponseMessage *llr_msg;
1341   struct RecordLookupContext rlc;
1342   const char *name_tmp;
1343   char *res_name;
1344   char *conv_name;
1345   uint32_t name_len;
1346   int res;
1347
1348   name_len = ntohl (ll_msg->label_len);
1349   name_tmp = (const char *) &ll_msg[1];
1350   GNUNET_SERVICE_client_continue (nc->client);
1351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1353               name_tmp);
1354
1355   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1356   if (NULL == conv_name)
1357   {
1358     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1359                 "Error converting name `%s'\n",
1360                 name_tmp);
1361     GNUNET_SERVICE_client_drop (nc->client);
1362     return;
1363   }
1364   rlc.label = conv_name;
1365   rlc.found = GNUNET_NO;
1366   rlc.res_rd_count = 0;
1367   rlc.res_rd = NULL;
1368   rlc.rd_ser_len = 0;
1369   rlc.nick = get_nick_record (&ll_msg->zone);
1370   res = GSN_database->lookup_records (GSN_database->cls,
1371                                       &ll_msg->zone,
1372                                       conv_name,
1373                                       &lookup_it,
1374                                       &rlc);
1375   GNUNET_free (conv_name);
1376   env =
1377     GNUNET_MQ_msg_extra (llr_msg,
1378                          name_len + rlc.rd_ser_len,
1379                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1380   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1381   llr_msg->private_key = ll_msg->zone;
1382   llr_msg->name_len = htons (name_len);
1383   llr_msg->rd_count = htons (rlc.res_rd_count);
1384   llr_msg->rd_len = htons (rlc.rd_ser_len);
1385   res_name = (char *) &llr_msg[1];
1386   if ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1387     llr_msg->found = ntohs (GNUNET_YES);
1388   else
1389     llr_msg->found = ntohs (GNUNET_NO);
1390   GNUNET_memcpy (&llr_msg[1], name_tmp, name_len);
1391   GNUNET_memcpy (&res_name[name_len], rlc.res_rd, rlc.rd_ser_len);
1392   GNUNET_MQ_send (nc->mq, env);
1393   GNUNET_free_non_null (rlc.res_rd);
1394 }
1395
1396
1397 /**
1398  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1399  *
1400  * @param cls client sending the message
1401  * @param rp_msg message of type `struct RecordStoreMessage`
1402  * @return #GNUNET_OK if @a rp_msg is well-formed
1403  */
1404 static int
1405 check_record_store (void *cls, const struct RecordStoreMessage *rp_msg)
1406 {
1407   size_t name_len;
1408   size_t msg_size;
1409   size_t msg_size_exp;
1410   size_t rd_ser_len;
1411   const char *name_tmp;
1412
1413   (void) cls;
1414   name_len = ntohs (rp_msg->name_len);
1415   msg_size = ntohs (rp_msg->gns_header.header.size);
1416   rd_ser_len = ntohs (rp_msg->rd_len);
1417   msg_size_exp = sizeof(struct RecordStoreMessage) + name_len + rd_ser_len;
1418   if (msg_size != msg_size_exp)
1419   {
1420     GNUNET_break (0);
1421     return GNUNET_SYSERR;
1422   }
1423   if ((0 == name_len) || (name_len > MAX_NAME_LEN))
1424   {
1425     GNUNET_break (0);
1426     return GNUNET_SYSERR;
1427   }
1428   name_tmp = (const char *) &rp_msg[1];
1429   if ('\0' != name_tmp[name_len - 1])
1430   {
1431     GNUNET_break (0);
1432     return GNUNET_SYSERR;
1433   }
1434   return GNUNET_OK;
1435 }
1436
1437
1438 /**
1439  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1440  *
1441  * @param cls client sending the message
1442  * @param rp_msg message of type `struct RecordStoreMessage`
1443  */
1444 static void
1445 handle_record_store (void *cls, const struct RecordStoreMessage *rp_msg)
1446 {
1447   struct NamestoreClient *nc = cls;
1448   size_t name_len;
1449   size_t rd_ser_len;
1450   uint32_t rid;
1451   const char *name_tmp;
1452   char *conv_name;
1453   const char *rd_ser;
1454   unsigned int rd_count;
1455   int res;
1456   struct StoreActivity *sa;
1457
1458   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1459               "Received NAMESTORE_RECORD_STORE message\n");
1460   rid = ntohl (rp_msg->gns_header.r_id);
1461   name_len = ntohs (rp_msg->name_len);
1462   rd_count = ntohs (rp_msg->rd_count);
1463   rd_ser_len = ntohs (rp_msg->rd_len);
1464   GNUNET_break (0 == ntohs (rp_msg->reserved));
1465   name_tmp = (const char *) &rp_msg[1];
1466   rd_ser = &name_tmp[name_len];
1467   {
1468     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL (rd_count)];
1469
1470     if (GNUNET_OK !=
1471         GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count, rd))
1472     {
1473       GNUNET_break (0);
1474       GNUNET_SERVICE_client_drop (nc->client);
1475       return;
1476     }
1477
1478     /* Extracting and converting private key */
1479     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1480     if (NULL == conv_name)
1481     {
1482       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1483                   "Error converting name `%s'\n",
1484                   name_tmp);
1485       GNUNET_SERVICE_client_drop (nc->client);
1486       return;
1487     }
1488     GNUNET_STATISTICS_update (statistics,
1489                               "Well-formed store requests received",
1490                               1,
1491                               GNUNET_NO);
1492     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493                 "Creating %u records for name `%s'\n",
1494                 (unsigned int) rd_count,
1495                 conv_name);
1496     if ((0 == rd_count) &&
1497         (GNUNET_NO == GSN_database->lookup_records (GSN_database->cls,
1498                                                     &rp_msg->private_key,
1499                                                     conv_name,
1500                                                     NULL,
1501                                                     0)))
1502     {
1503       /* This name does not exist, so cannot be removed */
1504       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505                   "Name `%s' does not exist, no deletion required\n",
1506                   conv_name);
1507       res = GNUNET_NO;
1508     }
1509     else
1510     {
1511       /* remove "NICK" records, unless this is for the
1512        #GNUNET_GNS_EMPTY_LABEL_AT label */
1513       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL (rd_count)];
1514       unsigned int rd_clean_off;
1515       int have_nick;
1516
1517       rd_clean_off = 0;
1518       have_nick = GNUNET_NO;
1519       for (unsigned int i = 0; i < rd_count; i++)
1520       {
1521         rd_clean[rd_clean_off] = rd[i];
1522         if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) ||
1523             (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type))
1524           rd_clean_off++;
1525
1526         if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
1527             (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type))
1528         {
1529           cache_nick (&rp_msg->private_key, &rd[i]);
1530           have_nick = GNUNET_YES;
1531         }
1532       }
1533       if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
1534           (GNUNET_NO == have_nick))
1535       {
1536         /* remove nick record from cache, in case we have one there */
1537         cache_nick (&rp_msg->private_key, NULL);
1538       }
1539       res = GSN_database->store_records (GSN_database->cls,
1540                                          &rp_msg->private_key,
1541                                          conv_name,
1542                                          rd_clean_off,
1543                                          rd_clean);
1544     }
1545
1546     if (GNUNET_OK != res)
1547     {
1548       /* store not successful, not need to tell monitors */
1549       send_store_response (nc, res, rid);
1550       GNUNET_SERVICE_client_continue (nc->client);
1551       GNUNET_free (conv_name);
1552       return;
1553     }
1554
1555     sa = GNUNET_malloc (sizeof(struct StoreActivity)
1556                         + ntohs (rp_msg->gns_header.header.size));
1557     GNUNET_CONTAINER_DLL_insert (sa_head, sa_tail, sa);
1558     sa->nc = nc;
1559     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1560     GNUNET_memcpy (&sa[1], rp_msg, ntohs (rp_msg->gns_header.header.size));
1561     sa->zm_pos = monitor_head;
1562     sa->conv_name = conv_name;
1563     continue_store_activity (sa);
1564   }
1565 }
1566
1567
1568 /**
1569  * Context for record remove operations passed from #handle_zone_to_name to
1570  * #handle_zone_to_name_it as closure
1571  */
1572 struct ZoneToNameCtx
1573 {
1574   /**
1575    * Namestore client
1576    */
1577   struct NamestoreClient *nc;
1578
1579   /**
1580    * Request id (to be used in the response to the client).
1581    */
1582   uint32_t rid;
1583
1584   /**
1585    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1586    * not finding a name for the zone still counts as a 'success' here,
1587    * as this field is about the success of executing the IPC protocol.
1588    */
1589   int success;
1590 };
1591
1592
1593 /**
1594  * Zone to name iterator
1595  *
1596  * @param cls struct ZoneToNameCtx *
1597  * @param seq sequence number of the record, MUST NOT BE ZERO
1598  * @param zone_key the zone key
1599  * @param name name
1600  * @param rd_count number of records in @a rd
1601  * @param rd record data
1602  */
1603 static void
1604 handle_zone_to_name_it (void *cls,
1605                         uint64_t seq,
1606                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1607                         const char *name,
1608                         unsigned int rd_count,
1609                         const struct GNUNET_GNSRECORD_Data *rd)
1610 {
1611   struct ZoneToNameCtx *ztn_ctx = cls;
1612   struct GNUNET_MQ_Envelope *env;
1613   struct ZoneToNameResponseMessage *ztnr_msg;
1614   int16_t res;
1615   size_t name_len;
1616   ssize_t rd_ser_len;
1617   size_t msg_size;
1618   char *name_tmp;
1619   char *rd_tmp;
1620
1621   GNUNET_assert (0 != seq);
1622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1623               "Found result for zone-to-name lookup: `%s'\n",
1624               name);
1625   res = GNUNET_YES;
1626   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1627   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
1628   if (rd_ser_len < 0)
1629   {
1630     GNUNET_break (0);
1631     ztn_ctx->success = GNUNET_SYSERR;
1632     return;
1633   }
1634   msg_size = sizeof(struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1635   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1636   {
1637     GNUNET_break (0);
1638     ztn_ctx->success = GNUNET_SYSERR;
1639     return;
1640   }
1641   env =
1642     GNUNET_MQ_msg_extra (ztnr_msg,
1643                          name_len + rd_ser_len,
1644                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1645   ztnr_msg->gns_header.header.size = htons (msg_size);
1646   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1647   ztnr_msg->res = htons (res);
1648   ztnr_msg->rd_len = htons (rd_ser_len);
1649   ztnr_msg->rd_count = htons (rd_count);
1650   ztnr_msg->name_len = htons (name_len);
1651   ztnr_msg->zone = *zone_key;
1652   name_tmp = (char *) &ztnr_msg[1];
1653   GNUNET_memcpy (name_tmp, name, name_len);
1654   rd_tmp = &name_tmp[name_len];
1655   GNUNET_assert (
1656     rd_ser_len ==
1657     GNUNET_GNSRECORD_records_serialize (rd_count, rd, rd_ser_len, rd_tmp));
1658   ztn_ctx->success = GNUNET_OK;
1659   GNUNET_MQ_send (ztn_ctx->nc->mq, env);
1660 }
1661
1662
1663 /**
1664  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1665  *
1666  * @param cls client client sending the message
1667  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1668  */
1669 static void
1670 handle_zone_to_name (void *cls, const struct ZoneToNameMessage *ztn_msg)
1671 {
1672   struct NamestoreClient *nc = cls;
1673   struct ZoneToNameCtx ztn_ctx;
1674   struct GNUNET_MQ_Envelope *env;
1675   struct ZoneToNameResponseMessage *ztnr_msg;
1676
1677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ZONE_TO_NAME message\n");
1678   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1679   ztn_ctx.nc = nc;
1680   ztn_ctx.success = GNUNET_NO;
1681   if (GNUNET_SYSERR == GSN_database->zone_to_name (GSN_database->cls,
1682                                                    &ztn_msg->zone,
1683                                                    &ztn_msg->value_zone,
1684                                                    &handle_zone_to_name_it,
1685                                                    &ztn_ctx))
1686   {
1687     /* internal error, hang up instead of signalling something
1688        that might be wrong */
1689     GNUNET_break (0);
1690     GNUNET_SERVICE_client_drop (nc->client);
1691     return;
1692   }
1693   if (GNUNET_NO == ztn_ctx.success)
1694   {
1695     /* no result found, send empty response */
1696     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697                 "Found no result for zone-to-name lookup.\n");
1698     env = GNUNET_MQ_msg (ztnr_msg,
1699                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1700     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1701     ztnr_msg->res = htons (GNUNET_NO);
1702     GNUNET_MQ_send (nc->mq, env);
1703   }
1704   GNUNET_SERVICE_client_continue (nc->client);
1705 }
1706
1707
1708 /**
1709  * Context for record remove operations passed from
1710  * #run_zone_iteration_round to #zone_iterate_proc as closure
1711  */
1712 struct ZoneIterationProcResult
1713 {
1714   /**
1715    * The zone iteration handle
1716    */
1717   struct ZoneIteration *zi;
1718
1719   /**
1720    * Number of results left to be returned in this iteration.
1721    */
1722   uint64_t limit;
1723 };
1724
1725
1726 /**
1727  * Process results for zone iteration from database
1728  *
1729  * @param cls struct ZoneIterationProcResult
1730  * @param seq sequence number of the record, MUST NOT BE ZERO
1731  * @param zone_key the zone key
1732  * @param name name
1733  * @param rd_count number of records for this name
1734  * @param rd record data
1735  */
1736 static void
1737 zone_iterate_proc (void *cls,
1738                    uint64_t seq,
1739                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1740                    const char *name,
1741                    unsigned int rd_count,
1742                    const struct GNUNET_GNSRECORD_Data *rd)
1743 {
1744   struct ZoneIterationProcResult *proc = cls;
1745   int do_refresh_block;
1746
1747   GNUNET_assert (0 != seq);
1748   if ((NULL == zone_key) && (NULL == name))
1749   {
1750     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
1751     return;
1752   }
1753   if ((NULL == zone_key) || (NULL == name))
1754   {
1755     /* what is this!? should never happen */
1756     GNUNET_break (0);
1757     return;
1758   }
1759   if (0 == proc->limit)
1760   {
1761     /* what is this!? should never happen */
1762     GNUNET_break (0);
1763     return;
1764   }
1765   proc->limit--;
1766   proc->zi->seq = seq;
1767   send_lookup_response (proc->zi->nc,
1768                         proc->zi->request_id,
1769                         zone_key,
1770                         name,
1771                         rd_count,
1772                         rd);
1773
1774
1775   do_refresh_block = GNUNET_NO;
1776   for (unsigned int i = 0; i < rd_count; i++)
1777     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1778     {
1779       do_refresh_block = GNUNET_YES;
1780       break;
1781     }
1782   if (GNUNET_YES == do_refresh_block)
1783     refresh_block (NULL, proc->zi, 0, zone_key, name, rd_count, rd);
1784 }
1785
1786
1787 /**
1788  * Perform the next round of the zone iteration.
1789  *
1790  * @param zi zone iterator to process
1791  * @param limit number of results to return in one pass
1792  */
1793 static void
1794 run_zone_iteration_round (struct ZoneIteration *zi, uint64_t limit)
1795 {
1796   struct ZoneIterationProcResult proc;
1797   struct GNUNET_TIME_Absolute start;
1798   struct GNUNET_TIME_Relative duration;
1799
1800   memset (&proc, 0, sizeof(proc));
1801   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1802               "Asked to return up to %llu records at position %llu\n",
1803               (unsigned long long) limit,
1804               (unsigned long long) zi->seq);
1805   proc.zi = zi;
1806   proc.limit = limit;
1807   start = GNUNET_TIME_absolute_get ();
1808   GNUNET_break (GNUNET_SYSERR !=
1809                 GSN_database->iterate_records (GSN_database->cls,
1810                                                (0 == GNUNET_is_zero (&zi->zone))
1811                                                ? NULL
1812                                                : &zi->zone,
1813                                                zi->seq,
1814                                                limit,
1815                                                &zone_iterate_proc,
1816                                                &proc));
1817   duration = GNUNET_TIME_absolute_get_duration (start);
1818   duration = GNUNET_TIME_relative_divide (duration, limit - proc.limit);
1819   GNUNET_STATISTICS_set (statistics,
1820                          "NAMESTORE iteration delay (μs/record)",
1821                          duration.rel_value_us,
1822                          GNUNET_NO);
1823   if (0 == proc.limit)
1824     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1825                 "Returned %llu results, more results available\n",
1826                 (unsigned long long) limit);
1827   zi->send_end = (0 != proc.limit);
1828   if (0 == zi->cache_ops)
1829     zone_iteration_done_client_continue (zi);
1830 }
1831
1832
1833 /**
1834  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1835  *
1836  * @param cls the client sending the message
1837  * @param zis_msg message from the client
1838  */
1839 static void
1840 handle_iteration_start (void *cls,
1841                         const struct ZoneIterationStartMessage *zis_msg)
1842 {
1843   struct NamestoreClient *nc = cls;
1844   struct ZoneIteration *zi;
1845
1846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847               "Received ZONE_ITERATION_START message\n");
1848   zi = GNUNET_new (struct ZoneIteration);
1849   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1850   zi->offset = 0;
1851   zi->nc = nc;
1852   zi->zone = zis_msg->zone;
1853
1854   GNUNET_CONTAINER_DLL_insert (nc->op_head, nc->op_tail, zi);
1855   run_zone_iteration_round (zi, 1);
1856 }
1857
1858
1859 /**
1860  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1861  *
1862  * @param cls the client sending the message
1863  * @param zis_msg message from the client
1864  */
1865 static void
1866 handle_iteration_stop (void *cls,
1867                        const struct ZoneIterationStopMessage *zis_msg)
1868 {
1869   struct NamestoreClient *nc = cls;
1870   struct ZoneIteration *zi;
1871   uint32_t rid;
1872
1873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1874               "Received ZONE_ITERATION_STOP message\n");
1875   rid = ntohl (zis_msg->gns_header.r_id);
1876   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1877     if (zi->request_id == rid)
1878       break;
1879   if (NULL == zi)
1880   {
1881     GNUNET_break (0);
1882     GNUNET_SERVICE_client_drop (nc->client);
1883     return;
1884   }
1885   GNUNET_CONTAINER_DLL_remove (nc->op_head, nc->op_tail, zi);
1886   GNUNET_free (zi);
1887   GNUNET_SERVICE_client_continue (nc->client);
1888 }
1889
1890
1891 /**
1892  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
1893  *
1894  * @param cls the client sending the message
1895  * @param message message from the client
1896  */
1897 static void
1898 handle_iteration_next (void *cls,
1899                        const struct ZoneIterationNextMessage *zis_msg)
1900 {
1901   struct NamestoreClient *nc = cls;
1902   struct ZoneIteration *zi;
1903   uint32_t rid;
1904   uint64_t limit;
1905
1906   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1907               "Received ZONE_ITERATION_NEXT message\n");
1908   GNUNET_STATISTICS_update (statistics,
1909                             "Iteration NEXT messages received",
1910                             1,
1911                             GNUNET_NO);
1912   rid = ntohl (zis_msg->gns_header.r_id);
1913   limit = GNUNET_ntohll (zis_msg->limit);
1914   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1915     if (zi->request_id == rid)
1916       break;
1917   if (NULL == zi)
1918   {
1919     GNUNET_break (0);
1920     GNUNET_SERVICE_client_drop (nc->client);
1921     return;
1922   }
1923   run_zone_iteration_round (zi, limit);
1924 }
1925
1926
1927 /**
1928  * Function called when the monitor is ready for more data, and we
1929  * should thus unblock PUT operations that were blocked on the
1930  * monitor not being ready.
1931  */
1932 static void
1933 monitor_unblock (struct ZoneMonitor *zm)
1934 {
1935   struct StoreActivity *sa = sa_head;
1936
1937   while ((NULL != sa) && (zm->limit > zm->iteration_cnt))
1938   {
1939     struct StoreActivity *sn = sa->next;
1940
1941     if (sa->zm_pos == zm)
1942       continue_store_activity (sa);
1943     sa = sn;
1944   }
1945   if (zm->limit > zm->iteration_cnt)
1946   {
1947     zm->sa_waiting = GNUNET_NO;
1948     if (NULL != zm->sa_wait_warning)
1949     {
1950       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1951       zm->sa_wait_warning = NULL;
1952     }
1953   }
1954   else if (GNUNET_YES == zm->sa_waiting)
1955   {
1956     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1957     if (NULL != zm->sa_wait_warning)
1958       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1959     zm->sa_wait_warning =
1960       GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1961                                     &warn_monitor_slow,
1962                                     zm);
1963   }
1964 }
1965
1966
1967 /**
1968  * Send 'sync' message to zone monitor, we're now in sync.
1969  *
1970  * @param zm monitor that is now in sync
1971  */
1972 static void
1973 monitor_sync (struct ZoneMonitor *zm)
1974 {
1975   struct GNUNET_MQ_Envelope *env;
1976   struct GNUNET_MessageHeader *sync;
1977
1978   env = GNUNET_MQ_msg (sync, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
1979   GNUNET_MQ_send (zm->nc->mq, env);
1980   /* mark iteration done */
1981   zm->in_first_iteration = GNUNET_NO;
1982   zm->iteration_cnt = 0;
1983   if ((zm->limit > 0) && (zm->sa_waiting))
1984     monitor_unblock (zm);
1985 }
1986
1987
1988 /**
1989  * Obtain the next datum during the zone monitor's zone initial iteration.
1990  *
1991  * @param cls zone monitor that does its initial iteration
1992  */
1993 static void
1994 monitor_iteration_next (void *cls);
1995
1996
1997 /**
1998  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
1999  *
2000  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2001  * @param seq sequence number of the record, MUST NOT BE ZERO
2002  * @param zone_key zone key of the zone
2003  * @param name name
2004  * @param rd_count number of records in @a rd
2005  * @param rd array of records
2006  */
2007 static void
2008 monitor_iterate_cb (void *cls,
2009                     uint64_t seq,
2010                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2011                     const char *name,
2012                     unsigned int rd_count,
2013                     const struct GNUNET_GNSRECORD_Data *rd)
2014 {
2015   struct ZoneMonitor *zm = cls;
2016
2017   GNUNET_assert (0 != seq);
2018   zm->seq = seq;
2019   GNUNET_assert (NULL != name);
2020   GNUNET_STATISTICS_update (statistics,
2021                             "Monitor notifications sent",
2022                             1,
2023                             GNUNET_NO);
2024   zm->limit--;
2025   zm->iteration_cnt--;
2026   send_lookup_response (zm->nc, 0, zone_key, name, rd_count, rd);
2027   if ((0 == zm->iteration_cnt) && (0 != zm->limit))
2028   {
2029     /* We are done with the current iteration batch, AND the
2030        client would right now accept more, so go again! */
2031     GNUNET_assert (NULL == zm->task);
2032     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
2033   }
2034 }
2035
2036
2037 /**
2038  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2039  *
2040  * @param cls the client sending the message
2041  * @param zis_msg message from the client
2042  */
2043 static void
2044 handle_monitor_start (void *cls, const struct ZoneMonitorStartMessage *zis_msg)
2045 {
2046   struct NamestoreClient *nc = cls;
2047   struct ZoneMonitor *zm;
2048
2049   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ZONE_MONITOR_START message\n");
2050   zm = GNUNET_new (struct ZoneMonitor);
2051   zm->nc = nc;
2052   zm->zone = zis_msg->zone;
2053   zm->limit = 1;
2054   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2055   GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, zm);
2056   GNUNET_SERVICE_client_mark_monitor (nc->client);
2057   GNUNET_SERVICE_client_continue (nc->client);
2058   GNUNET_notification_context_add (monitor_nc, nc->mq);
2059   if (zm->in_first_iteration)
2060     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
2061   else
2062     monitor_sync (zm);
2063 }
2064
2065
2066 /**
2067  * Obtain the next datum during the zone monitor's zone initial iteration.
2068  *
2069  * @param cls zone monitor that does its initial iteration
2070  */
2071 static void
2072 monitor_iteration_next (void *cls)
2073 {
2074   struct ZoneMonitor *zm = cls;
2075   int ret;
2076
2077   zm->task = NULL;
2078   GNUNET_assert (0 == zm->iteration_cnt);
2079   if (zm->limit > 16)
2080     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2081   else
2082     zm->iteration_cnt = zm->limit; /* use it all */
2083   ret = GSN_database->iterate_records (GSN_database->cls,
2084                                        (0 == GNUNET_is_zero (&zm->zone))
2085                                        ? NULL
2086                                        : &zm->zone,
2087                                        zm->seq,
2088                                        zm->iteration_cnt,
2089                                        &monitor_iterate_cb,
2090                                        zm);
2091   if (GNUNET_SYSERR == ret)
2092   {
2093     GNUNET_SERVICE_client_drop (zm->nc->client);
2094     return;
2095   }
2096   if (GNUNET_NO == ret)
2097   {
2098     /* empty zone */
2099     monitor_sync (zm);
2100     return;
2101   }
2102 }
2103
2104
2105 /**
2106  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2107  *
2108  * @param cls the client sending the message
2109  * @param nm message from the client
2110  */
2111 static void
2112 handle_monitor_next (void *cls, const struct ZoneMonitorNextMessage *nm)
2113 {
2114   struct NamestoreClient *nc = cls;
2115   struct ZoneMonitor *zm;
2116   uint64_t inc;
2117
2118   inc = GNUNET_ntohll (nm->limit);
2119   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2120               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2121               (unsigned long long) inc);
2122   for (zm = monitor_head; NULL != zm; zm = zm->next)
2123     if (zm->nc == nc)
2124       break;
2125   if (NULL == zm)
2126   {
2127     GNUNET_break (0);
2128     GNUNET_SERVICE_client_drop (nc->client);
2129     return;
2130   }
2131   GNUNET_SERVICE_client_continue (nc->client);
2132   if (zm->limit + inc < zm->limit)
2133   {
2134     GNUNET_break (0);
2135     GNUNET_SERVICE_client_drop (nc->client);
2136     return;
2137   }
2138   zm->limit += inc;
2139   if ((zm->in_first_iteration) && (zm->limit == inc))
2140   {
2141     /* We are still iterating, and the previous iteration must
2142        have stopped due to the client's limit, so continue it! */
2143     GNUNET_assert (NULL == zm->task);
2144     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, zm);
2145   }
2146   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2147   if ((zm->limit > zm->iteration_cnt) && (zm->sa_waiting))
2148   {
2149     monitor_unblock (zm);
2150   }
2151   else if (GNUNET_YES == zm->sa_waiting)
2152   {
2153     if (NULL != zm->sa_wait_warning)
2154       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2155     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2156     zm->sa_wait_warning =
2157       GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2158                                     &warn_monitor_slow,
2159                                     zm);
2160   }
2161 }
2162
2163
2164 /**
2165  * Process namestore requests.
2166  *
2167  * @param cls closure
2168  * @param cfg configuration to use
2169  * @param service the initialized service
2170  */
2171 static void
2172 run (void *cls,
2173      const struct GNUNET_CONFIGURATION_Handle *cfg,
2174      struct GNUNET_SERVICE_Handle *service)
2175 {
2176   char *database;
2177
2178   (void) cls;
2179   (void) service;
2180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting namestore service\n");
2181   cache_keys =
2182     GNUNET_CONFIGURATION_get_value_yesno (cfg, "namestore", "CACHE_KEYS");
2183   disable_namecache =
2184     GNUNET_CONFIGURATION_get_value_yesno (cfg, "namecache", "DISABLE");
2185   GSN_cfg = cfg;
2186   monitor_nc = GNUNET_notification_context_create (1);
2187   if (GNUNET_YES != disable_namecache)
2188   {
2189     namecache = GNUNET_NAMECACHE_connect (cfg);
2190     GNUNET_assert (NULL != namecache);
2191   }
2192   /* Loading database plugin */
2193   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
2194                                                           "namestore",
2195                                                           "database",
2196                                                           &database))
2197     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No database backend configured\n");
2198
2199   GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_namestore_%s", database);
2200   GSN_database = GNUNET_PLUGIN_load (db_lib_name, (void *) GSN_cfg);
2201   GNUNET_free (database);
2202   statistics = GNUNET_STATISTICS_create ("namestore", cfg);
2203   GNUNET_SCHEDULER_add_shutdown (&cleanup_task, NULL);
2204   if (NULL == GSN_database)
2205   {
2206     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2207                 "Could not load database backend `%s'\n",
2208                 db_lib_name);
2209     GNUNET_SCHEDULER_shutdown ();
2210     return;
2211   }
2212 }
2213
2214
2215 /**
2216  * Define "main" method using service macro.
2217  */
2218 GNUNET_SERVICE_MAIN (
2219   "namestore",
2220   GNUNET_SERVICE_OPTION_NONE,
2221   &run,
2222   &client_connect_cb,
2223   &client_disconnect_cb,
2224   NULL,
2225   GNUNET_MQ_hd_var_size (record_store,
2226                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2227                          struct RecordStoreMessage,
2228                          NULL),
2229   GNUNET_MQ_hd_var_size (record_lookup,
2230                          GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2231                          struct LabelLookupMessage,
2232                          NULL),
2233   GNUNET_MQ_hd_fixed_size (zone_to_name,
2234                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2235                            struct ZoneToNameMessage,
2236                            NULL),
2237   GNUNET_MQ_hd_fixed_size (iteration_start,
2238                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2239                            struct ZoneIterationStartMessage,
2240                            NULL),
2241   GNUNET_MQ_hd_fixed_size (iteration_next,
2242                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2243                            struct ZoneIterationNextMessage,
2244                            NULL),
2245   GNUNET_MQ_hd_fixed_size (iteration_stop,
2246                            GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2247                            struct ZoneIterationStopMessage,
2248                            NULL),
2249   GNUNET_MQ_hd_fixed_size (monitor_start,
2250                            GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2251                            struct ZoneMonitorStartMessage,
2252                            NULL),
2253   GNUNET_MQ_hd_fixed_size (monitor_next,
2254                            GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2255                            struct ZoneMonitorNextMessage,
2256                            NULL),
2257   GNUNET_MQ_handler_end ());
2258
2259
2260 /* end of gnunet-service-namestore.c */