try helping rexxnor with flow control
[oweals/gnunet.git] / src / namestore / gnunet-service-namestore.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file namestore/gnunet-service-namestore.c
21  * @brief namestore for the GNUnet naming system
22  * @author Matthias Wachs
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - "get_nick_record" is a bottleneck, introduce a cache to
27  *   avoid looking it up again and again (for the same few
28  *   zones that the user will typically manage!)
29  * - run testcases, make sure everything works!
30  */
31 #include "platform.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_dnsparser_lib.h"
34 #include "gnunet_gns_service.h"
35 #include "gnunet_namecache_service.h"
36 #include "gnunet_namestore_service.h"
37 #include "gnunet_namestore_plugin.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_signatures.h"
40 #include "namestore.h"
41
42 #define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
43
44 /**
45  * If a monitor takes more than 1 minute to process an event, print a warning.
46  */
47 #define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
48
49 /**
50  * Size of the cache used by #get_nick_record()
51  */
52 #define NC_SIZE 16
53
54 /**
55  * A namestore client
56  */
57 struct NamestoreClient;
58
59
60 /**
61  * A namestore iteration operation.
62  */
63 struct ZoneIteration
64 {
65   /**
66    * Next element in the DLL
67    */
68   struct ZoneIteration *next;
69
70   /**
71    * Previous element in the DLL
72    */
73   struct ZoneIteration *prev;
74
75   /**
76    * Namestore client which intiated this zone iteration
77    */
78   struct NamestoreClient *nc;
79
80   /**
81    * The nick to add to the records
82    */
83   struct GNUNET_GNSRECORD_Data *nick;
84
85   /**
86    * Key of the zone we are iterating over.
87    */
88   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
89
90   /**
91    * Last sequence number in the zone iteration used to address next
92    * result of the zone iteration in the store
93    *
94    * Initialy set to 0.
95    * Updated in #zone_iterate_proc()
96    */
97   uint64_t seq;
98
99   /**
100    * The operation id fot the zone iteration in the response for the client
101    */
102   uint32_t request_id;
103
104   /**
105    * Offset of the zone iteration used to address next result of the zone
106    * iteration in the store
107    *
108    * Initialy set to 0 in #handle_iteration_start
109    * Incremented with by every call to #handle_iteration_next
110    */
111   uint32_t offset;
112
113   /**
114    * Number of pending cache operations triggered by this zone iteration which we
115    * need to wait for before allowing the client to continue.
116    */
117   unsigned int cache_ops;
118
119   /**
120    * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
121    * client and we should send the #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
122    * message and free the data structure once @e cache_ops is zero.
123    */
124   int send_end;
125   
126 };
127
128
129 /**
130  * A namestore client
131  */
132 struct NamestoreClient
133 {
134
135   /**
136    * The client
137    */
138   struct GNUNET_SERVICE_Client *client;
139
140   /**
141    * Message queue for transmission to @e client
142    */
143   struct GNUNET_MQ_Handle *mq;
144
145   /**
146    * Head of the DLL of
147    * Zone iteration operations in progress initiated by this client
148    */
149   struct ZoneIteration *op_head;
150
151   /**
152    * Tail of the DLL of
153    * Zone iteration operations in progress initiated by this client
154    */
155   struct ZoneIteration *op_tail;
156 };
157
158
159 /**
160  * A namestore monitor.
161  */
162 struct ZoneMonitor
163 {
164   /**
165    * Next element in the DLL
166    */
167   struct ZoneMonitor *next;
168
169   /**
170    * Previous element in the DLL
171    */
172   struct ZoneMonitor *prev;
173
174   /**
175    * Namestore client which intiated this zone monitor
176    */
177   struct NamestoreClient *nc;
178
179   /**
180    * Private key of the zone.
181    */
182   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
183
184   /**
185    * Task active during initial iteration.
186    */
187   struct GNUNET_SCHEDULER_Task *task;
188
189   /**
190    * Task to warn about slow monitors.
191    */
192   struct GNUNET_SCHEDULER_Task *sa_wait_warning;
193
194   /**
195    * Since when are we blocked on this monitor?
196    */
197   struct GNUNET_TIME_Absolute sa_waiting_start;
198
199   /**
200    * Last sequence number in the zone iteration used to address next
201    * result of the zone iteration in the store
202    *
203    * Initialy set to 0.
204    * Updated in #monitor_iterate_cb()
205    */
206   uint64_t seq;
207
208   /**
209    * Current limit of how many more messages we are allowed
210    * to queue to this monitor.
211    */
212   uint64_t limit;
213
214   /**
215    * How many more requests may we receive from the iterator
216    * before it is at the limit we gave it?  Will be below or
217    * equal to @e limit.  The effective limit for monitor
218    * events is thus @e iteration_cnt - @e limit!
219    */
220   uint64_t iteration_cnt;
221
222   /**
223    * Are we (still) in the initial iteration pass?
224    */
225   int in_first_iteration;
226
227   /**
228    * Is there a store activity waiting for this monitor?  We only raise the
229    * flag when it happens and search the DLL for the store activity when we
230    * had a limit increase.  If we cannot find any waiting store activity at
231    * that time, we clear the flag again.
232    */
233   int sa_waiting;
234
235 };
236
237
238 /**
239  * Pending operation on the namecache.
240  */
241 struct CacheOperation
242 {
243
244   /**
245    * Kept in a DLL.
246    */
247   struct CacheOperation *prev;
248
249   /**
250    * Kept in a DLL.
251    */
252   struct CacheOperation *next;
253
254   /**
255    * Handle to namecache queue.
256    */
257   struct GNUNET_NAMECACHE_QueueEntry *qe;
258
259   /**
260    * Client to notify about the result, can be NULL.
261    */
262   struct NamestoreClient *nc;
263
264   /**
265    * Zone iteration to call #zone_iteration_done_client_continue()
266    * for if applicable, can be NULL.
267    */
268   struct ZoneIteration *zi;
269   
270   /**
271    * Client's request ID.
272    */
273   uint32_t rid;
274 };
275
276
277 /**
278  * Information for an ongoing #handle_record_store() operation.
279  * Needed as we may wait for monitors to be ready for the notification.
280  */
281 struct StoreActivity
282 {
283   /**
284    * Kept in a DLL.
285    */
286   struct StoreActivity *next;
287
288   /**
289    * Kept in a DLL.
290    */
291   struct StoreActivity *prev;
292
293   /**
294    * Which client triggered the store activity?
295    */
296   struct NamestoreClient *nc;
297
298   /**
299    * Copy of the original store message (as data fields in @e rd will
300    * point into it!).
301    */
302   const struct RecordStoreMessage *rsm;
303
304   /**
305    * Next zone monitor that still needs to be notified about this PUT.
306    */
307   struct ZoneMonitor *zm_pos;
308
309   /**
310    * Label nicely canonicalized (lower case).
311    */
312   char *conv_name;
313
314 };
315
316
317 /**
318  * Entry in list of cached nick resolutions.
319  */ 
320 struct NickCache
321 {
322   /**
323    * Zone the cache entry is for.
324    */ 
325   struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
326
327   /**
328    * Cached record data.
329    */
330   struct GNUNET_GNSRECORD_Data *rd;
331
332   /**
333    * Timestamp when this cache entry was used last.
334    */
335   struct GNUNET_TIME_Absolute last_used;
336 };
337
338
339 /**
340  * We cache nick records to reduce DB load. 
341  */
342 static struct NickCache nick_cache[NC_SIZE];
343
344 /**
345  * Public key of all zeros.
346  */
347 static const struct GNUNET_CRYPTO_EcdsaPrivateKey zero;
348
349 /**
350  * Configuration handle.
351  */
352 static const struct GNUNET_CONFIGURATION_Handle *GSN_cfg;
353
354 /**
355  * Handle to the statistics service
356  */
357 static struct GNUNET_STATISTICS_Handle *statistics;
358
359 /**
360  * Namecache handle.
361  */
362 static struct GNUNET_NAMECACHE_Handle *namecache;
363
364 /**
365  * Database handle
366  */
367 static struct GNUNET_NAMESTORE_PluginFunctions *GSN_database;
368
369 /**
370  * Name of the database plugin
371  */
372 static char *db_lib_name;
373
374 /**
375  * Head of cop DLL.
376  */
377 static struct CacheOperation *cop_head;
378
379 /**
380  * Tail of cop DLL.
381  */
382 static struct CacheOperation *cop_tail;
383
384 /**
385  * First active zone monitor.
386  */
387 static struct ZoneMonitor *monitor_head;
388
389 /**
390  * Last active zone monitor.
391  */
392 static struct ZoneMonitor *monitor_tail;
393
394 /**
395  * Head of DLL of monitor-blocked store activities.
396  */
397 static struct StoreActivity *sa_head;
398
399 /**
400  * Tail of DLL of monitor-blocked store activities.
401  */
402 static struct StoreActivity *sa_tail;
403
404 /**
405  * Notification context shared by all monitors.
406  */
407 static struct GNUNET_NotificationContext *monitor_nc;
408
409 /**
410  * Optimize block insertion by caching map of private keys to
411  * public keys in memory?
412  */
413 static int cache_keys;
414
415 /**
416  * Use the namecache? Doing so creates additional cryptographic
417  * operations whenever we touch a record.
418  */
419 static int disable_namecache;
420
421
422 /**
423  * Task run during shutdown.
424  *
425  * @param cls unused
426  */
427 static void
428 cleanup_task (void *cls)
429 {
430   struct CacheOperation *cop;
431
432   (void) cls;
433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
434               "Stopping namestore service\n");
435   while (NULL != (cop = cop_head))
436   {
437     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
438                 "Aborting incomplete namecache operation\n");
439     GNUNET_NAMECACHE_cancel (cop->qe);
440     GNUNET_CONTAINER_DLL_remove (cop_head,
441                                  cop_tail,
442                                  cop);
443     GNUNET_free (cop);
444   }
445   if (NULL != namecache)
446   {
447     GNUNET_NAMECACHE_disconnect (namecache);
448     namecache = NULL;
449   }
450   GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name,
451                                               GSN_database));
452   GNUNET_free (db_lib_name);
453   db_lib_name = NULL;
454   if (NULL != monitor_nc)
455   {
456     GNUNET_notification_context_destroy (monitor_nc);
457     monitor_nc = NULL;
458   }
459   if (NULL != statistics)
460   {
461     GNUNET_STATISTICS_destroy (statistics,
462                                GNUNET_NO);
463     statistics = NULL;
464   }
465 }
466
467
468 /**
469  * Release memory used by @a sa.
470  *
471  * @param sa activity to free
472  */
473 static void
474 free_store_activity (struct StoreActivity *sa)
475 {
476   GNUNET_CONTAINER_DLL_remove (sa_head,
477                                sa_tail,
478                                sa);
479   GNUNET_free (sa->conv_name);
480   GNUNET_free (sa);
481 }
482
483
484 /**
485  * Function called with the records for the #GNUNET_GNS_EMPTY_LABEL_AT
486  * label in the zone.  Used to locate the #GNUNET_GNSRECORD_TYPE_NICK
487  * record, which (if found) is then copied to @a cls for future use.
488  *
489  * @param cls a `struct GNUNET_GNSRECORD_Data **` for storing the nick (if found)
490  * @param seq sequence number of the record
491  * @param private_key the private key of the zone (unused)
492  * @param label should be #GNUNET_GNS_EMPTY_LABEL_AT
493  * @param rd_count number of records in @a rd
494  * @param rd records stored under @a label in the zone
495  */
496 static void
497 lookup_nick_it (void *cls,
498                 uint64_t seq,
499                 const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
500                 const char *label,
501                 unsigned int rd_count,
502                 const struct GNUNET_GNSRECORD_Data *rd)
503 {
504   struct GNUNET_GNSRECORD_Data **res = cls;
505
506   (void) private_key;
507   (void) seq;
508   if (0 != strcmp (label, GNUNET_GNS_EMPTY_LABEL_AT))
509   {
510     GNUNET_break (0);
511     return;
512   }
513   for (unsigned int c = 0; c < rd_count; c++)
514   {
515     if (GNUNET_GNSRECORD_TYPE_NICK == rd[c].record_type)
516     {
517       (*res) = GNUNET_malloc (rd[c].data_size + sizeof (struct GNUNET_GNSRECORD_Data));
518       (*res)->data = &(*res)[1];
519       GNUNET_memcpy ((void *) (*res)->data,
520                      rd[c].data,
521                      rd[c].data_size);
522       (*res)->data_size = rd[c].data_size;
523       (*res)->expiration_time = rd[c].expiration_time;
524       (*res)->flags = rd[c].flags;
525       (*res)->record_type = GNUNET_GNSRECORD_TYPE_NICK;
526       return;
527     }
528   }
529   (*res) = NULL;
530 }
531
532
533 /**
534  * Add entry to the cache for @a zone and @a nick
535  *
536  * @param zone zone key to cache under
537  * @param nick nick entry to cache
538  */
539 static void
540 cache_nick (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
541             const struct GNUNET_GNSRECORD_Data *nick)
542 {
543   struct NickCache *oldest;
544
545   oldest = NULL;
546   for (unsigned int i=0;i<NC_SIZE;i++)
547   {
548     struct NickCache *pos = &nick_cache[i];
549
550     if ( (NULL == oldest) ||
551          (oldest->last_used.abs_value_us >
552           pos->last_used.abs_value_us) )
553       oldest = pos;
554     if (0 == memcmp (zone,
555                      &pos->zone,
556                      sizeof (*zone)))
557     {
558       oldest = pos;
559       break;
560     }
561   }
562   GNUNET_free_non_null (oldest->rd);
563   oldest->zone = *zone;
564   oldest->rd = GNUNET_malloc (sizeof (*nick) +
565                               nick->data_size);
566   *oldest->rd = *nick;
567   oldest->rd->data = &oldest->rd[1];
568   memcpy (&oldest->rd[1],
569           nick->data,
570           nick->data_size);
571   oldest->last_used = GNUNET_TIME_absolute_get ();
572 }
573
574
575 /**
576  * Return the NICK record for the zone (if it exists).
577  *
578  * @param zone private key for the zone to look for nick
579  * @return NULL if no NICK record was found
580  */
581 static struct GNUNET_GNSRECORD_Data *
582 get_nick_record (const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone)
583 {
584   struct GNUNET_CRYPTO_EcdsaPublicKey pub;
585   struct GNUNET_GNSRECORD_Data *nick;
586   int res;
587
588   /* check cache first */
589   for (unsigned int i=0;i<NC_SIZE;i++)
590   {
591     struct NickCache *pos = &nick_cache[i];
592     if ( (NULL != pos->rd) &&
593          (0 == memcmp (zone,
594                        &pos->zone,
595                        sizeof (*zone))) )
596     {
597       nick = GNUNET_malloc (sizeof (*nick) +
598                             pos->rd->data_size);
599       *nick = *pos->rd;
600       nick->data = &nick[1];
601       memcpy (&nick[1],
602               pos->rd->data,
603               pos->rd->data_size);
604       pos->last_used = GNUNET_TIME_absolute_get ();
605       return nick;
606     }
607   }
608   
609   nick = NULL;
610   res = GSN_database->lookup_records (GSN_database->cls,
611                                       zone,
612                                       GNUNET_GNS_EMPTY_LABEL_AT,
613                                       &lookup_nick_it,
614                                       &nick);
615   if ( (GNUNET_OK != res) ||
616        (NULL == nick) )
617   {
618     GNUNET_CRYPTO_ecdsa_key_get_public (zone, &pub);
619     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
620                 "No nick name set for zone `%s'\n",
621                 GNUNET_GNSRECORD_z2s (&pub));
622     return NULL;
623   }
624
625   /* update cache */
626   cache_nick (zone,
627               nick);
628   return nick;
629 }
630
631
632 /**
633  * Merge the nick record @a nick_rd with the rest of the
634  * record set given in @a rd2.  Store the result in @a rdc_res
635  * and @a rd_res.  The @a nick_rd's expiration time is set to
636  * the maximum expiration time of all of the records in @a rd2.
637  *
638  * @param nick_rd the nick record to integrate
639  * @param rd2_length length of the @a rd2 array
640  * @param rd2 array of records
641  * @param rdc_res[out] length of the resulting @a rd_res array
642  * @param rd_res[out] set to an array of records,
643  *                    including @a nick_rd and @a rd2;
644  *           all of the variable-size 'data' fields in @a rd2 are
645  *           allocated in the same chunk of memory!
646  */
647 static void
648 merge_with_nick_records (const struct GNUNET_GNSRECORD_Data *nick_rd,
649                          unsigned int rd2_length,
650                          const struct GNUNET_GNSRECORD_Data *rd2,
651                          unsigned int *rdc_res,
652                          struct GNUNET_GNSRECORD_Data **rd_res)
653 {
654   uint64_t latest_expiration;
655   size_t req;
656   char *data;
657   size_t data_offset;
658   struct GNUNET_GNSRECORD_Data *target;
659
660   (*rdc_res) = 1 + rd2_length;
661   if (0 == 1 + rd2_length)
662   {
663     GNUNET_break (0);
664     (*rd_res) = NULL;
665     return;
666   }
667   req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
668   for (unsigned int i=0; i<rd2_length; i++)
669   {
670     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
671
672     if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
673     {
674       GNUNET_break (0);
675       (*rd_res) = NULL;
676       return;
677     }
678     req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
679   }
680   target = GNUNET_malloc (req);
681   (*rd_res) = target;
682   data = (char *) &target[1 + rd2_length];
683   data_offset = 0;
684   latest_expiration = 0;
685   for (unsigned int i=0;i<rd2_length;i++)
686   {
687     const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
688
689     if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
690     {
691       if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
692           latest_expiration)
693         latest_expiration = orig->expiration_time;
694     }
695     else if (orig->expiration_time > latest_expiration)
696       latest_expiration = orig->expiration_time;
697     target[i] = *orig;
698     target[i].data = (void *) &data[data_offset];
699     GNUNET_memcpy (&data[data_offset],
700                    orig->data,
701                    orig->data_size);
702     data_offset += orig->data_size;
703   }
704   /* append nick */
705   target[rd2_length] = *nick_rd;
706   target[rd2_length].expiration_time = latest_expiration;
707   target[rd2_length].data = (void *) &data[data_offset];
708   GNUNET_memcpy (&data[data_offset],
709                  nick_rd->data,
710                  nick_rd->data_size);
711   data_offset += nick_rd->data_size;
712   GNUNET_assert (req ==
713                  (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
714 }
715
716
717 /**
718  * Generate a `struct LookupNameResponseMessage` and send it to the
719  * given client using the given notification context.
720  *
721  * @param nc client to unicast to
722  * @param request_id request ID to use
723  * @param zone_key zone key of the zone
724  * @param name name
725  * @param rd_count number of records in @a rd
726  * @param rd array of records
727  */
728 static void
729 send_lookup_response (struct NamestoreClient *nc,
730                       uint32_t request_id,
731                       const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
732                       const char *name,
733                       unsigned int rd_count,
734                       const struct GNUNET_GNSRECORD_Data *rd)
735 {
736   struct GNUNET_MQ_Envelope *env;
737   struct RecordResultMessage *zir_msg;
738   struct GNUNET_GNSRECORD_Data *nick;
739   struct GNUNET_GNSRECORD_Data *res;
740   unsigned int res_count;
741   size_t name_len;
742   ssize_t rd_ser_len;
743   char *name_tmp;
744   char *rd_ser;
745
746   nick = get_nick_record (zone_key);
747   GNUNET_assert (-1 !=
748                  GNUNET_GNSRECORD_records_get_size (rd_count,
749                                                     rd));
750
751   if ( (NULL != nick) &&
752        (0 != strcmp (name,
753                      GNUNET_GNS_EMPTY_LABEL_AT)))
754   {
755     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
756     merge_with_nick_records (nick,
757                              rd_count,
758                              rd,
759                              &res_count,
760                              &res);
761     GNUNET_free (nick);
762   }
763   else
764   {
765     res_count = rd_count;
766     res = (struct GNUNET_GNSRECORD_Data *) rd;
767   }
768
769   GNUNET_assert (-1 !=
770                  GNUNET_GNSRECORD_records_get_size (res_count,
771                                                     res));
772
773
774   name_len = strlen (name) + 1;
775   rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
776                                                   res);
777   if (rd_ser_len < 0)
778   {
779     GNUNET_break (0);
780     GNUNET_SERVICE_client_drop (nc->client);
781     return;
782   }
783   if (((size_t) rd_ser_len) >= UINT16_MAX - name_len - sizeof (*zir_msg))
784   {
785     GNUNET_break (0);
786     GNUNET_SERVICE_client_drop (nc->client);
787     return;
788   }
789   env = GNUNET_MQ_msg_extra (zir_msg,
790                              name_len + rd_ser_len,
791                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
792   zir_msg->gns_header.r_id = htonl (request_id);
793   zir_msg->name_len = htons (name_len);
794   zir_msg->rd_count = htons (res_count);
795   zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
796   zir_msg->private_key = *zone_key;
797   name_tmp = (char *) &zir_msg[1];
798   GNUNET_memcpy (name_tmp,
799                  name,
800                  name_len);
801   rd_ser = &name_tmp[name_len];
802   GNUNET_assert (rd_ser_len ==
803                  GNUNET_GNSRECORD_records_serialize (res_count,
804                                                      res,
805                                                      rd_ser_len,
806                                                      rd_ser));
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Sending RECORD_RESULT message with %u records\n",
809               res_count);
810   GNUNET_STATISTICS_update (statistics,
811                             "Record sets sent to clients",
812                             1,
813                             GNUNET_NO);
814   GNUNET_MQ_send (nc->mq,
815                   env);
816   if (rd != res)
817     GNUNET_free (res);
818 }
819
820
821 /**
822  * Send response to the store request to the client.
823  *
824  * @param client client to talk to
825  * @param res status of the operation
826  * @param rid client's request ID
827  */
828 static void
829 send_store_response (struct NamestoreClient *nc,
830                      int res,
831                      uint32_t rid)
832 {
833   struct GNUNET_MQ_Envelope *env;
834   struct RecordStoreResponseMessage *rcr_msg;
835
836   GNUNET_assert (NULL != nc);
837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838               "Sending RECORD_STORE_RESPONSE message\n");
839   GNUNET_STATISTICS_update (statistics,
840                             "Store requests completed",
841                             1,
842                             GNUNET_NO);
843   env = GNUNET_MQ_msg (rcr_msg,
844                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
845   rcr_msg->gns_header.r_id = htonl (rid);
846   rcr_msg->op_result = htonl (res);
847   GNUNET_MQ_send (nc->mq,
848                   env);
849 }
850
851
852 /**
853  * Function called once we are done with the zone iteration and
854  * allow the zone iteration client to send us more messages.
855  *
856  * @param zi zone iteration we are processing
857  */
858 static void
859 zone_iteration_done_client_continue (struct ZoneIteration *zi)
860 {
861   struct GNUNET_MQ_Envelope *env;
862   struct GNUNET_NAMESTORE_Header *em;
863
864   GNUNET_SERVICE_client_continue (zi->nc->client);
865   if (! zi->send_end)
866     return;
867   /* send empty response to indicate end of list */
868   env = GNUNET_MQ_msg (em,
869                        GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END);
870   em->r_id = htonl (zi->request_id);
871   GNUNET_MQ_send (zi->nc->mq,
872                   env);
873       
874   GNUNET_CONTAINER_DLL_remove (zi->nc->op_head,
875                                zi->nc->op_tail,
876                                zi);
877   GNUNET_free (zi);
878 }
879
880
881 /**
882  * Cache operation complete, clean up.
883  *
884  * @param cls the `struct CacheOperation`
885  * @param success success
886  * @param emsg error messages
887  */
888 static void
889 finish_cache_operation (void *cls,
890                         int32_t success,
891                         const char *emsg)
892 {
893   struct CacheOperation *cop = cls;
894   struct ZoneIteration *zi;
895
896   if (NULL != emsg)
897     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
898                 _("Failed to replicate block in namecache: %s\n"),
899                 emsg);
900   else
901     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902                 "CACHE operation completed\n");
903   GNUNET_CONTAINER_DLL_remove (cop_head,
904                                cop_tail,
905                                cop);
906   if (NULL != cop->nc)
907     send_store_response (cop->nc,
908                          success,
909                          cop->rid);
910   if (NULL != (zi = cop->zi))
911     {
912       zi->cache_ops--;
913       if (0 == zi->cache_ops)
914       {
915         /* unchoke zone iteration, cache has caught up */
916         zone_iteration_done_client_continue (zi);
917       }
918     }
919   GNUNET_free (cop);
920 }
921
922
923 /**
924  * We just touched the plaintext information about a name in our zone;
925  * refresh the corresponding (encrypted) block in the namecache.
926  *
927  * @param nc client responsible for the request, can be NULL
928  * @param zi zone iteration response for the request, can be NULL
929  * @param rid request ID of the client
930  * @param zone_key private key of the zone
931  * @param name label for the records
932  * @param rd_count number of records
933  * @param rd records stored under the given @a name
934  */
935 static void
936 refresh_block (struct NamestoreClient *nc,
937                struct ZoneIteration *zi,
938                uint32_t rid,
939                const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
940                const char *name,
941                unsigned int rd_count,
942                const struct GNUNET_GNSRECORD_Data *rd)
943 {
944   struct GNUNET_GNSRECORD_Block *block;
945   struct CacheOperation *cop;
946   struct GNUNET_CRYPTO_EcdsaPublicKey pkey;
947   struct GNUNET_GNSRECORD_Data *nick;
948   struct GNUNET_GNSRECORD_Data *res;
949   unsigned int res_count;
950   struct GNUNET_TIME_Absolute exp_time;
951
952   nick = get_nick_record (zone_key);
953   res_count = rd_count;
954   res = (struct GNUNET_GNSRECORD_Data *) rd; /* fixme: a bit unclean... */
955   if (NULL != nick)
956   {
957     nick->flags = (nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
958     merge_with_nick_records (nick,
959                              rd_count,rd,
960                              &res_count,
961                              &res);
962     GNUNET_free (nick);
963   }
964   if (0 == res_count)
965   {
966     if (NULL != nc)
967       send_store_response (nc,
968                            GNUNET_OK,
969                            rid);
970     return; /* no data, no need to update cache */
971   }
972   if (GNUNET_YES == disable_namecache)
973   {
974     GNUNET_STATISTICS_update (statistics,
975                               "Namecache updates skipped (NC disabled)",
976                               1,
977                               GNUNET_NO);
978     if (NULL != nc)
979       send_store_response (nc,
980                            GNUNET_OK,
981                            rid);
982     return;
983   }
984   exp_time = GNUNET_GNSRECORD_record_get_expiration_time (res_count,
985                                                           res);
986   if (cache_keys)
987     block = GNUNET_GNSRECORD_block_create2 (zone_key,
988                                             exp_time,
989                                             name,
990                                             res,
991                                             res_count);
992   else
993     block = GNUNET_GNSRECORD_block_create (zone_key,
994                                            exp_time,
995                                            name,
996                                            res,
997                                            res_count);
998   GNUNET_assert (NULL != block);
999   GNUNET_CRYPTO_ecdsa_key_get_public (zone_key,
1000                                       &pkey);
1001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002               "Caching block for label `%s' with %u records and expiration %s in zone `%s' in namecache\n",
1003               name,
1004               res_count,
1005               GNUNET_STRINGS_absolute_time_to_string (exp_time),
1006               GNUNET_GNSRECORD_z2s (&pkey));
1007   GNUNET_STATISTICS_update (statistics,
1008                             "Namecache updates pushed",
1009                             1,
1010                             GNUNET_NO);
1011   cop = GNUNET_new (struct CacheOperation);
1012   cop->nc = nc;
1013   cop->zi = zi;
1014   if (NULL != zi)
1015     zi->cache_ops++;
1016   cop->rid = rid;
1017   GNUNET_CONTAINER_DLL_insert (cop_head,
1018                                cop_tail,
1019                                cop);
1020   cop->qe = GNUNET_NAMECACHE_block_cache (namecache,
1021                                           block,
1022                                           &finish_cache_operation,
1023                                           cop);
1024   GNUNET_free (block);
1025 }
1026
1027
1028 /**
1029  * Print a warning that one of our monitors is no longer reacting.
1030  *
1031  * @param cls a `struct ZoneMonitor` to warn about
1032  */
1033 static void
1034 warn_monitor_slow (void *cls)
1035 {
1036   struct ZoneMonitor *zm = cls;
1037
1038   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1039               "No response from monitor since %s\n",
1040               GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
1041   zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1042                                                       &warn_monitor_slow,
1043                                                       zm);
1044 }
1045
1046
1047 /**
1048  * Continue processing the @a sa.
1049  *
1050  * @param sa store activity to process
1051  */
1052 static void
1053 continue_store_activity (struct StoreActivity *sa)
1054 {
1055   const struct RecordStoreMessage *rp_msg = sa->rsm;
1056   unsigned int rd_count;
1057   size_t name_len;
1058   size_t rd_ser_len;
1059   uint32_t rid;
1060   const char *name_tmp;
1061   const char *rd_ser;
1062
1063   rid = ntohl (rp_msg->gns_header.r_id);
1064   name_len = ntohs (rp_msg->name_len);
1065   rd_count = ntohs (rp_msg->rd_count);
1066   rd_ser_len = ntohs (rp_msg->rd_len);
1067   name_tmp = (const char *) &rp_msg[1];
1068   rd_ser = &name_tmp[name_len];
1069   {
1070     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1071
1072     /* We did this before, must succeed again */
1073     GNUNET_assert (GNUNET_OK ==
1074                    GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1075                                                          rd_ser,
1076                                                          rd_count,
1077                                                          rd));
1078
1079     for (struct ZoneMonitor *zm = sa->zm_pos;
1080          NULL != zm;
1081          zm = sa->zm_pos)
1082     {
1083       if ( (0 != memcmp (&rp_msg->private_key,
1084                          &zm->zone,
1085                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
1086            (0 != memcmp (&zm->zone,
1087                          &zero,
1088                          sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
1089         {
1090           sa->zm_pos = zm->next; /* not interesting to this monitor */
1091           continue;
1092         }
1093       if (zm->limit == zm->iteration_cnt)
1094       {
1095         zm->sa_waiting = GNUNET_YES;
1096         zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
1097         if (NULL != zm->sa_wait_warning)
1098           GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1099         zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
1100                                                             &warn_monitor_slow,
1101                                                             zm);
1102         return; /* blocked on zone monitor */
1103       }
1104       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105                   "Notifying monitor about changes under label `%s'\n",
1106                   sa->conv_name);
1107       zm->limit--;
1108       send_lookup_response (zm->nc,
1109                             0,
1110                             &rp_msg->private_key,
1111                             sa->conv_name,
1112                             rd_count,
1113                             rd);
1114       sa->zm_pos = zm->next;
1115     }
1116     /* great, done with the monitors, unpack (again) for refresh_block operation */
1117     refresh_block (sa->nc,
1118                    NULL,
1119                    rid,
1120                    &rp_msg->private_key,
1121                    sa->conv_name,
1122                    rd_count,
1123                    rd);
1124   }
1125   GNUNET_SERVICE_client_continue (sa->nc->client);
1126   free_store_activity (sa);
1127 }
1128
1129
1130 /**
1131  * Called whenever a client is disconnected.
1132  * Frees our resources associated with that client.
1133  *
1134  * @param cls closure
1135  * @param client identification of the client
1136  * @param app_ctx the `struct NamestoreClient` of @a client
1137  */
1138 static void
1139 client_disconnect_cb (void *cls,
1140                       struct GNUNET_SERVICE_Client *client,
1141                       void *app_ctx)
1142 {
1143   struct NamestoreClient *nc = app_ctx;
1144   struct ZoneIteration *no;
1145   struct CacheOperation *cop;
1146
1147   (void) cls;
1148   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1149               "Client %p disconnected\n",
1150               client);
1151   for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
1152   {
1153     struct StoreActivity *san;
1154
1155     if (nc != zm->nc)
1156       continue;
1157     GNUNET_CONTAINER_DLL_remove (monitor_head,
1158                                  monitor_tail,
1159                                  zm);
1160     if (NULL != zm->task)
1161     {
1162       GNUNET_SCHEDULER_cancel (zm->task);
1163       zm->task = NULL;
1164     }
1165     if (NULL != zm->sa_wait_warning)
1166     {
1167       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
1168       zm->sa_wait_warning = NULL;
1169     }
1170     for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
1171     {
1172       san = sa->next;
1173       if (zm == sa->zm_pos)
1174       {
1175         sa->zm_pos = zm->next;
1176         /* this may free sa */
1177         continue_store_activity (sa);
1178       }
1179     }
1180     GNUNET_free (zm);
1181     break;
1182   }
1183   for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
1184   {
1185     if (sa->nc == nc)
1186     {
1187       /* this may free sa */
1188       free_store_activity (sa);
1189       break; /* there can only be one per nc */
1190     }
1191   }
1192   while (NULL != (no = nc->op_head))
1193   {
1194     GNUNET_CONTAINER_DLL_remove (nc->op_head,
1195                                  nc->op_tail,
1196                                  no);
1197     GNUNET_free (no);
1198   }
1199   for (cop = cop_head; NULL != cop; cop = cop->next)
1200     if (nc == cop->nc)
1201       cop->nc = NULL;
1202   GNUNET_free (nc);
1203 }
1204
1205
1206 /**
1207  * Add a client to our list of active clients.
1208  *
1209  * @param cls NULL
1210  * @param client client to add
1211  * @param mq message queue for @a client
1212  * @return internal namestore client structure for this client
1213  */
1214 static void *
1215 client_connect_cb (void *cls,
1216                    struct GNUNET_SERVICE_Client *client,
1217                    struct GNUNET_MQ_Handle *mq)
1218 {
1219   struct NamestoreClient *nc;
1220
1221   (void) cls;
1222   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223               "Client %p connected\n",
1224               client);
1225   nc = GNUNET_new (struct NamestoreClient);
1226   nc->client = client;
1227   nc->mq = mq;
1228   return nc;
1229 }
1230
1231
1232 /**
1233  * Closure for #lookup_it().
1234  */
1235 struct RecordLookupContext
1236 {
1237
1238   /**
1239    * FIXME.
1240    */
1241   const char *label;
1242
1243   /**
1244    * FIXME.
1245    */
1246   char *res_rd;
1247
1248   /**
1249    * FIXME.
1250    */
1251   struct GNUNET_GNSRECORD_Data *nick;
1252
1253   /**
1254    * FIXME.
1255    */
1256   int found;
1257
1258   /**
1259    * FIXME.
1260    */
1261   unsigned int res_rd_count;
1262
1263   /**
1264    * FIXME.
1265    */
1266   ssize_t rd_ser_len;
1267 };
1268
1269
1270 /**
1271  * FIXME.
1272  *
1273  * @param seq sequence number of the record
1274  */
1275 static void
1276 lookup_it (void *cls,
1277            uint64_t seq,
1278            const struct GNUNET_CRYPTO_EcdsaPrivateKey *private_key,
1279            const char *label,
1280            unsigned int rd_count,
1281            const struct GNUNET_GNSRECORD_Data *rd)
1282 {
1283   struct RecordLookupContext *rlc = cls;
1284
1285   (void) private_key;
1286   (void) seq;
1287   if (0 != strcmp (label,
1288                    rlc->label))
1289     return;
1290   rlc->found = GNUNET_YES;
1291   if (0 == rd_count)
1292   {
1293     rlc->rd_ser_len = 0;
1294     rlc->res_rd_count = 0;
1295     rlc->res_rd = NULL;
1296     return;
1297   }
1298   if ( (NULL != rlc->nick) &&
1299        (0 != strcmp (label,
1300                      GNUNET_GNS_EMPTY_LABEL_AT)) )
1301   {
1302     /* Merge */
1303     struct GNUNET_GNSRECORD_Data *rd_res;
1304     unsigned int rdc_res;
1305
1306     rd_res = NULL;
1307     rdc_res = 0;
1308     rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
1309     merge_with_nick_records (rlc->nick,
1310                              rd_count,
1311                              rd,
1312                              &rdc_res,
1313                              &rd_res);
1314     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
1315                                                          rd_res);
1316     if (rlc->rd_ser_len < 0)
1317     {
1318       GNUNET_break (0);
1319       GNUNET_free  (rd_res);
1320       rlc->found = GNUNET_NO;
1321       rlc->rd_ser_len = 0;
1322       return;
1323     }
1324     rlc->res_rd_count = rdc_res;
1325     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1326     if (rlc->rd_ser_len !=
1327         GNUNET_GNSRECORD_records_serialize (rdc_res,
1328                                             rd_res,
1329                                             rlc->rd_ser_len,
1330                                             rlc->res_rd))
1331     {
1332       GNUNET_break (0);
1333       GNUNET_free  (rlc->res_rd);
1334       rlc->res_rd = NULL;
1335       rlc->res_rd_count = 0;
1336       rlc->rd_ser_len = 0;
1337       GNUNET_free  (rd_res);
1338       rlc->found = GNUNET_NO;
1339       return;
1340     }
1341     GNUNET_free (rd_res);
1342     GNUNET_free (rlc->nick);
1343     rlc->nick = NULL;
1344   }
1345   else
1346   {
1347     rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1348                                                          rd);
1349     if (rlc->rd_ser_len < 0)
1350     {
1351       GNUNET_break (0);
1352       rlc->found = GNUNET_NO;
1353       rlc->rd_ser_len = 0;
1354       return;
1355     }
1356     rlc->res_rd_count = rd_count;
1357     rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
1358     if (rlc->rd_ser_len !=
1359         GNUNET_GNSRECORD_records_serialize (rd_count,
1360                                             rd,
1361                                             rlc->rd_ser_len,
1362                                             rlc->res_rd))
1363     {
1364       GNUNET_break (0);
1365       GNUNET_free  (rlc->res_rd);
1366       rlc->res_rd = NULL;
1367       rlc->res_rd_count = 0;
1368       rlc->rd_ser_len = 0;
1369       rlc->found = GNUNET_NO;
1370       return;
1371     }
1372   }
1373 }
1374
1375
1376 /**
1377  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1378  *
1379  * @param cls client sending the message
1380  * @param ll_msg message of type `struct LabelLookupMessage`
1381  * @return #GNUNET_OK if @a ll_msg is well-formed
1382  */
1383 static int
1384 check_record_lookup (void *cls,
1385                      const struct LabelLookupMessage *ll_msg)
1386 {
1387   uint32_t name_len;
1388   size_t src_size;
1389
1390   (void) cls;
1391   name_len = ntohl (ll_msg->label_len);
1392   src_size = ntohs (ll_msg->gns_header.header.size);
1393   if (name_len != src_size - sizeof (struct LabelLookupMessage))
1394   {
1395     GNUNET_break (0);
1396     return GNUNET_SYSERR;
1397   }
1398   GNUNET_MQ_check_zero_termination (ll_msg);
1399   return GNUNET_OK;
1400 }
1401
1402
1403 /**
1404  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP message
1405  *
1406  * @param cls client sending the message
1407  * @param ll_msg message of type `struct LabelLookupMessage`
1408  */
1409 static void
1410 handle_record_lookup (void *cls,
1411                       const struct LabelLookupMessage *ll_msg)
1412 {
1413   struct NamestoreClient *nc = cls;
1414   struct GNUNET_MQ_Envelope *env;
1415   struct LabelLookupResponseMessage *llr_msg;
1416   struct RecordLookupContext rlc;
1417   const char *name_tmp;
1418   char *res_name;
1419   char *conv_name;
1420   uint32_t name_len;
1421   int res;
1422
1423   name_len = ntohl (ll_msg->label_len);
1424   name_tmp = (const char *) &ll_msg[1];
1425   GNUNET_SERVICE_client_continue (nc->client);
1426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427               "Received NAMESTORE_RECORD_LOOKUP message for name `%s'\n",
1428               name_tmp);
1429
1430   conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1431   if (NULL == conv_name)
1432   {
1433     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1434                 "Error converting name `%s'\n",
1435                 name_tmp);
1436     GNUNET_SERVICE_client_drop (nc->client);
1437     return;
1438   }
1439   rlc.label = conv_name;
1440   rlc.found = GNUNET_NO;
1441   rlc.res_rd_count = 0;
1442   rlc.res_rd = NULL;
1443   rlc.rd_ser_len = 0;
1444   rlc.nick = get_nick_record (&ll_msg->zone);
1445   res = GSN_database->lookup_records (GSN_database->cls,
1446                                       &ll_msg->zone,
1447                                       conv_name,
1448                                       &lookup_it,
1449                                       &rlc);
1450   GNUNET_free (conv_name);
1451   env = GNUNET_MQ_msg_extra (llr_msg,
1452                              name_len + rlc.rd_ser_len,
1453                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP_RESPONSE);
1454   llr_msg->gns_header.r_id = ll_msg->gns_header.r_id;
1455   llr_msg->private_key = ll_msg->zone;
1456   llr_msg->name_len = htons (name_len);
1457   llr_msg->rd_count = htons (rlc.res_rd_count);
1458   llr_msg->rd_len = htons (rlc.rd_ser_len);
1459   res_name = (char *) &llr_msg[1];
1460   if  ((GNUNET_YES == rlc.found) && (GNUNET_OK == res))
1461     llr_msg->found = ntohs (GNUNET_YES);
1462   else
1463     llr_msg->found = ntohs (GNUNET_NO);
1464   GNUNET_memcpy (&llr_msg[1],
1465                  name_tmp,
1466                  name_len);
1467   GNUNET_memcpy (&res_name[name_len],
1468                  rlc.res_rd,
1469                  rlc.rd_ser_len);
1470   GNUNET_MQ_send (nc->mq,
1471                   env);
1472   GNUNET_free_non_null (rlc.res_rd);
1473 }
1474
1475
1476 /**
1477  * Checks a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1478  *
1479  * @param cls client sending the message
1480  * @param rp_msg message of type `struct RecordStoreMessage`
1481  * @return #GNUNET_OK if @a rp_msg is well-formed
1482  */
1483 static int
1484 check_record_store (void *cls,
1485                     const struct RecordStoreMessage *rp_msg)
1486 {
1487   size_t name_len;
1488   size_t msg_size;
1489   size_t msg_size_exp;
1490   size_t rd_ser_len;
1491   const char *name_tmp;
1492
1493   (void) cls;
1494   name_len = ntohs (rp_msg->name_len);
1495   msg_size = ntohs (rp_msg->gns_header.header.size);
1496   rd_ser_len = ntohs (rp_msg->rd_len);
1497   msg_size_exp = sizeof (struct RecordStoreMessage) + name_len + rd_ser_len;
1498   if (msg_size != msg_size_exp)
1499   {
1500     GNUNET_break (0);
1501     return GNUNET_SYSERR;
1502   }
1503   if ( (0 == name_len) ||
1504        (name_len > MAX_NAME_LEN) )
1505   {
1506     GNUNET_break (0);
1507     return GNUNET_SYSERR;
1508   }
1509   name_tmp = (const char *) &rp_msg[1];
1510   if ('\0' != name_tmp[name_len -1])
1511   {
1512     GNUNET_break (0);
1513     return GNUNET_SYSERR;
1514   }
1515   return GNUNET_OK;
1516 }
1517
1518
1519 /**
1520  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE message
1521  *
1522  * @param cls client sending the message
1523  * @param rp_msg message of type `struct RecordStoreMessage`
1524  */
1525 static void
1526 handle_record_store (void *cls,
1527                      const struct RecordStoreMessage *rp_msg)
1528 {
1529   struct NamestoreClient *nc = cls;
1530   size_t name_len;
1531   size_t rd_ser_len;
1532   uint32_t rid;
1533   const char *name_tmp;
1534   char *conv_name;
1535   const char *rd_ser;
1536   unsigned int rd_count;
1537   int res;
1538   struct StoreActivity *sa;
1539
1540   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1541               "Received NAMESTORE_RECORD_STORE message\n");
1542   rid = ntohl (rp_msg->gns_header.r_id);
1543   name_len = ntohs (rp_msg->name_len);
1544   rd_count = ntohs (rp_msg->rd_count);
1545   rd_ser_len = ntohs (rp_msg->rd_len);
1546   GNUNET_break (0 == ntohs (rp_msg->reserved));
1547   name_tmp = (const char *) &rp_msg[1];
1548   rd_ser = &name_tmp[name_len];
1549   {
1550     struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
1551
1552     if (GNUNET_OK !=
1553         GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
1554                                               rd_ser,
1555                                               rd_count,
1556                                               rd))
1557     {
1558       GNUNET_break (0);
1559       GNUNET_SERVICE_client_drop (nc->client);
1560       return;
1561     }
1562
1563     /* Extracting and converting private key */
1564     conv_name = GNUNET_GNSRECORD_string_to_lowercase (name_tmp);
1565     if (NULL == conv_name)
1566     {
1567       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1568                   "Error converting name `%s'\n",
1569                   name_tmp);
1570       GNUNET_SERVICE_client_drop (nc->client);
1571       return;
1572     }
1573     GNUNET_STATISTICS_update (statistics,
1574                               "Well-formed store requests received",
1575                               1,
1576                               GNUNET_NO);
1577     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578                 "Creating %u records for name `%s'\n",
1579                 (unsigned int) rd_count,
1580                 conv_name);
1581     if ( (0 == rd_count) &&
1582          (GNUNET_NO ==
1583           GSN_database->lookup_records (GSN_database->cls,
1584                                         &rp_msg->private_key,
1585                                         conv_name,
1586                                         NULL,
1587                                         0)) )
1588     {
1589       /* This name does not exist, so cannot be removed */
1590       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1591                   "Name `%s' does not exist, no deletion required\n",
1592                   conv_name);
1593       res = GNUNET_NO;
1594     }
1595     else
1596     {
1597       /* remove "NICK" records, unless this is for the
1598          #GNUNET_GNS_EMPTY_LABEL_AT label */
1599       struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
1600       unsigned int rd_clean_off;
1601
1602       rd_clean_off = 0;
1603       for (unsigned int i=0;i<rd_count;i++)
1604       {
1605         rd_clean[rd_clean_off] = rd[i];
1606         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1607                            conv_name)) ||
1608              (GNUNET_GNSRECORD_TYPE_NICK != rd[i].record_type) )
1609           rd_clean_off++;
1610         
1611         if ( (0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT,
1612                            conv_name)) &&
1613              (GNUNET_GNSRECORD_TYPE_NICK == rd[i].record_type) )
1614           cache_nick (&rp_msg->private_key,
1615                       &rd[i]);
1616       }
1617       res = GSN_database->store_records (GSN_database->cls,
1618                                          &rp_msg->private_key,
1619                                          conv_name,
1620                                          rd_clean_off,
1621                                          rd_clean);
1622     }
1623
1624     if (GNUNET_OK != res)
1625     {
1626       /* store not successful, not need to tell monitors */
1627       send_store_response (nc,
1628                            res,
1629                            rid);
1630       GNUNET_SERVICE_client_continue (nc->client);
1631       GNUNET_free (conv_name);
1632       return;
1633     }
1634
1635     sa = GNUNET_malloc (sizeof (struct StoreActivity) +
1636                         ntohs (rp_msg->gns_header.header.size));
1637     GNUNET_CONTAINER_DLL_insert (sa_head,
1638                                  sa_tail,
1639                                  sa);
1640     sa->nc = nc;
1641     sa->rsm = (const struct RecordStoreMessage *) &sa[1];
1642     GNUNET_memcpy (&sa[1],
1643                    rp_msg,
1644                    ntohs (rp_msg->gns_header.header.size));
1645     sa->zm_pos = monitor_head;
1646     sa->conv_name = conv_name;
1647     continue_store_activity (sa);
1648   }
1649 }
1650
1651
1652 /**
1653  * Context for record remove operations passed from #handle_zone_to_name to
1654  * #handle_zone_to_name_it as closure
1655  */
1656 struct ZoneToNameCtx
1657 {
1658   /**
1659    * Namestore client
1660    */
1661   struct NamestoreClient *nc;
1662
1663   /**
1664    * Request id (to be used in the response to the client).
1665    */
1666   uint32_t rid;
1667
1668   /**
1669    * Set to #GNUNET_OK on success, #GNUNET_SYSERR on error.  Note that
1670    * not finding a name for the zone still counts as a 'success' here,
1671    * as this field is about the success of executing the IPC protocol.
1672    */
1673   int success;
1674 };
1675
1676
1677 /**
1678  * Zone to name iterator
1679  *
1680  * @param cls struct ZoneToNameCtx *
1681  * @param seq sequence number of the record
1682  * @param zone_key the zone key
1683  * @param name name
1684  * @param rd_count number of records in @a rd
1685  * @param rd record data
1686  */
1687 static void
1688 handle_zone_to_name_it (void *cls,
1689                         uint64_t seq,
1690                         const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1691                         const char *name,
1692                         unsigned int rd_count,
1693                         const struct GNUNET_GNSRECORD_Data *rd)
1694 {
1695   struct ZoneToNameCtx *ztn_ctx = cls;
1696   struct GNUNET_MQ_Envelope *env;
1697   struct ZoneToNameResponseMessage *ztnr_msg;
1698   int16_t res;
1699   size_t name_len;
1700   ssize_t rd_ser_len;
1701   size_t msg_size;
1702   char *name_tmp;
1703   char *rd_tmp;
1704
1705   (void) seq;
1706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707               "Found result for zone-to-name lookup: `%s'\n",
1708               name);
1709   res = GNUNET_YES;
1710   name_len = (NULL == name) ? 0 : strlen (name) + 1;
1711   rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
1712                                                   rd);
1713   if (rd_ser_len < 0)
1714   {
1715     GNUNET_break (0);
1716     ztn_ctx->success = GNUNET_SYSERR;
1717     return;
1718   }
1719   msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
1720   if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
1721   {
1722     GNUNET_break (0);
1723     ztn_ctx->success = GNUNET_SYSERR;
1724     return;
1725   }
1726   env = GNUNET_MQ_msg_extra (ztnr_msg,
1727                              name_len + rd_ser_len,
1728                              GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1729   ztnr_msg->gns_header.header.size = htons (msg_size);
1730   ztnr_msg->gns_header.r_id = htonl (ztn_ctx->rid);
1731   ztnr_msg->res = htons (res);
1732   ztnr_msg->rd_len = htons (rd_ser_len);
1733   ztnr_msg->rd_count = htons (rd_count);
1734   ztnr_msg->name_len = htons (name_len);
1735   ztnr_msg->zone = *zone_key;
1736   name_tmp = (char *) &ztnr_msg[1];
1737   GNUNET_memcpy (name_tmp,
1738                  name,
1739                  name_len);
1740   rd_tmp = &name_tmp[name_len];
1741   GNUNET_assert (rd_ser_len ==
1742                  GNUNET_GNSRECORD_records_serialize (rd_count,
1743                                                      rd,
1744                                                      rd_ser_len,
1745                                                      rd_tmp));
1746   ztn_ctx->success = GNUNET_OK;
1747   GNUNET_MQ_send (ztn_ctx->nc->mq,
1748                   env);
1749 }
1750
1751
1752 /**
1753  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME message
1754  *
1755  * @param cls client client sending the message
1756  * @param ztn_msg message of type 'struct ZoneToNameMessage'
1757  */
1758 static void
1759 handle_zone_to_name (void *cls,
1760                      const struct ZoneToNameMessage *ztn_msg)
1761 {
1762   struct NamestoreClient *nc = cls;
1763   struct ZoneToNameCtx ztn_ctx;
1764   struct GNUNET_MQ_Envelope *env;
1765   struct ZoneToNameResponseMessage *ztnr_msg;
1766
1767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1768               "Received ZONE_TO_NAME message\n");
1769   ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
1770   ztn_ctx.nc = nc;
1771   ztn_ctx.success = GNUNET_NO;
1772   if (GNUNET_SYSERR ==
1773       GSN_database->zone_to_name (GSN_database->cls,
1774                                   &ztn_msg->zone,
1775                                   &ztn_msg->value_zone,
1776                                   &handle_zone_to_name_it, &ztn_ctx))
1777   {
1778     /* internal error, hang up instead of signalling something
1779        that might be wrong */
1780     GNUNET_break (0);
1781     GNUNET_SERVICE_client_drop (nc->client);
1782     return;
1783   }
1784   if (GNUNET_NO == ztn_ctx.success)
1785   {
1786     /* no result found, send empty response */
1787     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788                 "Found no result for zone-to-name lookup.\n");
1789     env = GNUNET_MQ_msg (ztnr_msg,
1790                          GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME_RESPONSE);
1791     ztnr_msg->gns_header.r_id = ztn_msg->gns_header.r_id;
1792     ztnr_msg->res = htons (GNUNET_NO);
1793     GNUNET_MQ_send (nc->mq,
1794                     env);
1795   }
1796   GNUNET_SERVICE_client_continue (nc->client);
1797 }
1798
1799
1800 /**
1801  * Context for record remove operations passed from
1802  * #run_zone_iteration_round to #zone_iterate_proc as closure
1803  */
1804 struct ZoneIterationProcResult
1805 {
1806   /**
1807    * The zone iteration handle
1808    */
1809   struct ZoneIteration *zi;
1810
1811   /**
1812    * Number of results left to be returned in this iteration.
1813    */
1814   uint64_t limit;
1815
1816 };
1817
1818
1819 /**
1820  * Process results for zone iteration from database
1821  *
1822  * @param cls struct ZoneIterationProcResult
1823  * @param seq sequence number of the record
1824  * @param zone_key the zone key
1825  * @param name name
1826  * @param rd_count number of records for this name
1827  * @param rd record data
1828  */
1829 static void
1830 zone_iterate_proc (void *cls,
1831                    uint64_t seq,
1832                    const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
1833                    const char *name,
1834                    unsigned int rd_count,
1835                    const struct GNUNET_GNSRECORD_Data *rd)
1836 {
1837   struct ZoneIterationProcResult *proc = cls;
1838   int do_refresh_block;
1839
1840   if ( (NULL == zone_key) &&
1841        (NULL == name) )
1842   {
1843     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1844                 "Iteration done\n");
1845     return;
1846   }
1847   if ( (NULL == zone_key) ||
1848        (NULL == name) )
1849   {
1850     /* what is this!? should never happen */
1851     GNUNET_break (0);
1852     return;
1853   }
1854   if (0 == proc->limit)
1855   {
1856     /* what is this!? should never happen */
1857     GNUNET_break (0);
1858     return;
1859   }
1860   proc->limit--;
1861   proc->zi->seq = seq;
1862   send_lookup_response (proc->zi->nc,
1863                         proc->zi->request_id,
1864                         zone_key,
1865                         name,
1866                         rd_count,
1867                         rd);
1868
1869
1870   do_refresh_block = GNUNET_NO;
1871   for (unsigned int i=0;i<rd_count;i++)
1872     if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
1873     {
1874       do_refresh_block = GNUNET_YES;
1875       break;
1876     }
1877   if (GNUNET_YES == do_refresh_block)    
1878     refresh_block (NULL,
1879                    proc->zi,
1880                    0,
1881                    zone_key,
1882                    name,
1883                    rd_count,
1884                    rd);
1885 }
1886
1887
1888 /**
1889  * Perform the next round of the zone iteration.
1890  *
1891  * @param zi zone iterator to process
1892  * @param limit number of results to return in one pass
1893  */
1894 static void
1895 run_zone_iteration_round (struct ZoneIteration *zi,
1896                           uint64_t limit)
1897 {
1898   struct ZoneIterationProcResult proc;
1899   struct GNUNET_TIME_Absolute start;
1900   struct GNUNET_TIME_Relative duration;
1901
1902   memset (&proc,
1903           0,
1904           sizeof (proc));
1905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906               "Asked to return up to %llu records at position %llu\n",
1907               (unsigned long long) limit,
1908               (unsigned long long) zi->seq);
1909   proc.zi = zi;
1910   proc.limit = limit;
1911   start = GNUNET_TIME_absolute_get ();
1912   GNUNET_break (GNUNET_SYSERR !=
1913                 GSN_database->iterate_records (GSN_database->cls,
1914                                                (0 == memcmp (&zi->zone,
1915                                                              &zero,
1916                                                              sizeof (zero)))
1917                                                ? NULL
1918                                                : &zi->zone,
1919                                                zi->seq,
1920                                                limit,
1921                                                &zone_iterate_proc,
1922                                                &proc));
1923   duration = GNUNET_TIME_absolute_get_duration (start);
1924   duration = GNUNET_TIME_relative_divide (duration,
1925                                           limit - proc.limit);
1926   GNUNET_STATISTICS_set (statistics,
1927                          "NAMESTORE iteration delay (μs/record)",
1928                          duration.rel_value_us,
1929                          GNUNET_NO);
1930   if (0 == proc.limit)
1931     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1932                 "Returned %llu results, more results available\n",
1933                 (unsigned long long) limit);
1934   zi->send_end = (0 != proc.limit);
1935   if (0 == zi->cache_ops)
1936     zone_iteration_done_client_continue (zi);
1937 }
1938
1939
1940 /**
1941  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START message
1942  *
1943  * @param cls the client sending the message
1944  * @param zis_msg message from the client
1945  */
1946 static void
1947 handle_iteration_start (void *cls,
1948                         const struct ZoneIterationStartMessage *zis_msg)
1949 {
1950   struct NamestoreClient *nc = cls;
1951   struct ZoneIteration *zi;
1952
1953   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1954               "Received ZONE_ITERATION_START message\n");
1955   zi = GNUNET_new (struct ZoneIteration);
1956   zi->request_id = ntohl (zis_msg->gns_header.r_id);
1957   zi->offset = 0;
1958   zi->nc = nc;
1959   zi->zone = zis_msg->zone;
1960
1961   GNUNET_CONTAINER_DLL_insert (nc->op_head,
1962                                nc->op_tail,
1963                                zi);
1964   run_zone_iteration_round (zi,
1965                             1);
1966 }
1967
1968
1969 /**
1970  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP message
1971  *
1972  * @param cls the client sending the message
1973  * @param zis_msg message from the client
1974  */
1975 static void
1976 handle_iteration_stop (void *cls,
1977                        const struct ZoneIterationStopMessage *zis_msg)
1978 {
1979   struct NamestoreClient *nc = cls;
1980   struct ZoneIteration *zi;
1981   uint32_t rid;
1982
1983   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984               "Received ZONE_ITERATION_STOP message\n");
1985   rid = ntohl (zis_msg->gns_header.r_id);
1986   for (zi = nc->op_head; NULL != zi; zi = zi->next)
1987     if (zi->request_id == rid)
1988       break;
1989   if (NULL == zi)
1990   {
1991     GNUNET_break (0);
1992     GNUNET_SERVICE_client_drop (nc->client);
1993     return;
1994   }
1995   GNUNET_CONTAINER_DLL_remove (nc->op_head,
1996                                nc->op_tail,
1997                                zi);
1998   GNUNET_free (zi);
1999   GNUNET_SERVICE_client_continue (nc->client);
2000 }
2001
2002
2003 /**
2004  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT message
2005  *
2006  * @param cls the client sending the message
2007  * @param message message from the client
2008  */
2009 static void
2010 handle_iteration_next (void *cls,
2011                        const struct ZoneIterationNextMessage *zis_msg)
2012 {
2013   struct NamestoreClient *nc = cls;
2014   struct ZoneIteration *zi;
2015   uint32_t rid;
2016   uint64_t limit;
2017
2018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2019               "Received ZONE_ITERATION_NEXT message\n");
2020   GNUNET_STATISTICS_update (statistics,
2021                             "Iteration NEXT messages received",
2022                             1,
2023                             GNUNET_NO);
2024   rid = ntohl (zis_msg->gns_header.r_id);
2025   limit = GNUNET_ntohll (zis_msg->limit);
2026   for (zi = nc->op_head; NULL != zi; zi = zi->next)
2027     if (zi->request_id == rid)
2028       break;
2029   if (NULL == zi)
2030   {
2031     GNUNET_break (0);
2032     GNUNET_SERVICE_client_drop (nc->client);
2033     return;
2034   }
2035   run_zone_iteration_round (zi,
2036                             limit);
2037 }
2038
2039
2040 /**
2041  * Function called when the monitor is ready for more data, and we
2042  * should thus unblock PUT operations that were blocked on the
2043  * monitor not being ready.
2044  */
2045 static void
2046 monitor_unblock (struct ZoneMonitor *zm)
2047 {
2048   struct StoreActivity *sa = sa_head;
2049
2050   while ( (NULL != sa) &&
2051           (zm->limit > zm->iteration_cnt) )
2052   {
2053     struct StoreActivity *sn = sa->next;
2054
2055     if (sa->zm_pos == zm)
2056       continue_store_activity (sa);
2057     sa = sn;
2058   }
2059   if (zm->limit > zm->iteration_cnt)
2060   {
2061     zm->sa_waiting = GNUNET_NO;
2062     if (NULL != zm->sa_wait_warning)
2063     {
2064       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2065       zm->sa_wait_warning = NULL;
2066     }
2067   }
2068   else if (GNUNET_YES == zm->sa_waiting)
2069   {
2070     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2071     if (NULL != zm->sa_wait_warning)
2072       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2073     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2074                                                         &warn_monitor_slow,
2075                                                         zm);
2076   }
2077 }
2078
2079
2080 /**
2081  * Send 'sync' message to zone monitor, we're now in sync.
2082  *
2083  * @param zm monitor that is now in sync
2084  */
2085 static void
2086 monitor_sync (struct ZoneMonitor *zm)
2087 {
2088   struct GNUNET_MQ_Envelope *env;
2089   struct GNUNET_MessageHeader *sync;
2090
2091   env = GNUNET_MQ_msg (sync,
2092                        GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
2093   GNUNET_MQ_send (zm->nc->mq,
2094                   env);
2095   /* mark iteration done */
2096   zm->in_first_iteration = GNUNET_NO;
2097   zm->iteration_cnt = 0;
2098   if ( (zm->limit > 0) &&
2099        (zm->sa_waiting) )
2100     monitor_unblock (zm);
2101 }
2102
2103
2104 /**
2105  * Obtain the next datum during the zone monitor's zone initial iteration.
2106  *
2107  * @param cls zone monitor that does its initial iteration
2108  */
2109 static void
2110 monitor_iteration_next (void *cls);
2111
2112
2113 /**
2114  * A #GNUNET_NAMESTORE_RecordIterator for monitors.
2115  *
2116  * @param cls a 'struct ZoneMonitor *' with information about the monitor
2117  * @param seq sequence number of the record
2118  * @param zone_key zone key of the zone
2119  * @param name name
2120  * @param rd_count number of records in @a rd
2121  * @param rd array of records
2122  */
2123 static void
2124 monitor_iterate_cb (void *cls,
2125                     uint64_t seq,
2126                     const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone_key,
2127                     const char *name,
2128                     unsigned int rd_count,
2129                     const struct GNUNET_GNSRECORD_Data *rd)
2130 {
2131   struct ZoneMonitor *zm = cls;
2132
2133   zm->seq = seq;
2134   GNUNET_assert (NULL != name);
2135   GNUNET_STATISTICS_update (statistics,
2136                             "Monitor notifications sent",
2137                             1,
2138                             GNUNET_NO);
2139   zm->limit--;
2140   zm->iteration_cnt--;
2141   send_lookup_response (zm->nc,
2142                         0,
2143                         zone_key,
2144                         name,
2145                         rd_count,
2146                         rd);
2147   if ( (0 == zm->iteration_cnt) &&
2148        (0 != zm->limit) )
2149   {
2150     /* We are done with the current iteration batch, AND the
2151        client would right now accept more, so go again! */
2152     GNUNET_assert (NULL == zm->task);
2153     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2154                                          zm);
2155   }
2156 }
2157
2158
2159 /**
2160  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START message
2161  *
2162  * @param cls the client sending the message
2163  * @param zis_msg message from the client
2164  */
2165 static void
2166 handle_monitor_start (void *cls,
2167                       const struct ZoneMonitorStartMessage *zis_msg)
2168 {
2169   struct NamestoreClient *nc = cls;
2170   struct ZoneMonitor *zm;
2171
2172   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2173               "Received ZONE_MONITOR_START message\n");
2174   zm = GNUNET_new (struct ZoneMonitor);
2175   zm->nc = nc;
2176   zm->zone = zis_msg->zone;
2177   zm->limit = 1;
2178   zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
2179   GNUNET_CONTAINER_DLL_insert (monitor_head,
2180                                monitor_tail,
2181                                zm);
2182   GNUNET_SERVICE_client_mark_monitor (nc->client);
2183   GNUNET_SERVICE_client_continue (nc->client);
2184   GNUNET_notification_context_add (monitor_nc,
2185                                    nc->mq);
2186   if (zm->in_first_iteration)
2187     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2188                                          zm);
2189   else
2190     monitor_sync (zm);
2191 }
2192
2193
2194 /**
2195  * Obtain the next datum during the zone monitor's zone initial iteration.
2196  *
2197  * @param cls zone monitor that does its initial iteration
2198  */
2199 static void
2200 monitor_iteration_next (void *cls)
2201 {
2202   struct ZoneMonitor *zm = cls;
2203   int ret;
2204
2205   zm->task = NULL;
2206   GNUNET_assert (0 == zm->iteration_cnt);
2207   if (zm->limit > 16)
2208     zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
2209   else
2210     zm->iteration_cnt = zm->limit; /* use it all */
2211   ret = GSN_database->iterate_records (GSN_database->cls,
2212                                        (0 == memcmp (&zm->zone,
2213                                                      &zero,
2214                                                      sizeof (zero)))
2215                                        ? NULL
2216                                        : &zm->zone,
2217                                        zm->seq,
2218                                        zm->iteration_cnt,
2219                                        &monitor_iterate_cb,
2220                                        zm);
2221   if (GNUNET_SYSERR == ret)
2222   {
2223     GNUNET_SERVICE_client_drop (zm->nc->client);
2224     return;
2225   }
2226   if (GNUNET_NO == ret)
2227   {
2228     /* empty zone */
2229     monitor_sync (zm);
2230     return;
2231   }
2232 }
2233
2234
2235 /**
2236  * Handles a #GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT message
2237  *
2238  * @param cls the client sending the message
2239  * @param nm message from the client
2240  */
2241 static void
2242 handle_monitor_next (void *cls,
2243                      const struct ZoneMonitorNextMessage *nm)
2244 {
2245   struct NamestoreClient *nc = cls;
2246   struct ZoneMonitor *zm;
2247   uint64_t inc;
2248
2249   inc = GNUNET_ntohll (nm->limit);
2250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2251               "Received ZONE_MONITOR_NEXT message with limit %llu\n",
2252               (unsigned long long) inc);
2253   for (zm = monitor_head; NULL != zm; zm = zm->next)
2254     if (zm->nc == nc)
2255       break;
2256   if (NULL == zm)
2257   {
2258     GNUNET_break (0);
2259     GNUNET_SERVICE_client_drop (nc->client);
2260     return;
2261   }
2262   GNUNET_SERVICE_client_continue (nc->client);
2263   if (zm->limit + inc < zm->limit)
2264   {
2265     GNUNET_break (0);
2266     GNUNET_SERVICE_client_drop (nc->client);
2267     return;
2268   }
2269   zm->limit += inc;
2270   if ( (zm->in_first_iteration) &&
2271        (zm->limit == inc) )
2272   {
2273     /* We are still iterating, and the previous iteration must
2274        have stopped due to the client's limit, so continue it! */
2275     GNUNET_assert (NULL == zm->task);
2276     zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
2277                                          zm);
2278   }
2279   GNUNET_assert (zm->iteration_cnt <= zm->limit);
2280   if ( (zm->limit > zm->iteration_cnt) &&
2281        (zm->sa_waiting) )
2282   {
2283     monitor_unblock (zm);
2284   }
2285   else if (GNUNET_YES == zm->sa_waiting)
2286   {
2287     if (NULL != zm->sa_wait_warning)
2288       GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
2289     zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
2290     zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
2291                                                         &warn_monitor_slow,
2292                                                         zm);
2293   }
2294 }
2295
2296
2297 /**
2298  * Process namestore requests.
2299  *
2300  * @param cls closure
2301  * @param cfg configuration to use
2302  * @param service the initialized service
2303  */
2304 static void
2305 run (void *cls,
2306      const struct GNUNET_CONFIGURATION_Handle *cfg,
2307      struct GNUNET_SERVICE_Handle *service)
2308 {
2309   char *database;
2310
2311   (void) cls;
2312   (void) service;
2313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2314               "Starting namestore service\n");
2315   cache_keys = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2316                                                      "namestore",
2317                                                      "CACHE_KEYS");
2318   disable_namecache = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2319                                                             "namecache",
2320                                                             "DISABLE");
2321   GSN_cfg = cfg;
2322   monitor_nc = GNUNET_notification_context_create (1);
2323   if (GNUNET_YES != disable_namecache)
2324   {
2325     namecache = GNUNET_NAMECACHE_connect (cfg);
2326     GNUNET_assert (NULL != namecache);
2327   }
2328   /* Loading database plugin */
2329   if (GNUNET_OK !=
2330       GNUNET_CONFIGURATION_get_value_string (cfg,
2331                                              "namestore",
2332                                              "database",
2333                                              &database))
2334     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2335                 "No database backend configured\n");
2336
2337   GNUNET_asprintf (&db_lib_name,
2338                    "libgnunet_plugin_namestore_%s",
2339                    database);
2340   GSN_database = GNUNET_PLUGIN_load (db_lib_name,
2341                                      (void *) GSN_cfg);
2342   GNUNET_free (database);
2343   statistics = GNUNET_STATISTICS_create ("namestore",
2344                                          cfg);
2345   GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
2346                                  NULL);
2347   if (NULL == GSN_database)
2348   {
2349     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2350                 "Could not load database backend `%s'\n",
2351                 db_lib_name);
2352     GNUNET_SCHEDULER_shutdown ();
2353     return;
2354   }
2355 }
2356
2357
2358 /**
2359  * Define "main" method using service macro.
2360  */
2361 GNUNET_SERVICE_MAIN
2362 ("namestore",
2363  GNUNET_SERVICE_OPTION_NONE,
2364  &run,
2365  &client_connect_cb,
2366  &client_disconnect_cb,
2367  NULL,
2368  GNUNET_MQ_hd_var_size (record_store,
2369                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE,
2370                         struct RecordStoreMessage,
2371                         NULL),
2372  GNUNET_MQ_hd_var_size (record_lookup,
2373                         GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_LOOKUP,
2374                         struct LabelLookupMessage,
2375                         NULL),
2376  GNUNET_MQ_hd_fixed_size (zone_to_name,
2377                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_TO_NAME,
2378                           struct ZoneToNameMessage,
2379                           NULL),
2380  GNUNET_MQ_hd_fixed_size (iteration_start,
2381                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_START,
2382                           struct ZoneIterationStartMessage,
2383                           NULL),
2384  GNUNET_MQ_hd_fixed_size (iteration_next,
2385                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_NEXT,
2386                           struct ZoneIterationNextMessage,
2387                           NULL),
2388  GNUNET_MQ_hd_fixed_size (iteration_stop,
2389                           GNUNET_MESSAGE_TYPE_NAMESTORE_ZONE_ITERATION_STOP,
2390                           struct ZoneIterationStopMessage,
2391                           NULL),
2392  GNUNET_MQ_hd_fixed_size (monitor_start,
2393                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START,
2394                           struct ZoneMonitorStartMessage,
2395                           NULL),
2396  GNUNET_MQ_hd_fixed_size (monitor_next,
2397                           GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT,
2398                           struct ZoneMonitorNextMessage,
2399                           NULL),
2400  GNUNET_MQ_handler_end ());
2401
2402
2403 /* end of gnunet-service-namestore.c */