src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file namestore/gnunet-service-namestore.c
23  * @brief namestore for the GNUnet naming system
24  * @author Matthias Wachs
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - "get_nick_record" is a bottleneck, introduce a cache to
29  *   avoid looking it up again and again (for the same few
30  *   zones that the user will typically manage!)
31  * - run testcases, make sure everything works!
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "gnunet_dnsparser_lib.h"
36 #include "gnunet_gns_service.h"
37 #include "gnunet_namecache_service.h"
38 #include "gnunet_namestore_service.h"
39 #include "gnunet_namestore_plugin.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_signatures.h"
42 #include "namestore.h"
43
44 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
45
46 /**
47  * If a monitor takes more than 1 minute to process an event, print a warning.
48  */
49 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
50
51 /**
52  * Size of the cache used by #get_nick_record()
53  */
54 #define NC_SIZE 16
55
56 /**
57  * A namestore client
58  */
59 struct NamestoreClient;
60
61
62 /**
63  * A namestore iteration operation.
64  */
65 struct ZoneIteration
66 {
67   /**
68    * Next element in the DLL
69    */
70   struct ZoneIteration *next;
71
72   /**
73    * Previous element in the DLL
74    */
75   struct ZoneIteration *prev;
76
77   /**
78    * Namestore client which intiated this zone iteration
79    */
80   struct NamestoreClient *nc;
81
82   /**
83    * The nick to add to the records
84    */
85   struct GNUNET_GNSRECORD_Data *nick;
86
87   /**
88    * Key of the zone we are iterating over.
89    */
90   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
91
92   /**
93    * Last sequence number in the zone iteration used to address next
94    * result of the zone iteration in the store
95    *
96    * Initialy set to 0.
97    * Updated in #zone_iterate_proc()
98    */
99   uint64_t seq;
100
101   /**
102    * The operation id fot the zone iteration in the response for the client
103    */
104   uint32_t request_id;
105
106   /**
107    * Offset of the zone iteration used to address next result of the zone
108    * iteration in the store
109    *
110    * Initialy set to 0 in #handle_iteration_start
111    * Incremented with by every call to #handle_iteration_next
112    */
113   uint32_t offset;
114
115   /**
116    * Number of pending cache operations triggered by this zone iteration which we
117    * need to wait for before allowing the client to continue.
118    */
119   unsigned int cache_ops;
120
121   /**
122    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
123    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
124    * message and free the data structure once @e cache_ops is zero.
125    */
126   int send_end;
127   
128 };
129
130
131 /**
132  * A namestore client
133  */
134 struct NamestoreClient
135 {
136
137   /**
138    * The client
139    */
140   struct GNUNET_SERVICE_Client *client;
141
142   /**
143    * Message queue for transmission to @e client
144    */
145   struct GNUNET_MQ_Handle *mq;
146
147   /**
148    * Head of the DLL of
149    * Zone iteration operations in progress initiated by this client
150    */
151   struct ZoneIteration *op_head;
152
153   /**
154    * Tail of the DLL of
155    * Zone iteration operations in progress initiated by this client
156    */
157   struct ZoneIteration *op_tail;
158 };
159
160
161 /**
162  * A namestore monitor.
163  */
164 struct ZoneMonitor
165 {
166   /**
167    * Next element in the DLL
168    */
169   struct ZoneMonitor *next;
170
171   /**
172    * Previous element in the DLL
173    */
174   struct ZoneMonitor *prev;
175
176   /**
177    * Namestore client which intiated this zone monitor
178    */
179   struct NamestoreClient *nc;
180
181   /**
182    * Private key of the zone.
183    */
184   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
185
186   /**
187    * Task active during initial iteration.
188    */
189   struct GNUNET_SCHEDULER_Task *task;
190
191   /**
192    * Task to warn about slow monitors.
193    */
194   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
195
196   /**
197    * Since when are we blocked on this monitor?
198    */
199   struct GNUNET_TIME_Absolute sa_waiting_start;
200
201   /**
202    * Last sequence number in the zone iteration used to address next
203    * result of the zone iteration in the store
204    *
205    * Initialy set to 0.
206    * Updated in #monitor_iterate_cb()
207    */
208   uint64_t seq;
209
210   /**
211    * Current limit of how many more messages we are allowed
212    * to queue to this monitor.
213    */
214   uint64_t limit;
215
216   /**
217    * How many more requests may we receive from the iterator
218    * before it is at the limit we gave it?  Will be below or
219    * equal to @e limit.  The effective limit for monitor
220    * events is thus @e iteration_cnt - @e limit!
221    */
222   uint64_t iteration_cnt;
223
224   /**
225    * Are we (still) in the initial iteration pass?
226    */
227   int in_first_iteration;
228
229   /**
230    * Is there a store activity waiting for this monitor?  We only raise the
231    * flag when it happens and search the DLL for the store activity when we
232    * had a limit increase.  If we cannot find any waiting store activity at
233    * that time, we clear the flag again.
234    */
235   int sa_waiting;
236
237 };
238
239
240 /**
241  * Pending operation on the namecache.
242  */
243 struct CacheOperation
244 {
245
246   /**
247    * Kept in a DLL.
248    */
249   struct CacheOperation *prev;
250
251   /**
252    * Kept in a DLL.
253    */
254   struct CacheOperation *next;
255
256   /**
257    * Handle to namecache queue.
258    */
259   struct GNUNET_NAMECACHE_QueueEntry *qe;
260
261   /**
262    * Client to notify about the result, can be NULL.
263    */
264   struct NamestoreClient *nc;
265
266   /**
267    * Zone iteration to call #zone_iteration_done_client_continue()
268    * for if applicable, can be NULL.
269    */
270   struct ZoneIteration *zi;
271   
272   /**
273    * Client's request ID.
274    */
275   uint32_t rid;
276 };
277
278
279 /**
280  * Information for an ongoing #handle_record_store() operation.
281  * Needed as we may wait for monitors to be ready for the notification.
282  */
283 struct StoreActivity
284 {
285   /**
286    * Kept in a DLL.
287    */
288   struct StoreActivity *next;
289
290   /**
291    * Kept in a DLL.
292    */
293   struct StoreActivity *prev;
294
295   /**
296    * Which client triggered the store activity?
297    */
298   struct NamestoreClient *nc;
299
300   /**
301    * Copy of the original store message (as data fields in @e rd will
302    * point into it!).
303    */
304   const struct RecordStoreMessage *rsm;
305
306   /**
307    * Next zone monitor that still needs to be notified about this PUT.
308    */
309   struct ZoneMonitor *zm_pos;
310
311   /**
312    * Label nicely canonicalized (lower case).
313    */
314   char *conv_name;
315
316 };
317
318
319 /**
320  * Entry in list of cached nick resolutions.
321  */ 
322 struct NickCache
323 {
324   /**
325    * Zone the cache entry is for.
326    */ 
327   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
328
329   /**
330    * Cached record data.
331    */
332   struct GNUNET_GNSRECORD_Data *rd;
333
334   /**
335    * Timestamp when this cache entry was used last.
336    */
337   struct GNUNET_TIME_Absolute last_used;
338 };
339
340
341 /**
342  * We cache nick records to reduce DB load. 
343  */
344 static struct NickCache nick_cache[NC_SIZE];
345
346 /**
347  * Public key of all zeros.
348  */
349 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
350
351 /**
352  * Configuration handle.
353  */
354 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
355
356 /**
357  * Handle to the statistics service
358  */
359 static struct GNUNET_STATISTICS_Handle *statistics;
360
361 /**
362  * Namecache handle.
363  */
364 static struct GNUNET_NAMECACHE_Handle *namecache;
365
366 /**
367  * Database handle
368  */
369 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
370
371 /**
372  * Name of the database plugin
373  */
374 static char *db_lib_name;
375
376 /**
377  * Head of cop DLL.
378  */
379 static struct CacheOperation *cop_head;
380
381 /**
382  * Tail of cop DLL.
383  */
384 static struct CacheOperation *cop_tail;
385
386 /**
387  * First active zone monitor.
388  */
389 static struct ZoneMonitor *monitor_head;
390
391 /**
392  * Last active zone monitor.
393  */
394 static struct ZoneMonitor *monitor_tail;
395
396 /**
397  * Head of DLL of monitor-blocked store activities.
398  */
399 static struct StoreActivity *sa_head;
400
401 /**
402  * Tail of DLL of monitor-blocked store activities.
403  */
404 static struct StoreActivity *sa_tail;
405
406 /**
407  * Notification context shared by all monitors.
408  */
409 static struct GNUNET_NotificationContext *monitor_nc;
410
411 /**
412  * Optimize block insertion by caching map of private keys to
413  * public keys in memory?
414  */
415 static int cache_keys;
416
417 /**
418  * Use the namecache? Doing so creates additional cryptographic
419  * operations whenever we touch a record.
420  */
421 static int disable_namecache;
422
423
424 /**
425  * Task run during shutdown.
426  *
427  * @param cls unused
428  */
429 static void
430 cleanup_task (void *cls)
431 {
432   struct CacheOperation *cop;
433
434   (void) cls;
435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436               "Stopping namestore service\n");
437   while (NULL != (cop = cop_head))
438   {
439     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
440                 "Aborting incomplete namecache operation\n");
441     GNUNET_NAMECACHE_cancel (cop->qe);
442     GNUNET_CONTAINER_DLL_remove (cop_head,
443                                  cop_tail,
444                                  cop);
445     GNUNET_free (cop);
446   }
447   if (NULL != namecache)
448   {
449     GNUNET_NAMECACHE_disconnect (namecache);
450     namecache = NULL;
451   }
452   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
453                                               GSN_database));
454   GNUNET_free (db_lib_name);
455   db_lib_name = NULL;
456   if (NULL != monitor_nc)
457   {
458     GNUNET_notification_context_destroy (monitor_nc);
459     monitor_nc = NULL;
460   }
461   if (NULL != statistics)
462   {
463     GNUNET_STATISTICS_destroy (statistics,
464                                GNUNET_NO);
465     statistics = NULL;
466   }
467 }
468
469
470 /**
471  * Release memory used by @a sa.
472  *
473  * @param sa activity to free
474  */
475 static void
476 free_store_activity (struct StoreActivity *sa)
477 {
478   GNUNET_CONTAINER_DLL_remove (sa_head,
479                                sa_tail,
480                                sa);
481   GNUNET_free (sa->conv_name);
482   GNUNET_free (sa);
483 }
484
485
486 /**
487  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
488  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
489  * record, which (if found) is then copied to @a cls for future use.
490  *
491  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
492  * @param seq sequence number of the record
493  * @param private_key the private key of the zone (unused)
494  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
495  * @param rd_count number of records in @a rd
496  * @param rd records stored under @a label in the zone
497  */
498 static void
499 lookup_nick_it (void *cls,
500                 uint64_t seq,
501                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
502                 const char *label,
503                 unsigned int rd_count,
504                 const struct GNUNET_GNSRECORD_Data *rd)
505 {
506   struct GNUNET_GNSRECORD_Data **res = cls;
507
508   (void) private_key;
509   (void) seq;
510   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
511   {
512     GNUNET_break (0);
513     return;
514   }
515   for (unsigned int c = 0; c < rd_count; c++)
516   {
517     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
518     {
519       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
520       (*res)->data = &(*res)[1];
521       GNUNET_memcpy ((void *) (*res)->data,
522                      rd[c].data,
523                      rd[c].data_size);
524       (*res)->data_size = rd[c].data_size;
525       (*res)->expiration_time = rd[c].expiration_time;
526       (*res)->flags = rd[c].flags;
527       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
528       return;
529     }
530   }
531   (*res) = NULL;
532 }
533
534
535 /**
536  * Add entry to the cache for @a zone and @a nick
537  *
538  * @param zone zone key to cache under
539  * @param nick nick entry to cache
540  */
541 static void
542 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
543             const struct GNUNET_GNSRECORD_Data *nick)
544 {
545   struct NickCache *oldest;
546
547   oldest = NULL;
548   for (unsigned int i=0;i<NC_SIZE;i++)
549   {
550     struct NickCache *pos = &nick_cache[i];
551
552     if ( (NULL == oldest) ||
553          (oldest->last_used.abs_value_us >
554           pos->last_used.abs_value_us) )
555       oldest = pos;
556     if (0 == memcmp (zone,
557                      &pos->zone,
558                      sizeof (*zone)))
559     {
560       oldest = pos;
561       break;
562     }
563   }
564   GNUNET_free_non_null (oldest->rd);
565   oldest->zone = *zone;
566   oldest->rd = GNUNET_malloc (sizeof (*nick) +
567                               nick->data_size);
568   *oldest->rd = *nick;
569   oldest->rd->data = &oldest->rd[1];
570   memcpy (&oldest->rd[1],
571           nick->data,
572           nick->data_size);
573   oldest->last_used = GNUNET_TIME_absolute_get ();
574 }
575
576
577 /**
578  * Return the NICK record for the zone (if it exists).
579  *
580  * @param zone private key for the zone to look for nick
581  * @return NULL if no NICK record was found
582  */
583 static struct GNUNET_GNSRECORD_Data *
584 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
585 {
586   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
587   struct GNUNET_GNSRECORD_Data *nick;
588   int res;
589
590   /* check cache first */
591   for (unsigned int i=0;i<NC_SIZE;i++)
592   {
593     struct NickCache *pos = &nick_cache[i];
594     if ( (NULL != pos->rd) &&
595          (0 == memcmp (zone,
596                        &pos->zone,
597                        sizeof (*zone))) )
598     {
599       nick = GNUNET_malloc (sizeof (*nick) +
600                             pos->rd->data_size);
601       *nick = *pos->rd;
602       nick->data = &nick[1];
603       memcpy (&nick[1],
604               pos->rd->data,
605               pos->rd->data_size);
606       pos->last_used = GNUNET_TIME_absolute_get ();
607       return nick;
608     }
609   }
610   
611   nick = NULL;
612   res = GSN_database->lookup_records (GSN_database->cls,
613                                       zone,
614                                       GNUNET_GNS_EMPTY_LABEL_AT,
615                                       &lookup_nick_it,
616                                       &nick);
617   if ( (GNUNET_OK != res) ||
618        (NULL == nick) )
619   {
620     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
622                 "No nick name set for zone `%s'\n",
623                 GNUNET_GNSRECORD_z2s (&pub));
624     return NULL;
625   }
626
627   /* update cache */
628   cache_nick (zone,
629               nick);
630   return nick;
631 }
632
633
634 /**
635  * Merge the nick record @a nick_rd with the rest of the
636  * record set given in @a rd2.  Store the result in @a rdc_res
637  * and @a rd_res.  The @a nick_rd's expiration time is set to
638  * the maximum expiration time of all of the records in @a rd2.
639  *
640  * @param nick_rd the nick record to integrate
641  * @param rd2_length length of the @a rd2 array
642  * @param rd2 array of records
643  * @param rdc_res[out] length of the resulting @a rd_res array
644  * @param rd_res[out] set to an array of records,
645  *                    including @a nick_rd and @a rd2;
646  *           all of the variable-size 'data' fields in @a rd2 are
647  *           allocated in the same chunk of memory!
648  */
649 static void
650 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
651                          unsigned int rd2_length,
652                          const struct GNUNET_GNSRECORD_Data *rd2,
653                          unsigned int *rdc_res,
654                          struct GNUNET_GNSRECORD_Data **rd_res)
655 {
656   uint64_t latest_expiration;
657   size_t req;
658   char *data;
659   size_t data_offset;
660   struct GNUNET_GNSRECORD_Data *target;
661
662   (*rdc_res) = 1 + rd2_length;
663   if (0 == 1 + rd2_length)
664   {
665     GNUNET_break (0);
666     (*rd_res) = NULL;
667     return;
668   }
669   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
670   for (unsigned int i=0; i<rd2_length; i++)
671   {
672     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
673
674     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
675     {
676       GNUNET_break (0);
677       (*rd_res) = NULL;
678       return;
679     }
680     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
681   }
682   target = GNUNET_malloc (req);
683   (*rd_res) = target;
684   data = (char *) &target[1 + rd2_length];
685   data_offset = 0;
686   latest_expiration = 0;
687   for (unsigned int i=0;i<rd2_length;i++)
688   {
689     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
690
691     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
692     {
693       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
694           latest_expiration)
695         latest_expiration = orig->expiration_time;
696     }
697     else if (orig->expiration_time > latest_expiration)
698       latest_expiration = orig->expiration_time;
699     target[i] = *orig;
700     target[i].data = (void *) &data[data_offset];
701     GNUNET_memcpy (&data[data_offset],
702                    orig->data,
703                    orig->data_size);
704     data_offset += orig->data_size;
705   }
706   /* append nick */
707   target[rd2_length] = *nick_rd;
708   target[rd2_length].expiration_time = latest_expiration;
709   target[rd2_length].data = (void *) &data[data_offset];
710   GNUNET_memcpy (&data[data_offset],
711                  nick_rd->data,
712                  nick_rd->data_size);
713   data_offset += nick_rd->data_size;
714   GNUNET_assert (req ==
715                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
716 }
717
718
719 /**
720  * Generate a `struct LookupNameResponseMessage` and send it to the
721  * given client using the given notification context.
722  *
723  * @param nc client to unicast to
724  * @param request_id request ID to use
725  * @param zone_key zone key of the zone
726  * @param name name
727  * @param rd_count number of records in @a rd
728  * @param rd array of records
729  */
730 static void
731 send_lookup_response (struct NamestoreClient *nc,
732                       uint32_t request_id,
733                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
734                       const char *name,
735                       unsigned int rd_count,
736                       const struct GNUNET_GNSRECORD_Data *rd)
737 {
738   struct GNUNET_MQ_Envelope *env;
739   struct RecordResultMessage *zir_msg;
740   struct GNUNET_GNSRECORD_Data *nick;
741   struct GNUNET_GNSRECORD_Data *res;
742   unsigned int res_count;
743   size_t name_len;
744   ssize_t rd_ser_len;
745   char *name_tmp;
746   char *rd_ser;
747
748   nick = get_nick_record (zone_key);
749   GNUNET_assert (-1 !=
750                  GNUNET_GNSRECORD_records_get_size (rd_count,
751                                                     rd));
752
753   if ( (NULL != nick) &&
754        (0 != strcmp (name,
755                      GNUNET_GNS_EMPTY_LABEL_AT)))
756   {
757     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
758     merge_with_nick_records (nick,
759                              rd_count,
760                              rd,
761                              &res_count,
762                              &res);
763     GNUNET_free (nick);
764   }
765   else
766   {
767     res_count = rd_count;
768     res = (struct GNUNET_GNSRECORD_Data *) rd;
769   }
770
771   GNUNET_assert (-1 !=
772                  GNUNET_GNSRECORD_records_get_size (res_count,
773                                                     res));
774
775
776   name_len = strlen (name) + 1;
777   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
778                                                   res);
779   if (rd_ser_len < 0)
780   {
781     GNUNET_break (0);
782     GNUNET_SERVICE_client_drop (nc->client);
783     return;
784   }
785   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof (*zir_msg))
786   {
787     GNUNET_break (0);
788     GNUNET_SERVICE_client_drop (nc->client);
789     return;
790   }
791   env = GNUNET_MQ_msg_extra (zir_msg,
792                              name_len + rd_ser_len,
793                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
794   zir_msg->gns_header.r_id = htonl (request_id);
795   zir_msg->name_len = htons (name_len);
796   zir_msg->rd_count = htons (res_count);
797   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
798   zir_msg->private_key = *zone_key;
799   name_tmp = (char *) &zir_msg[1];
800   GNUNET_memcpy (name_tmp,
801                  name,
802                  name_len);
803   rd_ser = &name_tmp[name_len];
804   GNUNET_assert (rd_ser_len ==
805                  GNUNET_GNSRECORD_records_serialize (res_count,
806                                                      res,
807                                                      rd_ser_len,
808                                                      rd_ser));
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Sending RECORD_RESULT message with %u records\n",
811               res_count);
812   GNUNET_STATISTICS_update (statistics,
813                             "Record sets sent to clients",
814                             1,
815                             GNUNET_NO);
816   GNUNET_MQ_send (nc->mq,
817                   env);
818   if (rd != res)
819     GNUNET_free (res);
820 }
821
822
823 /**
824  * Send response to the store request to the client.
825  *
826  * @param client client to talk to
827  * @param res status of the operation
828  * @param rid client's request ID
829  */
830 static void
831 send_store_response (struct NamestoreClient *nc,
832                      int res,
833                      uint32_t rid)
834 {
835   struct GNUNET_MQ_Envelope *env;
836   struct RecordStoreResponseMessage *rcr_msg;
837
838   GNUNET_assert (NULL != nc);
839   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840               "Sending RECORD_STORE_RESPONSE message\n");
841   GNUNET_STATISTICS_update (statistics,
842                             "Store requests completed",
843                             1,
844                             GNUNET_NO);
845   env = GNUNET_MQ_msg (rcr_msg,
846                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
847   rcr_msg->gns_header.r_id = htonl (rid);
848   rcr_msg->op_result = htonl (res);
849   GNUNET_MQ_send (nc->mq,
850                   env);
851 }
852
853
854 /**
855  * Function called once we are done with the zone iteration and
856  * allow the zone iteration client to send us more messages.
857  *
858  * @param zi zone iteration we are processing
859  */
860 static void
861 zone_iteration_done_client_continue (struct ZoneIteration *zi)
862 {
863   struct GNUNET_MQ_Envelope *env;
864   struct GNUNET_NAMESTORE_Header *em;
865
866   GNUNET_SERVICE_client_continue (zi->nc->client);
867   if (! zi->send_end)
868     return;
869   /* send empty response to indicate end of list */
870   env = GNUNET_MQ_msg (em,
871                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
872   em->r_id = htonl (zi->request_id);
873   GNUNET_MQ_send (zi->nc->mq,
874                   env);
875       
876   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
877                                zi->nc->op_tail,
878                                zi);
879   GNUNET_free (zi);
880 }
881
882
883 /**
884  * Cache operation complete, clean up.
885  *
886  * @param cls the `struct CacheOperation`
887  * @param success success
888  * @param emsg error messages
889  */
890 static void
891 finish_cache_operation (void *cls,
892                         int32_t success,
893                         const char *emsg)
894 {
895   struct CacheOperation *cop = cls;
896   struct ZoneIteration *zi;
897
898   if (NULL != emsg)
899     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
900                 _("Failed to replicate block in namecache: %s\n"),
901                 emsg);
902   else
903     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                 "CACHE operation completed\n");
905   GNUNET_CONTAINER_DLL_remove (cop_head,
906                                cop_tail,
907                                cop);
908   if (NULL != cop->nc)
909     send_store_response (cop->nc,
910                          success,
911                          cop->rid);
912   if (NULL != (zi = cop->zi))
913     {
914       zi->cache_ops--;
915       if (0 == zi->cache_ops)
916       {
917         /* unchoke zone iteration, cache has caught up */
918         zone_iteration_done_client_continue (zi);
919       }
920     }
921   GNUNET_free (cop);
922 }
923
924
925 /**
926  * We just touched the plaintext information about a name in our zone;
927  * refresh the corresponding (encrypted) block in the namecache.
928  *
929  * @param nc client responsible for the request, can be NULL
930  * @param zi zone iteration response for the request, can be NULL
931  * @param rid request ID of the client
932  * @param zone_key private key of the zone
933  * @param name label for the records
934  * @param rd_count number of records
935  * @param rd records stored under the given @a name
936  */
937 static void
938 refresh_block (struct NamestoreClient *nc,
939                struct ZoneIteration *zi,
940                uint32_t rid,
941                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
942                const char *name,
943                unsigned int rd_count,
944                const struct GNUNET_GNSRECORD_Data *rd)
945 {
946   struct GNUNET_GNSRECORD_Block *block;
947   struct CacheOperation *cop;
948   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
949   struct GNUNET_GNSRECORD_Data *nick;
950   struct GNUNET_GNSRECORD_Data *res;
951   unsigned int res_count;
952   struct GNUNET_TIME_Absolute exp_time;
953
954   nick = get_nick_record (zone_key);
955   res_count = rd_count;
956   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
957   if (NULL != nick)
958   {
959     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
960     merge_with_nick_records (nick,
961                              rd_count,rd,
962                              &res_count,
963                              &res);
964     GNUNET_free (nick);
965   }
966   if (0 == res_count)
967   {
968     if (NULL != nc)
969       send_store_response (nc,
970                            GNUNET_OK,
971                            rid);
972     return; /* no data, no need to update cache */
973   }
974   if (GNUNET_YES == disable_namecache)
975   {
976     GNUNET_STATISTICS_update (statistics,
977                               "Namecache updates skipped (NC disabled)",
978                               1,
979                               GNUNET_NO);
980     if (NULL != nc)
981       send_store_response (nc,
982                            GNUNET_OK,
983                            rid);
984     return;
985   }
986   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
987                                                           res);
988   if (cache_keys)
989     block = GNUNET_GNSRECORD_block_create2 (zone_key,
990                                             exp_time,
991                                             name,
992                                             res,
993                                             res_count);
994   else
995     block = GNUNET_GNSRECORD_block_create (zone_key,
996                                            exp_time,
997                                            name,
998                                            res,
999                                            res_count);
1000   GNUNET_assert (NULL != block);
1001   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
1002                                       &pkey);
1003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
1005               name,
1006               res_count,
1007               GNUNET_STRINGS_absolute_time_to_string (exp_time),
1008               GNUNET_GNSRECORD_z2s (&pkey));
1009   GNUNET_STATISTICS_update (statistics,
1010                             "Namecache updates pushed",
1011                             1,
1012                             GNUNET_NO);
1013   cop = GNUNET_new (struct CacheOperation);
1014   cop->nc = nc;
1015   cop->zi = zi;
1016   if (NULL != zi)
1017     zi->cache_ops++;
1018   cop->rid = rid;
1019   GNUNET_CONTAINER_DLL_insert (cop_head,
1020                                cop_tail,
1021                                cop);
1022   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
1023                                           block,
1024                                           &finish_cache_operation,
1025                                           cop);
1026   GNUNET_free (block);
1027 }
1028
1029
1030 /**
1031  * Print a warning that one of our monitors is no longer reacting.
1032  *
1033  * @param cls a `struct ZoneMonitor` to warn about
1034  */
1035 static void
1036 warn_monitor_slow (void *cls)
1037 {
1038   struct ZoneMonitor *zm = cls;
1039
1040   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1041               "No response from monitor since %s\n",
1042               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
1043   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1044                                                       &warn_monitor_slow,
1045                                                       zm);
1046 }
1047
1048
1049 /**
1050  * Continue processing the @a sa.
1051  *
1052  * @param sa store activity to process
1053  */
1054 static void
1055 continue_store_activity (struct StoreActivity *sa)
1056 {
1057   const struct RecordStoreMessage *rp_msg = sa->rsm;
1058   unsigned int rd_count;
1059   size_t name_len;
1060   size_t rd_ser_len;
1061   uint32_t rid;
1062   const char *name_tmp;
1063   const char *rd_ser;
1064
1065   rid = ntohl (rp_msg->gns_header.r_id);
1066   name_len = ntohs (rp_msg->name_len);
1067   rd_count = ntohs (rp_msg->rd_count);
1068   rd_ser_len = ntohs (rp_msg->rd_len);
1069   name_tmp = (const char *) &rp_msg[1];
1070   rd_ser = &name_tmp[name_len];
1071   {
1072     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1073
1074     /* We did this before, must succeed again */
1075     GNUNET_assert (GNUNET_OK ==
1076                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1077                                                          rd_ser,
1078                                                          rd_count,
1079                                                          rd));
1080
1081     for (struct ZoneMonitor *zm = sa->zm_pos;
1082          NULL != zm;
1083          zm = sa->zm_pos)
1084     {
1085       if ( (0 != memcmp (&rp_msg->private_key,
1086                          &zm->zone,
1087                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
1088            (0 != memcmp (&zm->zone,
1089                          &zero,
1090                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
1091         {
1092           sa->zm_pos = zm->next; /* not interesting to this monitor */
1093           continue;
1094         }
1095       if (zm->limit == zm->iteration_cnt)
1096       {
1097         zm->sa_waiting = GNUNET_YES;
1098         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1099         if (NULL != zm->sa_wait_warning)
1100           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1101         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1102                                                             &warn_monitor_slow,
1103                                                             zm);
1104         return; /* blocked on zone monitor */
1105       }
1106       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                   "Notifying monitor about changes under label `%s'\n",
1108                   sa->conv_name);
1109       zm->limit--;
1110       send_lookup_response (zm->nc,
1111                             0,
1112                             &rp_msg->private_key,
1113                             sa->conv_name,
1114                             rd_count,
1115                             rd);
1116       sa->zm_pos = zm->next;
1117     }
1118     /* great, done with the monitors, unpack (again) for refresh_block operation */
1119     refresh_block (sa->nc,
1120                    NULL,
1121                    rid,
1122                    &rp_msg->private_key,
1123                    sa->conv_name,
1124                    rd_count,
1125                    rd);
1126   }
1127   GNUNET_SERVICE_client_continue (sa->nc->client);
1128   free_store_activity (sa);
1129 }
1130
1131
1132 /**
1133  * Called whenever a client is disconnected.
1134  * Frees our resources associated with that client.
1135  *
1136  * @param cls closure
1137  * @param client identification of the client
1138  * @param app_ctx the `struct NamestoreClient` of @a client
1139  */
1140 static void
1141 client_disconnect_cb (void *cls,
1142                       struct GNUNET_SERVICE_Client *client,
1143                       void *app_ctx)
1144 {
1145   struct NamestoreClient *nc = app_ctx;
1146   struct ZoneIteration *no;
1147   struct CacheOperation *cop;
1148
1149   (void) cls;
1150   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1151               "Client %p disconnected\n",
1152               client);
1153   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1154   {
1155     struct StoreActivity *san;
1156
1157     if (nc != zm->nc)
1158       continue;
1159     GNUNET_CONTAINER_DLL_remove (monitor_head,
1160                                  monitor_tail,
1161                                  zm);
1162     if (NULL != zm->task)
1163     {
1164       GNUNET_SCHEDULER_cancel (zm->task);
1165       zm->task = NULL;
1166     }
1167     if (NULL != zm->sa_wait_warning)
1168     {
1169       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1170       zm->sa_wait_warning = NULL;
1171     }
1172     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1173     {
1174       san = sa->next;
1175       if (zm == sa->zm_pos)
1176       {
1177         sa->zm_pos = zm->next;
1178         /* this may free sa */
1179         continue_store_activity (sa);
1180       }
1181     }
1182     GNUNET_free (zm);
1183     break;
1184   }
1185   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1186   {
1187     if (sa->nc == nc)
1188     {
1189       /* this may free sa */
1190       free_store_activity (sa);
1191       break; /* there can only be one per nc */
1192     }
1193   }
1194   while (NULL != (no = nc->op_head))
1195   {
1196     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1197                                  nc->op_tail,
1198                                  no);
1199     GNUNET_free (no);
1200   }
1201   for (cop = cop_head; NULL != cop; cop = cop->next)
1202     if (nc == cop->nc)
1203       cop->nc = NULL;
1204   GNUNET_free (nc);
1205 }
1206
1207
1208 /**
1209  * Add a client to our list of active clients.
1210  *
1211  * @param cls NULL
1212  * @param client client to add
1213  * @param mq message queue for @a client
1214  * @return internal namestore client structure for this client
1215  */
1216 static void *
1217 client_connect_cb (void *cls,
1218                    struct GNUNET_SERVICE_Client *client,
1219                    struct GNUNET_MQ_Handle *mq)
1220 {
1221   struct NamestoreClient *nc;
1222
1223   (void) cls;
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225               "Client %p connected\n",
1226               client);
1227   nc = GNUNET_new (struct NamestoreClient);
1228   nc->client = client;
1229   nc->mq = mq;
1230   return nc;
1231 }
1232
1233
1234 /**
1235  * Closure for #lookup_it().
1236  */
1237 struct RecordLookupContext
1238 {
1239
1240   /**
1241    * FIXME.
1242    */
1243   const char *label;
1244
1245   /**
1246    * FIXME.
1247    */
1248   char *res_rd;
1249
1250   /**
1251    * FIXME.
1252    */
1253   struct GNUNET_GNSRECORD_Data *nick;
1254
1255   /**
1256    * FIXME.
1257    */
1258   int found;
1259
1260   /**
1261    * FIXME.
1262    */
1263   unsigned int res_rd_count;
1264
1265   /**
1266    * FIXME.
1267    */
1268   ssize_t rd_ser_len;
1269 };
1270
1271
1272 /**
1273  * FIXME.
1274  *
1275  * @param seq sequence number of the record
1276  */
1277 static void
1278 lookup_it (void *cls,
1279            uint64_t seq,
1280            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1281            const char *label,
1282            unsigned int rd_count,
1283            const struct GNUNET_GNSRECORD_Data *rd)
1284 {
1285   struct RecordLookupContext *rlc = cls;
1286
1287   (void) private_key;
1288   (void) seq;
1289   if (0 != strcmp (label,
1290                    rlc->label))
1291     return;
1292   rlc->found = GNUNET_YES;
1293   if (0 == rd_count)
1294   {
1295     rlc->rd_ser_len = 0;
1296     rlc->res_rd_count = 0;
1297     rlc->res_rd = NULL;
1298     return;
1299   }
1300   if ( (NULL != rlc->nick) &&
1301        (0 != strcmp (label,
1302                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1303   {
1304     /* Merge */
1305     struct GNUNET_GNSRECORD_Data *rd_res;
1306     unsigned int rdc_res;
1307
1308     rd_res = NULL;
1309     rdc_res = 0;
1310     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1311     merge_with_nick_records (rlc->nick,
1312                              rd_count,
1313                              rd,
1314                              &rdc_res,
1315                              &rd_res);
1316     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1317                                                          rd_res);
1318     if (rlc->rd_ser_len < 0)
1319     {
1320       GNUNET_break (0);
1321       GNUNET_free  (rd_res);
1322       rlc->found = GNUNET_NO;
1323       rlc->rd_ser_len = 0;
1324       return;
1325     }
1326     rlc->res_rd_count = rdc_res;
1327     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1328     if (rlc->rd_ser_len !=
1329         GNUNET_GNSRECORD_records_serialize (rdc_res,
1330                                             rd_res,
1331                                             rlc->rd_ser_len,
1332                                             rlc->res_rd))
1333     {
1334       GNUNET_break (0);
1335       GNUNET_free  (rlc->res_rd);
1336       rlc->res_rd = NULL;
1337       rlc->res_rd_count = 0;
1338       rlc->rd_ser_len = 0;
1339       GNUNET_free  (rd_res);
1340       rlc->found = GNUNET_NO;
1341       return;
1342     }
1343     GNUNET_free (rd_res);
1344     GNUNET_free (rlc->nick);
1345     rlc->nick = NULL;
1346   }
1347   else
1348   {
1349     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1350                                                          rd);
1351     if (rlc->rd_ser_len < 0)
1352     {
1353       GNUNET_break (0);
1354       rlc->found = GNUNET_NO;
1355       rlc->rd_ser_len = 0;
1356       return;
1357     }
1358     rlc->res_rd_count = rd_count;
1359     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1360     if (rlc->rd_ser_len !=
1361         GNUNET_GNSRECORD_records_serialize (rd_count,
1362                                             rd,
1363                                             rlc->rd_ser_len,
1364                                             rlc->res_rd))
1365     {
1366       GNUNET_break (0);
1367       GNUNET_free  (rlc->res_rd);
1368       rlc->res_rd = NULL;
1369       rlc->res_rd_count = 0;
1370       rlc->rd_ser_len = 0;
1371       rlc->found = GNUNET_NO;
1372       return;
1373     }
1374   }
1375 }
1376
1377
1378 /**
1379  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1380  *
1381  * @param cls client sending the message
1382  * @param ll_msg message of type `struct LabelLookupMessage`
1383  * @return #GNUNET_OK if @a ll_msg is well-formed
1384  */
1385 static int
1386 check_record_lookup (void *cls,
1387                      const struct LabelLookupMessage *ll_msg)
1388 {
1389   uint32_t name_len;
1390   size_t src_size;
1391
1392   (void) cls;
1393   name_len = ntohl (ll_msg->label_len);
1394   src_size = ntohs (ll_msg->gns_header.header.size);
1395   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1396   {
1397     GNUNET_break (0);
1398     return GNUNET_SYSERR;
1399   }
1400   GNUNET_MQ_check_zero_termination (ll_msg);
1401   return GNUNET_OK;
1402 }
1403
1404
1405 /**
1406  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1407  *
1408  * @param cls client sending the message
1409  * @param ll_msg message of type `struct LabelLookupMessage`
1410  */
1411 static void
1412 handle_record_lookup (void *cls,
1413                       const struct LabelLookupMessage *ll_msg)
1414 {
1415   struct NamestoreClient *nc = cls;
1416   struct GNUNET_MQ_Envelope *env;
1417   struct LabelLookupResponseMessage *llr_msg;
1418   struct RecordLookupContext rlc;
1419   const char *name_tmp;
1420   char *res_name;
1421   char *conv_name;
1422   uint32_t name_len;
1423   int res;
1424
1425   name_len = ntohl (ll_msg->label_len);
1426   name_tmp = (const char *) &ll_msg[1];
1427   GNUNET_SERVICE_client_continue (nc->client);
1428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1430               name_tmp);
1431
1432   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1433   if (NULL == conv_name)
1434   {
1435     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1436                 "Error converting name `%s'\n",
1437                 name_tmp);
1438     GNUNET_SERVICE_client_drop (nc->client);
1439     return;
1440   }
1441   rlc.label = conv_name;
1442   rlc.found = GNUNET_NO;
1443   rlc.res_rd_count = 0;
1444   rlc.res_rd = NULL;
1445   rlc.rd_ser_len = 0;
1446   rlc.nick = get_nick_record (&ll_msg->zone);
1447   res = GSN_database->lookup_records (GSN_database->cls,
1448                                       &ll_msg->zone,
1449                                       conv_name,
1450                                       &lookup_it,
1451                                       &rlc);
1452   GNUNET_free (conv_name);
1453   env = GNUNET_MQ_msg_extra (llr_msg,
1454                              name_len + rlc.rd_ser_len,
1455                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1456   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1457   llr_msg->private_key = ll_msg->zone;
1458   llr_msg->name_len = htons (name_len);
1459   llr_msg->rd_count = htons (rlc.res_rd_count);
1460   llr_msg->rd_len = htons (rlc.rd_ser_len);
1461   res_name = (char *) &llr_msg[1];
1462   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1463     llr_msg->found = ntohs (GNUNET_YES);
1464   else
1465     llr_msg->found = ntohs (GNUNET_NO);
1466   GNUNET_memcpy (&llr_msg[1],
1467                  name_tmp,
1468                  name_len);
1469   GNUNET_memcpy (&res_name[name_len],
1470                  rlc.res_rd,
1471                  rlc.rd_ser_len);
1472   GNUNET_MQ_send (nc->mq,
1473                   env);
1474   GNUNET_free_non_null (rlc.res_rd);
1475 }
1476
1477
1478 /**
1479  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1480  *
1481  * @param cls client sending the message
1482  * @param rp_msg message of type `struct RecordStoreMessage`
1483  * @return #GNUNET_OK if @a rp_msg is well-formed
1484  */
1485 static int
1486 check_record_store (void *cls,
1487                     const struct RecordStoreMessage *rp_msg)
1488 {
1489   size_t name_len;
1490   size_t msg_size;
1491   size_t msg_size_exp;
1492   size_t rd_ser_len;
1493   const char *name_tmp;
1494
1495   (void) cls;
1496   name_len = ntohs (rp_msg->name_len);
1497   msg_size = ntohs (rp_msg->gns_header.header.size);
1498   rd_ser_len = ntohs (rp_msg->rd_len);
1499   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1500   if (msg_size != msg_size_exp)
1501   {
1502     GNUNET_break (0);
1503     return GNUNET_SYSERR;
1504   }
1505   if ( (0 == name_len) ||
1506        (name_len > MAX_NAME_LEN) )
1507   {
1508     GNUNET_break (0);
1509     return GNUNET_SYSERR;
1510   }
1511   name_tmp = (const char *) &rp_msg[1];
1512   if ('\0' != name_tmp[name_len -1])
1513   {
1514     GNUNET_break (0);
1515     return GNUNET_SYSERR;
1516   }
1517   return GNUNET_OK;
1518 }
1519
1520
1521 /**
1522  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1523  *
1524  * @param cls client sending the message
1525  * @param rp_msg message of type `struct RecordStoreMessage`
1526  */
1527 static void
1528 handle_record_store (void *cls,
1529                      const struct RecordStoreMessage *rp_msg)
1530 {
1531   struct NamestoreClient *nc = cls;
1532   size_t name_len;
1533   size_t rd_ser_len;
1534   uint32_t rid;
1535   const char *name_tmp;
1536   char *conv_name;
1537   const char *rd_ser;
1538   unsigned int rd_count;
1539   int res;
1540   struct StoreActivity *sa;
1541
1542   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543               "Received NAMESTORE_RECORD_STORE message\n");
1544   rid = ntohl (rp_msg->gns_header.r_id);
1545   name_len = ntohs (rp_msg->name_len);
1546   rd_count = ntohs (rp_msg->rd_count);
1547   rd_ser_len = ntohs (rp_msg->rd_len);
1548   GNUNET_break (0 == ntohs (rp_msg->reserved));
1549   name_tmp = (const char *) &rp_msg[1];
1550   rd_ser = &name_tmp[name_len];
1551   {
1552     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1553
1554     if (GNUNET_OK !=
1555         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1556                                               rd_ser,
1557                                               rd_count,
1558                                               rd))
1559     {
1560       GNUNET_break (0);
1561       GNUNET_SERVICE_client_drop (nc->client);
1562       return;
1563     }
1564
1565     /* Extracting and converting private key */
1566     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1567     if (NULL == conv_name)
1568     {
1569       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1570                   "Error converting name `%s'\n",
1571                   name_tmp);
1572       GNUNET_SERVICE_client_drop (nc->client);
1573       return;
1574     }
1575     GNUNET_STATISTICS_update (statistics,
1576                               "Well-formed store requests received",
1577                               1,
1578                               GNUNET_NO);
1579     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1580                 "Creating %u records for name `%s'\n",
1581                 (unsigned int) rd_count,
1582                 conv_name);
1583     if ( (0 == rd_count) &&
1584          (GNUNET_NO ==
1585           GSN_database->lookup_records (GSN_database->cls,
1586                                         &rp_msg->private_key,
1587                                         conv_name,
1588                                         NULL,
1589                                         0)) )
1590     {
1591       /* This name does not exist, so cannot be removed */
1592       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1593                   "Name `%s' does not exist, no deletion required\n",
1594                   conv_name);
1595       res = GNUNET_NO;
1596     }
1597     else
1598     {
1599       /* remove "NICK" records, unless this is for the
1600          #GNUNET_GNS_EMPTY_LABEL_AT label */
1601       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1602       unsigned int rd_clean_off;
1603
1604       rd_clean_off = 0;
1605       for (unsigned int i=0;i<rd_count;i++)
1606       {
1607         rd_clean[rd_clean_off] = rd[i];
1608         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1609                            conv_name)) ||
1610              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1611           rd_clean_off++;
1612         
1613         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1614                            conv_name)) &&
1615              (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type) )
1616           cache_nick (&rp_msg->private_key,
1617                       &rd[i]);
1618       }
1619       res = GSN_database->store_records (GSN_database->cls,
1620                                          &rp_msg->private_key,
1621                                          conv_name,
1622                                          rd_clean_off,
1623                                          rd_clean);
1624     }
1625
1626     if (GNUNET_OK != res)
1627     {
1628       /* store not successful, not need to tell monitors */
1629       send_store_response (nc,
1630                            res,
1631                            rid);
1632       GNUNET_SERVICE_client_continue (nc->client);
1633       GNUNET_free (conv_name);
1634       return;
1635     }
1636
1637     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1638                         ntohs (rp_msg->gns_header.header.size));
1639     GNUNET_CONTAINER_DLL_insert (sa_head,
1640                                  sa_tail,
1641                                  sa);
1642     sa->nc = nc;
1643     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1644     GNUNET_memcpy (&sa[1],
1645                    rp_msg,
1646                    ntohs (rp_msg->gns_header.header.size));
1647     sa->zm_pos = monitor_head;
1648     sa->conv_name = conv_name;
1649     continue_store_activity (sa);
1650   }
1651 }
1652
1653
1654 /**
1655  * Context for record remove operations passed from #handle_zone_to_name to
1656  * #handle_zone_to_name_it as closure
1657  */
1658 struct ZoneToNameCtx
1659 {
1660   /**
1661    * Namestore client
1662    */
1663   struct NamestoreClient *nc;
1664
1665   /**
1666    * Request id (to be used in the response to the client).
1667    */
1668   uint32_t rid;
1669
1670   /**
1671    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1672    * not finding a name for the zone still counts as a 'success' here,
1673    * as this field is about the success of executing the IPC protocol.
1674    */
1675   int success;
1676 };
1677
1678
1679 /**
1680  * Zone to name iterator
1681  *
1682  * @param cls struct ZoneToNameCtx *
1683  * @param seq sequence number of the record
1684  * @param zone_key the zone key
1685  * @param name name
1686  * @param rd_count number of records in @a rd
1687  * @param rd record data
1688  */
1689 static void
1690 handle_zone_to_name_it (void *cls,
1691                         uint64_t seq,
1692                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1693                         const char *name,
1694                         unsigned int rd_count,
1695                         const struct GNUNET_GNSRECORD_Data *rd)
1696 {
1697   struct ZoneToNameCtx *ztn_ctx = cls;
1698   struct GNUNET_MQ_Envelope *env;
1699   struct ZoneToNameResponseMessage *ztnr_msg;
1700   int16_t res;
1701   size_t name_len;
1702   ssize_t rd_ser_len;
1703   size_t msg_size;
1704   char *name_tmp;
1705   char *rd_tmp;
1706
1707   (void) seq;
1708   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1709               "Found result for zone-to-name lookup: `%s'\n",
1710               name);
1711   res = GNUNET_YES;
1712   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1713   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1714                                                   rd);
1715   if (rd_ser_len < 0)
1716   {
1717     GNUNET_break (0);
1718     ztn_ctx->success = GNUNET_SYSERR;
1719     return;
1720   }
1721   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1722   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1723   {
1724     GNUNET_break (0);
1725     ztn_ctx->success = GNUNET_SYSERR;
1726     return;
1727   }
1728   env = GNUNET_MQ_msg_extra (ztnr_msg,
1729                              name_len + rd_ser_len,
1730                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1731   ztnr_msg->gns_header.header.size = htons (msg_size);
1732   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1733   ztnr_msg->res = htons (res);
1734   ztnr_msg->rd_len = htons (rd_ser_len);
1735   ztnr_msg->rd_count = htons (rd_count);
1736   ztnr_msg->name_len = htons (name_len);
1737   ztnr_msg->zone = *zone_key;
1738   name_tmp = (char *) &ztnr_msg[1];
1739   GNUNET_memcpy (name_tmp,
1740                  name,
1741                  name_len);
1742   rd_tmp = &name_tmp[name_len];
1743   GNUNET_assert (rd_ser_len ==
1744                  GNUNET_GNSRECORD_records_serialize (rd_count,
1745                                                      rd,
1746                                                      rd_ser_len,
1747                                                      rd_tmp));
1748   ztn_ctx->success = GNUNET_OK;
1749   GNUNET_MQ_send (ztn_ctx->nc->mq,
1750                   env);
1751 }
1752
1753
1754 /**
1755  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1756  *
1757  * @param cls client client sending the message
1758  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1759  */
1760 static void
1761 handle_zone_to_name (void *cls,
1762                      const struct ZoneToNameMessage *ztn_msg)
1763 {
1764   struct NamestoreClient *nc = cls;
1765   struct ZoneToNameCtx ztn_ctx;
1766   struct GNUNET_MQ_Envelope *env;
1767   struct ZoneToNameResponseMessage *ztnr_msg;
1768
1769   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770               "Received ZONE_TO_NAME message\n");
1771   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1772   ztn_ctx.nc = nc;
1773   ztn_ctx.success = GNUNET_NO;
1774   if (GNUNET_SYSERR ==
1775       GSN_database->zone_to_name (GSN_database->cls,
1776                                   &ztn_msg->zone,
1777                                   &ztn_msg->value_zone,
1778                                   &handle_zone_to_name_it, &ztn_ctx))
1779   {
1780     /* internal error, hang up instead of signalling something
1781        that might be wrong */
1782     GNUNET_break (0);
1783     GNUNET_SERVICE_client_drop (nc->client);
1784     return;
1785   }
1786   if (GNUNET_NO == ztn_ctx.success)
1787   {
1788     /* no result found, send empty response */
1789     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790                 "Found no result for zone-to-name lookup.\n");
1791     env = GNUNET_MQ_msg (ztnr_msg,
1792                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1793     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1794     ztnr_msg->res = htons (GNUNET_NO);
1795     GNUNET_MQ_send (nc->mq,
1796                     env);
1797   }
1798   GNUNET_SERVICE_client_continue (nc->client);
1799 }
1800
1801
1802 /**
1803  * Context for record remove operations passed from
1804  * #run_zone_iteration_round to #zone_iterate_proc as closure
1805  */
1806 struct ZoneIterationProcResult
1807 {
1808   /**
1809    * The zone iteration handle
1810    */
1811   struct ZoneIteration *zi;
1812
1813   /**
1814    * Number of results left to be returned in this iteration.
1815    */
1816   uint64_t limit;
1817
1818 };
1819
1820
1821 /**
1822  * Process results for zone iteration from database
1823  *
1824  * @param cls struct ZoneIterationProcResult
1825  * @param seq sequence number of the record
1826  * @param zone_key the zone key
1827  * @param name name
1828  * @param rd_count number of records for this name
1829  * @param rd record data
1830  */
1831 static void
1832 zone_iterate_proc (void *cls,
1833                    uint64_t seq,
1834                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1835                    const char *name,
1836                    unsigned int rd_count,
1837                    const struct GNUNET_GNSRECORD_Data *rd)
1838 {
1839   struct ZoneIterationProcResult *proc = cls;
1840   int do_refresh_block;
1841
1842   if ( (NULL == zone_key) &&
1843        (NULL == name) )
1844   {
1845     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1846                 "Iteration done\n");
1847     return;
1848   }
1849   if ( (NULL == zone_key) ||
1850        (NULL == name) )
1851   {
1852     /* what is this!? should never happen */
1853     GNUNET_break (0);
1854     return;
1855   }
1856   if (0 == proc->limit)
1857   {
1858     /* what is this!? should never happen */
1859     GNUNET_break (0);
1860     return;
1861   }
1862   proc->limit--;
1863   proc->zi->seq = seq;
1864   send_lookup_response (proc->zi->nc,
1865                         proc->zi->request_id,
1866                         zone_key,
1867                         name,
1868                         rd_count,
1869                         rd);
1870
1871
1872   do_refresh_block = GNUNET_NO;
1873   for (unsigned int i=0;i<rd_count;i++)
1874     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1875     {
1876       do_refresh_block = GNUNET_YES;
1877       break;
1878     }
1879   if (GNUNET_YES == do_refresh_block)    
1880     refresh_block (NULL,
1881                    proc->zi,
1882                    0,
1883                    zone_key,
1884                    name,
1885                    rd_count,
1886                    rd);
1887 }
1888
1889
1890 /**
1891  * Perform the next round of the zone iteration.
1892  *
1893  * @param zi zone iterator to process
1894  * @param limit number of results to return in one pass
1895  */
1896 static void
1897 run_zone_iteration_round (struct ZoneIteration *zi,
1898                           uint64_t limit)
1899 {
1900   struct ZoneIterationProcResult proc;
1901   struct GNUNET_TIME_Absolute start;
1902   struct GNUNET_TIME_Relative duration;
1903
1904   memset (&proc,
1905           0,
1906           sizeof (proc));
1907   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1908               "Asked to return up to %llu records at position %llu\n",
1909               (unsigned long long) limit,
1910               (unsigned long long) zi->seq);
1911   proc.zi = zi;
1912   proc.limit = limit;
1913   start = GNUNET_TIME_absolute_get ();
1914   GNUNET_break (GNUNET_SYSERR !=
1915                 GSN_database->iterate_records (GSN_database->cls,
1916                                                (0 == memcmp (&zi->zone,
1917                                                              &zero,
1918                                                              sizeof (zero)))
1919                                                ? NULL
1920                                                : &zi->zone,
1921                                                zi->seq,
1922                                                limit,
1923                                                &zone_iterate_proc,
1924                                                &proc));
1925   duration = GNUNET_TIME_absolute_get_duration (start);
1926   duration = GNUNET_TIME_relative_divide (duration,
1927                                           limit - proc.limit);
1928   GNUNET_STATISTICS_set (statistics,
1929                          "NAMESTORE iteration delay (μs/record)",
1930                          duration.rel_value_us,
1931                          GNUNET_NO);
1932   if (0 == proc.limit)
1933     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1934                 "Returned %llu results, more results available\n",
1935                 (unsigned long long) limit);
1936   zi->send_end = (0 != proc.limit);
1937   if (0 == zi->cache_ops)
1938     zone_iteration_done_client_continue (zi);
1939 }
1940
1941
1942 /**
1943  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1944  *
1945  * @param cls the client sending the message
1946  * @param zis_msg message from the client
1947  */
1948 static void
1949 handle_iteration_start (void *cls,
1950                         const struct ZoneIterationStartMessage *zis_msg)
1951 {
1952   struct NamestoreClient *nc = cls;
1953   struct ZoneIteration *zi;
1954
1955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956               "Received ZONE_ITERATION_START message\n");
1957   zi = GNUNET_new (struct ZoneIteration);
1958   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1959   zi->offset = 0;
1960   zi->nc = nc;
1961   zi->zone = zis_msg->zone;
1962
1963   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1964                                nc->op_tail,
1965                                zi);
1966   run_zone_iteration_round (zi,
1967                             1);
1968 }
1969
1970
1971 /**
1972  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1973  *
1974  * @param cls the client sending the message
1975  * @param zis_msg message from the client
1976  */
1977 static void
1978 handle_iteration_stop (void *cls,
1979                        const struct ZoneIterationStopMessage *zis_msg)
1980 {
1981   struct NamestoreClient *nc = cls;
1982   struct ZoneIteration *zi;
1983   uint32_t rid;
1984
1985   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1986               "Received ZONE_ITERATION_STOP message\n");
1987   rid = ntohl (zis_msg->gns_header.r_id);
1988   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1989     if (zi->request_id == rid)
1990       break;
1991   if (NULL == zi)
1992   {
1993     GNUNET_break (0);
1994     GNUNET_SERVICE_client_drop (nc->client);
1995     return;
1996   }
1997   GNUNET_CONTAINER_DLL_remove (nc->op_head,
1998                                nc->op_tail,
1999                                zi);
2000   GNUNET_free (zi);
2001   GNUNET_SERVICE_client_continue (nc->client);
2002 }
2003
2004
2005 /**
2006  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
2007  *
2008  * @param cls the client sending the message
2009  * @param message message from the client
2010  */
2011 static void
2012 handle_iteration_next (void *cls,
2013                        const struct ZoneIterationNextMessage *zis_msg)
2014 {
2015   struct NamestoreClient *nc = cls;
2016   struct ZoneIteration *zi;
2017   uint32_t rid;
2018   uint64_t limit;
2019
2020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2021               "Received ZONE_ITERATION_NEXT message\n");
2022   GNUNET_STATISTICS_update (statistics,
2023                             "Iteration NEXT messages received",
2024                             1,
2025                             GNUNET_NO);
2026   rid = ntohl (zis_msg->gns_header.r_id);
2027   limit = GNUNET_ntohll (zis_msg->limit);
2028   for (zi = nc->op_head; NULL != zi; zi = zi->next)
2029     if (zi->request_id == rid)
2030       break;
2031   if (NULL == zi)
2032   {
2033     GNUNET_break (0);
2034     GNUNET_SERVICE_client_drop (nc->client);
2035     return;
2036   }
2037   run_zone_iteration_round (zi,
2038                             limit);
2039 }
2040
2041
2042 /**
2043  * Function called when the monitor is ready for more data, and we
2044  * should thus unblock PUT operations that were blocked on the
2045  * monitor not being ready.
2046  */
2047 static void
2048 monitor_unblock (struct ZoneMonitor *zm)
2049 {
2050   struct StoreActivity *sa = sa_head;
2051
2052   while ( (NULL != sa) &&
2053           (zm->limit > zm->iteration_cnt) )
2054   {
2055     struct StoreActivity *sn = sa->next;
2056
2057     if (sa->zm_pos == zm)
2058       continue_store_activity (sa);
2059     sa = sn;
2060   }
2061   if (zm->limit > zm->iteration_cnt)
2062   {
2063     zm->sa_waiting = GNUNET_NO;
2064     if (NULL != zm->sa_wait_warning)
2065     {
2066       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2067       zm->sa_wait_warning = NULL;
2068     }
2069   }
2070   else if (GNUNET_YES == zm->sa_waiting)
2071   {
2072     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2073     if (NULL != zm->sa_wait_warning)
2074       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2075     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2076                                                         &warn_monitor_slow,
2077                                                         zm);
2078   }
2079 }
2080
2081
2082 /**
2083  * Send 'sync' message to zone monitor, we're now in sync.
2084  *
2085  * @param zm monitor that is now in sync
2086  */
2087 static void
2088 monitor_sync (struct ZoneMonitor *zm)
2089 {
2090   struct GNUNET_MQ_Envelope *env;
2091   struct GNUNET_MessageHeader *sync;
2092
2093   env = GNUNET_MQ_msg (sync,
2094                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
2095   GNUNET_MQ_send (zm->nc->mq,
2096                   env);
2097   /* mark iteration done */
2098   zm->in_first_iteration = GNUNET_NO;
2099   zm->iteration_cnt = 0;
2100   if ( (zm->limit > 0) &&
2101        (zm->sa_waiting) )
2102     monitor_unblock (zm);
2103 }
2104
2105
2106 /**
2107  * Obtain the next datum during the zone monitor's zone initial iteration.
2108  *
2109  * @param cls zone monitor that does its initial iteration
2110  */
2111 static void
2112 monitor_iteration_next (void *cls);
2113
2114
2115 /**
2116  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
2117  *
2118  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2119  * @param seq sequence number of the record
2120  * @param zone_key zone key of the zone
2121  * @param name name
2122  * @param rd_count number of records in @a rd
2123  * @param rd array of records
2124  */
2125 static void
2126 monitor_iterate_cb (void *cls,
2127                     uint64_t seq,
2128                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2129                     const char *name,
2130                     unsigned int rd_count,
2131                     const struct GNUNET_GNSRECORD_Data *rd)
2132 {
2133   struct ZoneMonitor *zm = cls;
2134
2135   zm->seq = seq;
2136   GNUNET_assert (NULL != name);
2137   GNUNET_STATISTICS_update (statistics,
2138                             "Monitor notifications sent",
2139                             1,
2140                             GNUNET_NO);
2141   zm->limit--;
2142   zm->iteration_cnt--;
2143   send_lookup_response (zm->nc,
2144                         0,
2145                         zone_key,
2146                         name,
2147                         rd_count,
2148                         rd);
2149   if ( (0 == zm->iteration_cnt) &&
2150        (0 != zm->limit) )
2151   {
2152     /* We are done with the current iteration batch, AND the
2153        client would right now accept more, so go again! */
2154     GNUNET_assert (NULL == zm->task);
2155     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2156                                          zm);
2157   }
2158 }
2159
2160
2161 /**
2162  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2163  *
2164  * @param cls the client sending the message
2165  * @param zis_msg message from the client
2166  */
2167 static void
2168 handle_monitor_start (void *cls,
2169                       const struct ZoneMonitorStartMessage *zis_msg)
2170 {
2171   struct NamestoreClient *nc = cls;
2172   struct ZoneMonitor *zm;
2173
2174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2175               "Received ZONE_MONITOR_START message\n");
2176   zm = GNUNET_new (struct ZoneMonitor);
2177   zm->nc = nc;
2178   zm->zone = zis_msg->zone;
2179   zm->limit = 1;
2180   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2181   GNUNET_CONTAINER_DLL_insert (monitor_head,
2182                                monitor_tail,
2183                                zm);
2184   GNUNET_SERVICE_client_mark_monitor (nc->client);
2185   GNUNET_SERVICE_client_continue (nc->client);
2186   GNUNET_notification_context_add (monitor_nc,
2187                                    nc->mq);
2188   if (zm->in_first_iteration)
2189     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2190                                          zm);
2191   else
2192     monitor_sync (zm);
2193 }
2194
2195
2196 /**
2197  * Obtain the next datum during the zone monitor's zone initial iteration.
2198  *
2199  * @param cls zone monitor that does its initial iteration
2200  */
2201 static void
2202 monitor_iteration_next (void *cls)
2203 {
2204   struct ZoneMonitor *zm = cls;
2205   int ret;
2206
2207   zm->task = NULL;
2208   GNUNET_assert (0 == zm->iteration_cnt);
2209   if (zm->limit > 16)
2210     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2211   else
2212     zm->iteration_cnt = zm->limit; /* use it all */
2213   ret = GSN_database->iterate_records (GSN_database->cls,
2214                                        (0 == memcmp (&zm->zone,
2215                                                      &zero,
2216                                                      sizeof (zero)))
2217                                        ? NULL
2218                                        : &zm->zone,
2219                                        zm->seq,
2220                                        zm->iteration_cnt,
2221                                        &monitor_iterate_cb,
2222                                        zm);
2223   if (GNUNET_SYSERR == ret)
2224   {
2225     GNUNET_SERVICE_client_drop (zm->nc->client);
2226     return;
2227   }
2228   if (GNUNET_NO == ret)
2229   {
2230     /* empty zone */
2231     monitor_sync (zm);
2232     return;
2233   }
2234 }
2235
2236
2237 /**
2238  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2239  *
2240  * @param cls the client sending the message
2241  * @param nm message from the client
2242  */
2243 static void
2244 handle_monitor_next (void *cls,
2245                      const struct ZoneMonitorNextMessage *nm)
2246 {
2247   struct NamestoreClient *nc = cls;
2248   struct ZoneMonitor *zm;
2249   uint64_t inc;
2250
2251   inc = GNUNET_ntohll (nm->limit);
2252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2253               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2254               (unsigned long long) inc);
2255   for (zm = monitor_head; NULL != zm; zm = zm->next)
2256     if (zm->nc == nc)
2257       break;
2258   if (NULL == zm)
2259   {
2260     GNUNET_break (0);
2261     GNUNET_SERVICE_client_drop (nc->client);
2262     return;
2263   }
2264   GNUNET_SERVICE_client_continue (nc->client);
2265   if (zm->limit + inc < zm->limit)
2266   {
2267     GNUNET_break (0);
2268     GNUNET_SERVICE_client_drop (nc->client);
2269     return;
2270   }
2271   zm->limit += inc;
2272   if ( (zm->in_first_iteration) &&
2273        (zm->limit == inc) )
2274   {
2275     /* We are still iterating, and the previous iteration must
2276        have stopped due to the client's limit, so continue it! */
2277     GNUNET_assert (NULL == zm->task);
2278     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2279                                          zm);
2280   }
2281   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2282   if ( (zm->limit > zm->iteration_cnt) &&
2283        (zm->sa_waiting) )
2284   {
2285     monitor_unblock (zm);
2286   }
2287   else if (GNUNET_YES == zm->sa_waiting)
2288   {
2289     if (NULL != zm->sa_wait_warning)
2290       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2291     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2292     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2293                                                         &warn_monitor_slow,
2294                                                         zm);
2295   }
2296 }
2297
2298
2299 /**
2300  * Process namestore requests.
2301  *
2302  * @param cls closure
2303  * @param cfg configuration to use
2304  * @param service the initialized service
2305  */
2306 static void
2307 run (void *cls,
2308      const struct GNUNET_CONFIGURATION_Handle *cfg,
2309      struct GNUNET_SERVICE_Handle *service)
2310 {
2311   char *database;
2312
2313   (void) cls;
2314   (void) service;
2315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2316               "Starting namestore service\n");
2317   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2318                                                      "namestore",
2319                                                      "CACHE_KEYS");
2320   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2321                                                             "namecache",
2322                                                             "DISABLE");
2323   GSN_cfg = cfg;
2324   monitor_nc = GNUNET_notification_context_create (1);
2325   if (GNUNET_YES != disable_namecache)
2326   {
2327     namecache = GNUNET_NAMECACHE_connect (cfg);
2328     GNUNET_assert (NULL != namecache);
2329   }
2330   /* Loading database plugin */
2331   if (GNUNET_OK !=
2332       GNUNET_CONFIGURATION_get_value_string (cfg,
2333                                              "namestore",
2334                                              "database",
2335                                              &database))
2336     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2337                 "No database backend configured\n");
2338
2339   GNUNET_asprintf (&db_lib_name,
2340                    "libgnunet_plugin_namestore_%s",
2341                    database);
2342   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2343                                      (void *) GSN_cfg);
2344   GNUNET_free (database);
2345   statistics = GNUNET_STATISTICS_create ("namestore",
2346                                          cfg);
2347   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2348                                  NULL);
2349   if (NULL == GSN_database)
2350   {
2351     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2352                 "Could not load database backend `%s'\n",
2353                 db_lib_name);
2354     GNUNET_SCHEDULER_shutdown ();
2355     return;
2356   }
2357 }
2358
2359
2360 /**
2361  * Define "main" method using service macro.
2362  */
2363 GNUNET_SERVICE_MAIN
2364 ("namestore",
2365  GNUNET_SERVICE_OPTION_NONE,
2366  &run,
2367  &client_connect_cb,
2368  &client_disconnect_cb,
2369  NULL,
2370  GNUNET_MQ_hd_var_size (record_store,
2371                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2372                         struct RecordStoreMessage,
2373                         NULL),
2374  GNUNET_MQ_hd_var_size (record_lookup,
2375                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2376                         struct LabelLookupMessage,
2377                         NULL),
2378  GNUNET_MQ_hd_fixed_size (zone_to_name,
2379                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2380                           struct ZoneToNameMessage,
2381                           NULL),
2382  GNUNET_MQ_hd_fixed_size (iteration_start,
2383                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2384                           struct ZoneIterationStartMessage,
2385                           NULL),
2386  GNUNET_MQ_hd_fixed_size (iteration_next,
2387                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2388                           struct ZoneIterationNextMessage,
2389                           NULL),
2390  GNUNET_MQ_hd_fixed_size (iteration_stop,
2391                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2392                           struct ZoneIterationStopMessage,
2393                           NULL),
2394  GNUNET_MQ_hd_fixed_size (monitor_start,
2395                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2396                           struct ZoneMonitorStartMessage,
2397                           NULL),
2398  GNUNET_MQ_hd_fixed_size (monitor_next,
2399                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2400                           struct ZoneMonitorNextMessage,
2401                           NULL),
2402  GNUNET_MQ_handler_end ());
2403
2404
2405 /* end of gnunet-service-namestore.c */