fix #4244
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Message queue
50    */
51   struct GNUNET_MQ_Handle *mq;
52
53   /**
54    * Head of active STORE requests.
55    */
56   struct GNUNET_PEERSTORE_StoreContext *store_head;
57
58   /**
59    * Tail of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_tail;
62
63   /**
64    * Head of active ITERATE requests.
65    */
66   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
67
68   /**
69    * Tail of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
72
73   /**
74    * Hashmap of watch requests
75    */
76   struct GNUNET_CONTAINER_MultiHashMap *watches;
77
78   /**
79    * ID of the task trying to reconnect to the service.
80    */
81   struct GNUNET_SCHEDULER_Task *reconnect_task;
82
83   /**
84    * Delay until we try to reconnect.
85    */
86   struct GNUNET_TIME_Relative reconnect_delay;
87
88   /**
89    * Are we in the process of disconnecting but need to sync first?
90    */
91   int disconnecting;
92
93 };
94
95 /**
96  * Context for a store request
97  */
98 struct GNUNET_PEERSTORE_StoreContext
99 {
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *next;
104
105   /**
106    * Kept in a DLL.
107    */
108   struct GNUNET_PEERSTORE_StoreContext *prev;
109
110   /**
111    * Handle to the PEERSTORE service.
112    */
113   struct GNUNET_PEERSTORE_Handle *h;
114
115   /**
116    * Continuation called with service response
117    */
118   GNUNET_PEERSTORE_Continuation cont;
119
120   /**
121    * Closure for @e cont
122    */
123   void *cont_cls;
124
125   /**
126    * Which subsystem does the store?
127    */
128   char *sub_system;
129
130   /**
131    * Key for the store operation.
132    */
133   char *key;
134
135   /**
136    * Contains @e size bytes.
137    */
138   void *value;
139
140   /**
141    * Peer the store is for.
142    */
143   struct GNUNET_PeerIdentity peer;
144
145   /**
146    * Number of bytes in @e value.
147    */
148   size_t size;
149
150   /**
151    * When does the value expire?
152    */
153   struct GNUNET_TIME_Absolute expiry;
154
155   /**
156    * Options for the store operation.
157    */
158   enum GNUNET_PEERSTORE_StoreOption options;
159
160 };
161
162 /**
163  * Context for a iterate request
164  */
165 struct GNUNET_PEERSTORE_IterateContext
166 {
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *next;
171
172   /**
173    * Kept in a DLL.
174    */
175   struct GNUNET_PEERSTORE_IterateContext *prev;
176
177   /**
178    * Handle to the PEERSTORE service.
179    */
180   struct GNUNET_PEERSTORE_Handle *h;
181
182   /**
183    * Which subsystem does the store?
184    */
185   char *sub_system;
186
187   /**
188    * Peer the store is for.
189    */
190   struct GNUNET_PeerIdentity peer;
191
192   /**
193    * Key for the store operation.
194    */
195   char *key;
196
197   /**
198    * Callback with each matching record
199    */
200   GNUNET_PEERSTORE_Processor callback;
201
202   /**
203    * Closure for @e callback
204    */
205   void *callback_cls;
206
207   /**
208    * #GNUNET_YES if we are currently processing records.
209    */
210   int iterating;
211
212 };
213
214 /**
215  * Context for a watch request
216  */
217 struct GNUNET_PEERSTORE_WatchContext
218 {
219   /**
220    * Kept in a DLL.
221    */
222   struct GNUNET_PEERSTORE_WatchContext *next;
223
224   /**
225    * Kept in a DLL.
226    */
227   struct GNUNET_PEERSTORE_WatchContext *prev;
228
229   /**
230    * Handle to the PEERSTORE service.
231    */
232   struct GNUNET_PEERSTORE_Handle *h;
233
234   /**
235    * Callback with each record received
236    */
237   GNUNET_PEERSTORE_Processor callback;
238
239   /**
240    * Closure for @e callback
241    */
242   void *callback_cls;
243
244   /**
245    * Hash of the combined key
246    */
247   struct GNUNET_HashCode keyhash;
248
249 };
250
251 /******************************************************************************/
252 /*******************             DECLARATIONS             *********************/
253 /******************************************************************************/
254
255 /**
256  * Close the existing connection to PEERSTORE and reconnect.
257  *
258  * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
259  */
260 static void
261 reconnect (void *cls);
262
263
264 /**
265  * Disconnect from the peerstore service.
266  *
267  * @param h peerstore handle to disconnect
268  */
269 static void
270 disconnect (struct GNUNET_PEERSTORE_Handle *h)
271 {
272   struct GNUNET_PEERSTORE_IterateContext *next;
273
274   for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
275        NULL != ic;
276        ic = next)
277   {
278     next = ic->next;
279     if (GNUNET_YES == ic->iterating)
280     {
281       GNUNET_PEERSTORE_Processor icb;
282       void *icb_cls;
283
284       icb = ic->callback;
285       icb_cls = ic->callback_cls;
286       GNUNET_PEERSTORE_iterate_cancel (ic);
287       if (NULL != icb)
288         icb (icb_cls,
289              NULL,
290              "Iteration canceled due to reconnection");
291     }
292   }
293
294   if (NULL != h->mq)
295   {
296     GNUNET_MQ_destroy (h->mq);
297     h->mq = NULL;
298   }
299 }
300
301
302 /**
303  * Function that will schedule the job that will try
304  * to connect us again to the client.
305  *
306  * @param h peerstore to reconnect
307  */
308 static void
309 disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
310 {
311   GNUNET_assert (NULL == h->reconnect_task);
312   disconnect (h);
313   LOG (GNUNET_ERROR_TYPE_DEBUG,
314        "Scheduling task to reconnect to PEERSTORE service in %s.\n",
315        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
316                                                GNUNET_YES));
317   h->reconnect_task =
318       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
319                                     &reconnect,
320                                     h);
321   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
322 }
323
324
325
326 /**
327  * Callback after MQ envelope is sent
328  *
329  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
330  */
331 static void
332 store_request_sent (void *cls)
333 {
334   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
335   GNUNET_PEERSTORE_Continuation cont;
336   void *cont_cls;
337
338   cont = sc->cont;
339   cont_cls = sc->cont_cls;
340   GNUNET_PEERSTORE_store_cancel (sc);
341   if (NULL != cont)
342     cont (cont_cls, GNUNET_OK);
343 }
344
345
346 /******************************************************************************/
347 /*******************         CONNECTION FUNCTIONS         *********************/
348 /******************************************************************************/
349
350
351 /**
352  * Function called when we had trouble talking to the service.
353  */
354 static void
355 handle_client_error (void *cls,
356                      enum GNUNET_MQ_Error error)
357 {
358   struct GNUNET_PEERSTORE_Handle *h = cls;
359
360   LOG (GNUNET_ERROR_TYPE_ERROR,
361        "Received an error notification from MQ of type: %d\n",
362        error);
363   disconnect_and_schedule_reconnect (h);
364 }
365
366
367 /**
368  * Iterator over previous watches to resend them
369  *
370  * @param cls the `struct GNUNET_PEERSTORE_Handle`
371  * @param key key for the watch
372  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
373  * @return #GNUNET_YES (continue to iterate)
374  */
375 static int
376 rewatch_it (void *cls,
377             const struct GNUNET_HashCode *key,
378             void *value)
379 {
380   struct GNUNET_PEERSTORE_Handle *h = cls;
381   struct GNUNET_PEERSTORE_WatchContext *wc = value;
382   struct StoreKeyHashMessage *hm;
383   struct GNUNET_MQ_Envelope *ev;
384
385   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
386   hm->keyhash = wc->keyhash;
387   GNUNET_MQ_send (h->mq, ev);
388   return GNUNET_YES;
389 }
390
391
392 /**
393  * Iterator over watch requests to cancel them.
394  *
395  * @param cls unsused
396  * @param key key to the watch request
397  * @param value watch context
398  * @return #GNUNET_YES to continue iteration
399  */
400 static int
401 destroy_watch (void *cls,
402                const struct GNUNET_HashCode *key,
403                void *value)
404 {
405   struct GNUNET_PEERSTORE_WatchContext *wc = value;
406
407   GNUNET_PEERSTORE_watch_cancel (wc);
408   return GNUNET_YES;
409 }
410
411
412 /**
413  * Kill the connection to the service. This can be delayed in case of pending
414  * STORE requests and the user explicitly asked to sync first. Otherwise it is
415  * performed instantly.
416  *
417  * @param h Handle to the service.
418  */
419 static void
420 final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
421 {
422   if (NULL != h->mq)
423   {
424     GNUNET_MQ_destroy (h->mq);
425     h->mq = NULL;
426   }
427   GNUNET_free (h);
428 }
429
430
431 /**
432  * Connect to the PEERSTORE service.
433  *
434  * @param cfg configuration to use
435  * @return NULL on error
436  */
437 struct GNUNET_PEERSTORE_Handle *
438 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
439 {
440   struct GNUNET_PEERSTORE_Handle *h;
441
442   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
443   h->cfg = cfg;
444   h->disconnecting = GNUNET_NO;
445   reconnect (h);
446   if (NULL == h->mq)
447   {
448     GNUNET_free (h);
449     return NULL;
450   }
451   return h;
452 }
453
454
455 /**
456  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
457  * will be canceled.
458  * Any pending STORE requests will depend on @e snyc_first flag.
459  *
460  * @param h handle to disconnect
461  * @param sync_first send any pending STORE requests before disconnecting
462  */
463 void
464 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
465                              int sync_first)
466 {
467   struct GNUNET_PEERSTORE_IterateContext *ic;
468   struct GNUNET_PEERSTORE_StoreContext *sc;
469
470   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
471   if (NULL != h->watches)
472   {
473     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
474     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
475     h->watches = NULL;
476   }
477   while (NULL != (ic = h->iterate_head))
478   {
479     GNUNET_break (0);
480     GNUNET_PEERSTORE_iterate_cancel (ic);
481   }
482   if (NULL != h->store_head)
483   {
484     if (GNUNET_YES == sync_first)
485     {
486       LOG (GNUNET_ERROR_TYPE_DEBUG,
487            "Delaying disconnection due to pending store requests.\n");
488       h->disconnecting = GNUNET_YES;
489       return;
490     }
491     while (NULL != (sc = h->store_head))
492       GNUNET_PEERSTORE_store_cancel (sc);
493   }
494   final_disconnect (h);
495 }
496
497
498 /******************************************************************************/
499 /*******************            STORE FUNCTIONS           *********************/
500 /******************************************************************************/
501
502
503 /**
504  * Cancel a store request
505  *
506  * @param sc Store request context
507  */
508 void
509 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
510 {
511   struct GNUNET_PEERSTORE_Handle *h = sc->h;
512
513   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
514   GNUNET_free (sc->sub_system);
515   GNUNET_free (sc->value);
516   GNUNET_free (sc->key);
517   GNUNET_free (sc);
518   if ( (GNUNET_YES == h->disconnecting) &&
519        (NULL == h->store_head) )
520     final_disconnect (h);
521 }
522
523
524 /**
525  * Store a new entry in the PEERSTORE.
526  * Note that stored entries can be lost in some cases
527  * such as power failure.
528  *
529  * @param h Handle to the PEERSTORE service
530  * @param sub_system name of the sub system
531  * @param peer Peer Identity
532  * @param key entry key
533  * @param value entry value BLOB
534  * @param size size of @e value
535  * @param expiry absolute time after which the entry is (possibly) deleted
536  * @param options options specific to the storage operation
537  * @param cont Continuation function after the store request is sent
538  * @param cont_cls Closure for @a cont
539  */
540 struct GNUNET_PEERSTORE_StoreContext *
541 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
542                         const char *sub_system,
543                         const struct GNUNET_PeerIdentity *peer,
544                         const char *key,
545                         const void *value, size_t size,
546                         struct GNUNET_TIME_Absolute expiry,
547                         enum GNUNET_PEERSTORE_StoreOption options,
548                         GNUNET_PEERSTORE_Continuation cont,
549                         void *cont_cls)
550 {
551   struct GNUNET_MQ_Envelope *ev;
552   struct GNUNET_PEERSTORE_StoreContext *sc;
553
554   LOG (GNUNET_ERROR_TYPE_DEBUG,
555        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
556        size, sub_system, GNUNET_i2s (peer), key);
557   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
558                                             expiry, options,
559                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
560   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
561
562   sc->sub_system = GNUNET_strdup (sub_system);
563   sc->peer = *peer;
564   sc->key = GNUNET_strdup (key);
565   sc->value = GNUNET_memdup (value, size);
566   sc->size = size;
567   sc->expiry = expiry;
568   sc->options = options;
569   sc->cont = cont;
570   sc->cont_cls = cont_cls;
571   sc->h = h;
572
573   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
574   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
575   GNUNET_MQ_send (h->mq, ev);
576   return sc;
577
578 }
579
580
581 /******************************************************************************/
582 /*******************           ITERATE FUNCTIONS          *********************/
583 /******************************************************************************/
584
585
586 /**
587  * When a response for iterate request is received
588  *
589  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
590  * @param msg message received
591  */
592 static void
593 handle_iterate_end (void *cls,
594                     const struct GNUNET_MessageHeader *msg)
595 {
596   struct GNUNET_PEERSTORE_Handle *h = cls;
597   struct GNUNET_PEERSTORE_IterateContext *ic;
598   GNUNET_PEERSTORE_Processor callback;
599   void *callback_cls;
600
601   ic = h->iterate_head;
602   if (NULL == ic)
603   {
604     LOG (GNUNET_ERROR_TYPE_ERROR,
605          _("Unexpected iteration response, this should not happen.\n"));
606     disconnect_and_schedule_reconnect (h);
607     return;
608   }
609   callback = ic->callback;
610   callback_cls = ic->callback_cls;
611   ic->iterating = GNUNET_NO;
612   GNUNET_PEERSTORE_iterate_cancel (ic);
613   if (NULL != callback)
614     callback (callback_cls,
615               NULL,
616               NULL);
617   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
618 }
619
620
621 /**
622  * When a response for iterate request is received, check the
623  * message is well-formed.
624  *
625  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
626  * @param msg message received
627  */
628 static int
629 check_iterate_result (void *cls,
630                       const struct StoreRecordMessage *msg)
631 {
632   /* we defer validation to #handle_iterate_result */
633   return GNUNET_OK;
634 }
635
636
637 /**
638  * When a response for iterate request is received
639  *
640  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
641  * @param msg message received
642  */
643 static void
644 handle_iterate_result (void *cls,
645                        const struct StoreRecordMessage *msg)
646 {
647   struct GNUNET_PEERSTORE_Handle *h = cls;
648   struct GNUNET_PEERSTORE_IterateContext *ic;
649   GNUNET_PEERSTORE_Processor callback;
650   void *callback_cls;
651   struct GNUNET_PEERSTORE_Record *record;
652
653   ic = h->iterate_head;
654   if (NULL == ic)
655   {
656     LOG (GNUNET_ERROR_TYPE_ERROR,
657          _("Unexpected iteration response, this should not happen.\n"));
658     disconnect_and_schedule_reconnect (h);
659     return;
660   }
661   ic->iterating = GNUNET_YES;
662   callback = ic->callback;
663   callback_cls = ic->callback_cls;
664   if (NULL == callback)
665     return;
666   record = PEERSTORE_parse_record_message (msg);
667   if (NULL == record)
668   {
669     callback (callback_cls,
670               NULL,
671               _("Received a malformed response from service."));
672   }
673   else
674   {
675     callback (callback_cls,
676               record,
677               NULL);
678     PEERSTORE_destroy_record (record);
679   }
680 }
681
682
683 /**
684  * Cancel an iterate request
685  * Please do not call after the iterate request is done
686  *
687  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
688  */
689 void
690 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
691 {
692   if (GNUNET_NO == ic->iterating)
693   {
694     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
695                                  ic->h->iterate_tail,
696                                  ic);
697     GNUNET_free (ic->sub_system);
698     GNUNET_free_non_null (ic->key);
699     GNUNET_free (ic);
700   }
701   else
702     ic->callback = NULL;
703 }
704
705
706 /**
707  * Iterate over records matching supplied key information
708  *
709  * @param h handle to the PEERSTORE service
710  * @param sub_system name of sub system
711  * @param peer Peer identity (can be NULL)
712  * @param key entry key string (can be NULL)
713  * @param callback function called with each matching record, all NULL's on end
714  * @param callback_cls closure for @a callback
715  * @return Handle to iteration request
716  */
717 struct GNUNET_PEERSTORE_IterateContext *
718 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
719                           const char *sub_system,
720                           const struct GNUNET_PeerIdentity *peer,
721                           const char *key,
722                           GNUNET_PEERSTORE_Processor callback,
723                           void *callback_cls)
724 {
725   struct GNUNET_MQ_Envelope *ev;
726   struct GNUNET_PEERSTORE_IterateContext *ic;
727
728   ev = PEERSTORE_create_record_mq_envelope (sub_system,
729                                             peer,
730                                             key,
731                                             NULL, 0,
732                                             GNUNET_TIME_UNIT_FOREVER_ABS,
733                                             0,
734                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
735   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
736   ic->callback = callback;
737   ic->callback_cls = callback_cls;
738   ic->h = h;
739   ic->sub_system = GNUNET_strdup (sub_system);
740   if (NULL != peer)
741     ic->peer = *peer;
742   if (NULL != key)
743     ic->key = GNUNET_strdup (key);
744   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
745                                     h->iterate_tail,
746                                     ic);
747   LOG (GNUNET_ERROR_TYPE_DEBUG,
748        "Sending an iterate request for sub system `%s'\n",
749        sub_system);
750   GNUNET_MQ_send (h->mq, ev);
751   return ic;
752 }
753
754
755 /******************************************************************************/
756 /*******************            WATCH FUNCTIONS           *********************/
757 /******************************************************************************/
758
759 /**
760  * When a watch record is received, validate it is well-formed.
761  *
762  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
763  * @param msg message received
764  */
765 static int
766 check_watch_record (void *cls,
767                     const struct StoreRecordMessage *msg)
768 {
769   /* we defer validation to #handle_watch_result */
770   return GNUNET_OK;
771 }
772
773
774 /**
775  * When a watch record is received, process it.
776  *
777  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
778  * @param msg message received
779  */
780 static void
781 handle_watch_record (void *cls,
782                      const struct StoreRecordMessage *msg)
783 {
784   struct GNUNET_PEERSTORE_Handle *h = cls;
785   struct GNUNET_PEERSTORE_Record *record;
786   struct GNUNET_HashCode keyhash;
787   struct GNUNET_PEERSTORE_WatchContext *wc;
788
789   LOG (GNUNET_ERROR_TYPE_DEBUG,
790        "Received a watch record from service.\n");
791   record = PEERSTORE_parse_record_message (msg);
792   if (NULL == record)
793   {
794     disconnect_and_schedule_reconnect (h);
795     return;
796   }
797   PEERSTORE_hash_key (record->sub_system,
798                       &record->peer,
799                       record->key,
800                       &keyhash);
801   // FIXME: what if there are multiple watches for the same key?
802   wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
803                                           &keyhash);
804   if (NULL == wc)
805   {
806     LOG (GNUNET_ERROR_TYPE_ERROR,
807          _("Received a watch result for a non existing watch.\n"));
808     PEERSTORE_destroy_record (record);
809     disconnect_and_schedule_reconnect (h);
810     return;
811   }
812   if (NULL != wc->callback)
813     wc->callback (wc->callback_cls,
814                   record,
815                   NULL);
816   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
817   PEERSTORE_destroy_record (record);
818 }
819
820
821 /**
822  * Close the existing connection to PEERSTORE and reconnect.
823  *
824  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
825  */
826 static void
827 reconnect (void *cls)
828 {
829   struct GNUNET_PEERSTORE_Handle *h = cls;
830   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
831     GNUNET_MQ_hd_fixed_size (iterate_end,
832                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
833                              struct GNUNET_MessageHeader,
834                              h),
835     GNUNET_MQ_hd_var_size (iterate_result,
836                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
837                            struct StoreRecordMessage,
838                            h),
839     GNUNET_MQ_hd_var_size (watch_record,
840                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
841                            struct StoreRecordMessage,
842                            h),
843     GNUNET_MQ_handler_end ()
844   };
845   struct GNUNET_MQ_Envelope *ev;
846
847   LOG (GNUNET_ERROR_TYPE_DEBUG,
848        "Reconnecting...\n");
849   h->mq = GNUNET_CLIENT_connect (h->cfg,
850                                  "peerstore",
851                                  mq_handlers,
852                                  &handle_client_error,
853                                  h);
854   if (NULL == h->mq)
855     return;
856   LOG (GNUNET_ERROR_TYPE_DEBUG,
857        "Resending pending requests after reconnect.\n");
858   if (NULL != h->watches)
859     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
860                                            &rewatch_it,
861                                            h);
862   for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
863        NULL != ic;
864        ic = ic->next)
865   {
866     ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
867                                               &ic->peer,
868                                               ic->key,
869                                               NULL, 0,
870                                               GNUNET_TIME_UNIT_FOREVER_ABS,
871                                               0,
872                                               GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
873     GNUNET_MQ_send (h->mq, ev);
874   }
875   for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head;
876        NULL != sc;
877        sc = sc->next)
878   {
879     ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
880                                               &sc->peer,
881                                               sc->key,
882                                               sc->value,
883                                               sc->size,
884                                               sc->expiry,
885                                               sc->options,
886                                               GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
887     GNUNET_MQ_notify_sent (ev,
888                            &store_request_sent,
889                            sc);
890     GNUNET_MQ_send (h->mq,
891                     ev);
892   }
893 }
894
895
896 /**
897  * Cancel a watch request
898  *
899  * @param wc handle to the watch request
900  */
901 void
902 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
903 {
904   struct GNUNET_PEERSTORE_Handle *h = wc->h;
905   struct GNUNET_MQ_Envelope *ev;
906   struct StoreKeyHashMessage *hm;
907
908   LOG (GNUNET_ERROR_TYPE_DEBUG,
909        "Canceling watch.\n");
910   ev = GNUNET_MQ_msg (hm,
911                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
912   hm->keyhash = wc->keyhash;
913   GNUNET_MQ_send (h->mq, ev);
914   GNUNET_CONTAINER_multihashmap_remove (h->watches,
915                                         &wc->keyhash,
916                                         wc);
917   GNUNET_free (wc);
918 }
919
920
921 /**
922  * Request watching a given key
923  * User will be notified with any new values added to key
924  *
925  * @param h handle to the PEERSTORE service
926  * @param sub_system name of sub system
927  * @param peer Peer identity
928  * @param key entry key string
929  * @param callback function called with each new value
930  * @param callback_cls closure for @a callback
931  * @return Handle to watch request
932  */
933 struct GNUNET_PEERSTORE_WatchContext *
934 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
935                         const char *sub_system,
936                         const struct GNUNET_PeerIdentity *peer,
937                         const char *key,
938                         GNUNET_PEERSTORE_Processor callback,
939                         void *callback_cls)
940 {
941   struct GNUNET_MQ_Envelope *ev;
942   struct StoreKeyHashMessage *hm;
943   struct GNUNET_PEERSTORE_WatchContext *wc;
944
945   ev = GNUNET_MQ_msg (hm,
946                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
947   PEERSTORE_hash_key (sub_system,
948                       peer,
949                       key,
950                       &hm->keyhash);
951   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
952   wc->callback = callback;
953   wc->callback_cls = callback_cls;
954   wc->h = h;
955   wc->keyhash = hm->keyhash;
956   if (NULL == h->watches)
957     h->watches = GNUNET_CONTAINER_multihashmap_create (5,
958                                                        GNUNET_NO);
959   GNUNET_assert (GNUNET_OK ==
960                  GNUNET_CONTAINER_multihashmap_put (h->watches,
961                                                     &wc->keyhash,
962                                                     wc,
963                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
964   LOG (GNUNET_ERROR_TYPE_DEBUG,
965        "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
966        sub_system,
967        GNUNET_i2s (peer),
968        key);
969   GNUNET_MQ_send (h->mq,
970                   ev);
971   return wc;
972 }
973
974 /* end of peerstore_api.c */